batchcopy.py
changeset 98 024299702087
parent 97 a4af93e72e2b
legend: equal | deleted | inserted | replaced
97:a4af93e72e2b 98:024299702087
     1 #!/usr/bin/env python3
     1 #!/usr/bin/env python3
     2 
     2 
     3 from pgtoolkit import toolbase
     3 from pgtoolkit import toolbase
       
     4 from pgtoolkit.pgmanager import IntegrityError
     4 
     5 
     5 
     6 
     6 class BatchCopyTool(toolbase.SrcDstTablesTool):
     7 class BatchCopyTool(toolbase.SrcDstTablesTool):
     7     def __init__(self):
     8     def __init__(self):
     8         toolbase.SrcDstTablesTool.__init__(self, name='batchcopy', desc='Copy data from one table to another.')
     9         toolbase.SrcDstTablesTool.__init__(self, name='batchcopy', desc='Copy data from one table to another.')
     9 
    10 
    10         self.parser.add_argument('--table-name', type=str, help='Table to be copied.')
    11         self.parser.add_argument('--table-name', type=str, help='Table to be copied.')
    11         self.parser.add_argument('--src-filter', type=str, help='WHERE condition for source query.')
    12         self.parser.add_argument('--src-filter', type=str, help='WHERE condition for source query.')
    12         self.parser.add_argument('--file-with-ids', type=str, help='Read source IDs from file (each ID on new line). Use these in --src-filter as {ids}')
    13         self.parser.add_argument('--file-with-ids', type=str, help='Read source IDs from file (each ID on new line). Use these in --src-filter as {ids}')
    13         #TODO: duplicates=rollback|ignore|update
    14         self.parser.add_argument('--dst-exists', choices=['rollback', 'ignore', 'update'], default='rollback', help='What to do when destination record already exists.')
    14 
    15 
    15         self.init()
    16         self.init()
    16 
    17 
    17     def main(self):
    18     def main(self):
    18         # read list of IDs from file
    19         # read list of IDs from file
    28             #TODO:  ORDER BY id OFFSET 0 LIMIT 100
    29             #TODO:  ORDER BY id OFFSET 0 LIMIT 100
    29             data = src_curs.fetchall_dict()
    30             data = src_curs.fetchall_dict()
    30             src_curs.connection.commit()
    31             src_curs.connection.commit()
    31 
    32 
    32         with self.pgm.cursor('dst') as dst_curs:
    33         with self.pgm.cursor('dst') as dst_curs:
       
    34             copied = 0
    33             for row in data:
    35             for row in data:
    34                 keys = ', '.join(row.keys())
    36                 keys = ', '.join(row.keys())
    35                 values_mask = ', '.join(['%s'] * len(row))
    37                 values_mask = ', '.join(['%s'] * len(row))
    36                 query = 'INSERT INTO {} ({}) VALUES ({})'.format(self.args.table_name, keys, values_mask)
    38                 query = 'INSERT INTO {} ({}) VALUES ({})'.format(self.args.table_name, keys, values_mask)
    37                 dst_curs.execute(query, list(row.values()))
    39                 try:
       
    40                     dst_curs.execute('SAVEPOINT the_query;')
       
    41                     dst_curs.execute(query, list(row.values()))
       
    42                     dst_curs.execute('RELEASE SAVEPOINT the_query;')
       
    43                     copied += 1
       
    44                 except IntegrityError:
       
    45                     if self.args.dst_exists == 'rollback':
       
    46                         dst_curs.connection.rollback()
       
    47                         break
       
    48                     elif self.args.dst_exists == 'ignore':
       
    49                         dst_curs.execute('ROLLBACK TO SAVEPOINT the_query;')
       
    50                     elif self.args.dst_exists == 'update':
       
    51                         raise NotImplementedError()
    38             dst_curs.connection.commit()
    52             dst_curs.connection.commit()
    39 
    53 
    40         self.log.info('Copied {} rows.' % len(rows))
    54         self.log.info('Copied %s rows.', copied)
    41 
    55 
    42 
    56 
    43 tool = BatchCopyTool()
    57 tool = BatchCopyTool()
    44 tool.main()
    58 tool.main()