Update batchcopy: when the target record already exists, allow ignoring the error or updating the record (--dst-exists parameter).
author Radek Brich <brich.radek@ifortuna.cz>
Tue, 29 Apr 2014 17:50:15 +0200
changeset 98 024299702087
parent 97 a4af93e72e2b
child 99 245646538743
Update batchcopy: when the target record already exists, allow ignoring the error or updating the record (--dst-exists parameter).
batchcopy.py
--- a/batchcopy.py	Mon Apr 14 22:28:12 2014 +0200
+++ b/batchcopy.py	Tue Apr 29 17:50:15 2014 +0200
@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 
 from pgtoolkit import toolbase
+from pgtoolkit.pgmanager import IntegrityError
 
 
 class BatchCopyTool(toolbase.SrcDstTablesTool):
@@ -10,7 +11,7 @@
         self.parser.add_argument('--table-name', type=str, help='Table to be copied.')
         self.parser.add_argument('--src-filter', type=str, help='WHERE condition for source query.')
         self.parser.add_argument('--file-with-ids', type=str, help='Read source IDs from file (each ID on new line). Use these in --src-filter as {ids}')
-        #TODO: duplicates=rollback|ignore|update
+        self.parser.add_argument('--dst-exists', choices=['rollback', 'ignore', 'update'], default='rollback', help='What to do when destination record already exists.')
 
         self.init()
 
@@ -30,14 +31,27 @@
             src_curs.connection.commit()
 
         with self.pgm.cursor('dst') as dst_curs:
+            copied = 0
             for row in data:
                 keys = ', '.join(row.keys())
                 values_mask = ', '.join(['%s'] * len(row))
                 query = 'INSERT INTO {} ({}) VALUES ({})'.format(self.args.table_name, keys, values_mask)
-                dst_curs.execute(query, list(row.values()))
+                try:
+                    dst_curs.execute('SAVEPOINT the_query;')
+                    dst_curs.execute(query, list(row.values()))
+                    dst_curs.execute('RELEASE SAVEPOINT the_query;')
+                    copied += 1
+                except IntegrityError:
+                    if self.args.dst_exists == 'rollback':
+                        dst_curs.connection.rollback()
+                        break
+                    elif self.args.dst_exists == 'ignore':
+                        dst_curs.execute('ROLLBACK TO SAVEPOINT the_query;')
+                    elif self.args.dst_exists == 'update':
+                        raise NotImplementedError()
             dst_curs.connection.commit()
 
-        self.log.info('Copied {} rows.' % len(rows))
+        self.log.info('Copied %s rows.', copied)
 
 
 tool = BatchCopyTool()