1 #!/usr/bin/env python3 |
1 #!/usr/bin/env python3 |
2 |
2 |
3 from pgtoolkit import toolbase |
3 from pgtoolkit import toolbase |
|
4 from pgtoolkit.pgmanager import IntegrityError |
4 |
5 |
5 |
6 |
6 class BatchCopyTool(toolbase.SrcDstTablesTool): |
7 class BatchCopyTool(toolbase.SrcDstTablesTool): |
def __init__(self):
    """Set up the ``batchcopy`` tool and register its command-line options.

    Calls the ``SrcDstTablesTool`` initializer with the tool name and
    description, adds the tool-specific options to ``self.parser`` and then
    calls ``self.init()``.

    Options registered here:
      --table-name     table to be copied
      --src-filter     WHERE condition for the source query
      --file-with-ids  file with source IDs, one per line, available in
                       --src-filter as ``{ids}``
      --dst-exists     what to do when the destination record already
                       exists: ``rollback`` (default), ``ignore`` or ``update``
    """
    toolbase.SrcDstTablesTool.__init__(
        self,
        name='batchcopy',
        desc='Copy data from one table to another.',
    )

    # Each option is registered exactly once; registering the same option
    # string twice makes argparse raise a conflicting-option error.
    self.parser.add_argument(
        '--table-name', type=str,
        help='Table to be copied.')
    self.parser.add_argument(
        '--src-filter', type=str,
        help='WHERE condition for source query.')
    self.parser.add_argument(
        '--file-with-ids', type=str,
        help='Read source IDs from file (each ID on new line). '
             'Use these in --src-filter as {ids}')
    self.parser.add_argument(
        '--dst-exists', choices=['rollback', 'ignore', 'update'],
        default='rollback',
        help='What to do when destination record already exists.')

    # NOTE(review): init() is defined in toolbase (not visible here);
    # presumably it parses the arguments and finishes tool setup - confirm.
    self.init()
16 |
17 |
17 def main(self): |
18 def main(self): |
18 # read list of IDs from file |
19 # read list of IDs from file |
28 #TODO: ORDER BY id OFFSET 0 LIMIT 100 |
29 #TODO: ORDER BY id OFFSET 0 LIMIT 100 |
29 data = src_curs.fetchall_dict() |
30 data = src_curs.fetchall_dict() |
30 src_curs.connection.commit() |
31 src_curs.connection.commit() |
31 |
32 |
32 with self.pgm.cursor('dst') as dst_curs: |
33 with self.pgm.cursor('dst') as dst_curs: |
|
34 copied = 0 |
33 for row in data: |
35 for row in data: |
34 keys = ', '.join(row.keys()) |
36 keys = ', '.join(row.keys()) |
35 values_mask = ', '.join(['%s'] * len(row)) |
37 values_mask = ', '.join(['%s'] * len(row)) |
36 query = 'INSERT INTO {} ({}) VALUES ({})'.format(self.args.table_name, keys, values_mask) |
38 query = 'INSERT INTO {} ({}) VALUES ({})'.format(self.args.table_name, keys, values_mask) |
37 dst_curs.execute(query, list(row.values())) |
39 try: |
|
40 dst_curs.execute('SAVEPOINT the_query;') |
|
41 dst_curs.execute(query, list(row.values())) |
|
42 dst_curs.execute('RELEASE SAVEPOINT the_query;') |
|
43 copied += 1 |
|
44 except IntegrityError: |
|
45 if self.args.dst_exists == 'rollback': |
|
46 dst_curs.connection.rollback() |
|
47 break |
|
48 elif self.args.dst_exists == 'ignore': |
|
49 dst_curs.execute('ROLLBACK TO SAVEPOINT the_query;') |
|
50 elif self.args.dst_exists == 'update': |
|
51 raise NotImplementedError() |
38 dst_curs.connection.commit() |
52 dst_curs.connection.commit() |
39 |
53 |
40 self.log.info('Copied {} rows.' % len(rows)) |
54 self.log.info('Copied %s rows.', copied) |
41 |
55 |
42 |
56 |
# Script entry point: build the tool once and run the copy once.
tool = BatchCopyTool()
tool.main()