# HG changeset patch # User Radek Brich # Date 1398786615 -7200 # Node ID 0242997020874f64631b9df29827a3bfe40251f8 # Parent a4af93e72e2b66caebf8d68384de39f40088774f Update batchcopy: When target record exists, allow to ignore / update the error (--dst-exists parameter). diff -r a4af93e72e2b -r 024299702087 batchcopy.py --- a/batchcopy.py Mon Apr 14 22:28:12 2014 +0200 +++ b/batchcopy.py Tue Apr 29 17:50:15 2014 +0200 @@ -1,6 +1,7 @@ #!/usr/bin/env python3 from pgtoolkit import toolbase +from pgtoolkit.pgmanager import IntegrityError class BatchCopyTool(toolbase.SrcDstTablesTool): @@ -10,7 +11,7 @@ self.parser.add_argument('--table-name', type=str, help='Table to be copied.') self.parser.add_argument('--src-filter', type=str, help='WHERE condition for source query.') self.parser.add_argument('--file-with-ids', type=str, help='Read source IDs from file (each ID on new line). Use these in --src-filter as {ids}') - #TODO: duplicates=rollback|ignore|update + self.parser.add_argument('--dst-exists', choices=['rollback', 'ignore', 'update'], default='rollback', help='What to do when destination record already exists.') self.init() @@ -30,14 +31,27 @@ src_curs.connection.commit() with self.pgm.cursor('dst') as dst_curs: + copied = 0 for row in data: keys = ', '.join(row.keys()) values_mask = ', '.join(['%s'] * len(row)) query = 'INSERT INTO {} ({}) VALUES ({})'.format(self.args.table_name, keys, values_mask) - dst_curs.execute(query, list(row.values())) + try: + dst_curs.execute('SAVEPOINT the_query;') + dst_curs.execute(query, list(row.values())) + dst_curs.execute('RELEASE SAVEPOINT the_query;') + copied += 1 + except IntegrityError: + if self.args.dst_exists == 'rollback': + dst_curs.connection.rollback() + break + elif self.args.dst_exists == 'ignore': + dst_curs.execute('ROLLBACK TO SAVEPOINT the_query;') + elif self.args.dst_exists == 'update': + raise NotImplementedError() dst_curs.connection.commit() - self.log.info('Copied {} rows.' % len(rows)) + self.log.info('Copied %s rows.', copied) tool = BatchCopyTool()