pgtoolkit/tools/batchcopy.py
author Radek Brich <radek.brich@devl.cz>
Mon, 26 May 2014 18:18:21 +0200
changeset 103 24e94a3da209
parent 101 2a2d0d5df03b
permissions -rw-r--r--
Update bigtables tool: Sort by size with indexes, not just data.
from pgtoolkit.toolbase import SrcDstTablesTool
from pgtoolkit.pgmanager import IntegrityError


class BatchCopyTool(SrcDstTablesTool):

    """
    Copy data from one table to another, filtering by specified condition.

    """

    def __init__(self):
        SrcDstTablesTool.__init__(self, name='batchcopy', desc='')

    def specify_args(self):
        SrcDstTablesTool.specify_args(self)
        self.parser.add_argument('--table-name', type=str, help='Table to be copied.')
        self.parser.add_argument('--src-filter', type=str, help='WHERE condition for source query.')
        self.parser.add_argument('--file-with-ids', type=str, help='Read source IDs from file (each ID on new line). Use these in --src-filter as {ids}')
        self.parser.add_argument('--dst-exists', choices=['rollback', 'ignore', 'update'], default='rollback', help='What to do when destination record already exists.')

    def main(self):
        # read list of IDs from file
        ids = '<no IDs read>'
        if self.args.file_with_ids:
            with open(self.args.file_with_ids, 'r') as f:
                # skip blank lines so the joined list has no empty items
                ids = ','.join(ln.strip() for ln in f if ln.strip())

        # read source data
        with self.pgm.cursor('src') as src_curs:
            # fall back to 'true' before formatting; --src-filter may be omitted (None)
            condition = (self.args.src_filter or 'true').format(ids=ids)
            src_curs.execute('SELECT * FROM {} WHERE {}'.format(self.args.table_name, condition))
            # TODO: ORDER BY id OFFSET 0 LIMIT 100
            data = src_curs.fetchall_dict()
            src_curs.connection.commit()

        with self.pgm.cursor('dst') as dst_curs:
            copied = 0
            for row in data:
                keys = ', '.join(row.keys())
                values_mask = ', '.join(['%s'] * len(row))
                query = 'INSERT INTO {} ({}) VALUES ({})'.format(self.args.table_name, keys, values_mask)
                try:
                    # guard each insert with a savepoint so a failed row can be undone alone
                    dst_curs.execute('SAVEPOINT the_query;')
                    dst_curs.execute(query, list(row.values()))
                    dst_curs.execute('RELEASE SAVEPOINT the_query;')
                    copied += 1
                except IntegrityError:
                    if self.args.dst_exists == 'rollback':
                        # the whole batch is undone, so no rows remain copied
                        dst_curs.connection.rollback()
                        copied = 0
                        break
                    elif self.args.dst_exists == 'ignore':
                        # undo only the failed insert and continue with the next row
                        dst_curs.execute('ROLLBACK TO SAVEPOINT the_query;')
                    elif self.args.dst_exists == 'update':
                        raise NotImplementedError()
            dst_curs.connection.commit()

        self.log.info('Copied %s rows.', copied)


cls = BatchCopyTool