Update batchcopy: When target record exists, allow to ignore / update the error (--dst-exists parameter).
--- a/batchcopy.py Mon Apr 14 22:28:12 2014 +0200
+++ b/batchcopy.py Tue Apr 29 17:50:15 2014 +0200
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
from pgtoolkit import toolbase
+from pgtoolkit.pgmanager import IntegrityError
class BatchCopyTool(toolbase.SrcDstTablesTool):
@@ -10,7 +11,7 @@
self.parser.add_argument('--table-name', type=str, help='Table to be copied.')
self.parser.add_argument('--src-filter', type=str, help='WHERE condition for source query.')
self.parser.add_argument('--file-with-ids', type=str, help='Read source IDs from file (each ID on new line). Use these in --src-filter as {ids}')
- #TODO: duplicates=rollback|ignore|update
+ self.parser.add_argument('--dst-exists', choices=['rollback', 'ignore', 'update'], default='rollback', help='What to do when destination record already exists.')
self.init()
@@ -30,14 +31,27 @@
src_curs.connection.commit()
with self.pgm.cursor('dst') as dst_curs:
+ copied = 0
for row in data:
keys = ', '.join(row.keys())
values_mask = ', '.join(['%s'] * len(row))
query = 'INSERT INTO {} ({}) VALUES ({})'.format(self.args.table_name, keys, values_mask)
- dst_curs.execute(query, list(row.values()))
+ try:
+ dst_curs.execute('SAVEPOINT the_query;')
+ dst_curs.execute(query, list(row.values()))
+ dst_curs.execute('RELEASE SAVEPOINT the_query;')
+ copied += 1
+ except IntegrityError:
+ if self.args.dst_exists == 'rollback':
+ dst_curs.connection.rollback()
+ break
+ elif self.args.dst_exists == 'ignore':
+ dst_curs.execute('ROLLBACK TO SAVEPOINT the_query;')
+ elif self.args.dst_exists == 'update':
+ raise NotImplementedError()
dst_curs.connection.commit()
- self.log.info('Copied {} rows.' % len(rows))
+ self.log.info('Copied %s rows.', copied)
tool = BatchCopyTool()