# HG changeset patch # User Radek Brich # Date 1336632141 -7200 # Node ID 98c7809af4152f789a7b05eb01b4bbd7ef4cf609 # Parent bd0beda49bcbb6851d2dbad84506e9a05607a5c2 Add PgDataCopy. Add TableCopyTool.Add SrcDstTablesTool class to toolbase, use in tablecopy, tablediff. diff -r bd0beda49bcb -r 98c7809af415 README --- a/README Wed Mar 28 17:25:18 2012 +0200 +++ b/README Thu May 10 08:42:21 2012 +0200 @@ -32,3 +32,34 @@ Just use PostgreSQL with Python, it's better option in most use cases. + +Tools +===== + +tablecopy.py +------------ + +./tablecapy.py db1 db2 -n + Read all tables and all schema from db1, print table names. Remove -n to copy data to db2. + +./tablecopy.py db1 db2 -s myschema + Copy all tables in "myschema" from db1 to db2. + +./tablecopy.py db1 db2 -s myschema --dst-schema otherschema + Copy all tables in "myschema" from db1 to same tables in "otherschema" in db2. + +./tablecopy.py db1 db2 -s ^my --regex + Copy all tables from all schemas beginning with "my". + +./tablecopy.py db1 db2 -s myschema1 --dst-schema myschema2 --regex -t ^my + Copy all tables beginning with "my" from myschema1 to tables of same name in myschema2. + +Rules: + If nothing is specified, everything is copied. + If no target schema or table is specified, data are copied to schema or table of same name. + If --regex is specified, it applies to table name if specified, schema name otherwise. + If both --regex and --src-table is specified, both source and dest schema can be named (no regex applied to them). + Default schema name is '', which as regex matches all schemas. Same for table name. + Regexes are match with source database, destination database is not checked (table may not exists, will fail when copying). + Directly specified table name (no --regex) is not checked (may not exists). + diff -r bd0beda49bcb -r 98c7809af415 pgtoolkit/pgdatacopy.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtoolkit/pgdatacopy.py Thu May 10 08:42:21 2012 +0200 @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +# +# PgDataCopy - copy data between tables +# +# Copyright (c) 2012 Radek Brich +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + + +import io + + +class TargetNotEmptyError(Exception): + + pass + + +class PgDataCopy: + + def __init__(self, conn1, conn2): + self.conn1 = conn1 + self.conn2 = conn2 + self.fulltable1 = None + self.fulltable2 = None + + def set_source(self, table, schema='public'): + self.schema1 = schema + self.table1 = table + self.fulltable1 = '"' + schema + '"."'+ table + '"' + + def set_destination(self, table, schema='public'): + self.schema2 = schema + self.table2 = table + self.fulltable2 = '"' + schema + '"."'+ table + '"' + + def copy(self): + self.check() + + buf = io.StringIO() + try: + self.read(buf) + data = buf.getvalue() + finally: + buf.close() + + buf = io.StringIO(data) + try: + self.write(buf) + finally: + buf.close() + + self.analyze() + + def check(self): + '''Check that target table does not contain any data (otherwise cannot copy).''' + q = self._compose_check(self.fulltable2) + curs = self.conn2.cursor() + curs.execute(q) + curs.connection.commit() + if curs.rowcount > 0: + raise TargetNotEmptyError('Target table contains data: %s' % self.fulltable2) + self.cols = [desc[0] for desc in curs.description] + + def read(self, tmpfile): + '''Read contents from source table.''' + q = self._compose_read(self.fulltable1, self.cols) + curs = self.conn1.cursor() + curs.copy_expert(q, tmpfile) + curs.connection.commit() + + def write(self, tmpfile): + '''Write source table contents to target table.''' + q = self._compose_write(self.fulltable2, self.cols) + curs = self.conn2.cursor() + curs.copy_expert(q, tmpfile) + curs.connection.commit() + + def analyze(self): + '''Analyze target table.''' + q = self._compose_analyze(self.fulltable2) + curs = self.conn2.cursor() + curs.execute(q) + curs.connection.commit() + + def _compose_check(self, table): + return 'SELECT * FROM %s LIMIT 1' % table + + def _compose_read(self, table, cols): + collist = ', '.join(['"%s"' % col for col in cols]) + return 'COPY %s (%s) TO STDOUT' % (table, collist) + + def _compose_write(self, table, cols): + collist = ', '.join(['"%s"' % col for col in cols]) + return 'COPY %s (%s) FROM STDIN' % (table, collist) + + def _compose_analyze(self, table): + return 'ANALYZE %s' % table + diff -r bd0beda49bcb -r 98c7809af415 pgtoolkit/progresswrapper.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtoolkit/progresswrapper.py Thu May 10 08:42:21 2012 +0200 @@ -0,0 +1,54 @@ + +import sys + + +class ProgressWrapper: + + def __init__(self, f, size=0): + self.f = f + self.size = size + self.readsize = 0 + self.cycles = 0 + self.print_cycles = 200 + + def humanize(self, bytes): + if bytes > 1024**3: + return '%.1fG' % (bytes / 1024.**3) + if bytes > 1024**2: + return '%.1fM' % (bytes / 1024.**2) + if bytes > 1024: + return '%.1fk' % (bytes / 1024.) + return '%d' % bytes + + def write(self, data): + self.size += len(data) + if self.cycles == 0: + print(' read %s \r' % self.humanize(self.size), end='') + sys.stdout.flush() + self.cycles = self.print_cycles * 200 + else: + self.cycles -= 1 + return self.f.write(data) + + def read(self, size): + self.readsize += size + if self.cycles == 0: + if self.size > 0: + percent = self.readsize * 100. / self.size + else: + percent = 100 + if percent > 100: + percent = 100 + print(' written %s / %s (%.1f%%) \r' % ( + self.humanize(self.readsize), + self.humanize(self.size), + percent), end='') + sys.stdout.flush() + self.cycles = self.print_cycles + else: + self.cycles -= 1 + return self.f.read(size) + + def close(self): + self.f.close() + diff -r bd0beda49bcb -r 98c7809af415 pgtoolkit/toolbase.py --- a/pgtoolkit/toolbase.py Wed Mar 28 17:25:18 2012 +0200 +++ b/pgtoolkit/toolbase.py Thu May 10 08:42:21 2012 +0200 @@ -1,7 +1,8 @@ import argparse import logging +import re -from pgtoolkit import pgmanager, config +from pgtoolkit import pgmanager, pgbrowser, config from pgtoolkit.coloredformatter import ColoredFormatter from pgtoolkit.highlight import highlight @@ -10,6 +11,10 @@ pass +class BadArgsError(Exception): + pass + + class ToolBase: def __init__(self, name, desc): self.parser = argparse.ArgumentParser(description=desc) @@ -110,3 +115,84 @@ ToolBase.init(self) self.prepare_conns_from_cmdline_args('src', 'dst') + +class SrcDstTablesTool(SrcDstTool): + def __init__(self, name, desc): + SrcDstTool.__init__(self, name, desc) + self.parser.add_argument('-t', '--src-table', metavar='source_table', + dest='srctable', type=str, default='', help='Source table name.') + self.parser.add_argument('-s', '--src-schema', metavar='source_schema', + dest='srcschema', type=str, default='', help='Source schema name (default=public).') + self.parser.add_argument('--dst-table', metavar='destination_table', + dest='dsttable', type=str, default='', help='Destination table name (default=source_table).') + self.parser.add_argument('--dst-schema', metavar='destination_schema', + dest='dstschema', type=str, default='', help='Destination schema name (default=source_schema).') + self.parser.add_argument('--regex', action='store_true', help="Use RE in schema or table name.") + + def init(self): + SrcDstTool.init(self) + + self.schema1 = self.args.srcschema + self.table1 = self.args.srctable + self.schema2 = self.args.dstschema + self.table2 = self.args.dsttable + + # check regex - it applies to source name, dest name must not be specified + # applies to only one - schema or table name + if self.args.regex: + if self.table2 or (self.schema2 and not self.table1): + raise BadArgsError('Cannot specify both --regex and --dst-schema, --dst-table.') + # schema defaults to public + if self.table1 and not self.schema1: + self.schema1 = 'public' + # dest defaults to source + if not self.schema2: + self.schema2 = self.schema1 + if not self.table2: + self.table2 = self.table1 + + def tables(self): + '''Generator. Yields schema1, table1, schema2, table2.''' + srcconn = self.pgm.get_conn('src') + try: + srcbrowser = pgbrowser.PgBrowser(srcconn) + if self.args.regex: + if not self.table1: + # all tables from schemas defined by regex + for item in self._iter_schemas_regex(srcbrowser, self.schema1): + yield item + else: + # all tables defined by regex + for item in self._iter_tables_regex(srcbrowser, self.schema1, self.schema2, self.table1): + yield item + else: + if not self.table1: + if not self.schema1: + # all tables from all schemas + for item in self._iter_schemas_regex(srcbrowser, self.schema1): + yield item + else: + # all tables from specified schema + for item in self._iter_tables_regex(srcbrowser, self.schema1, self.schema2, self.table1): + yield item + else: + # one table + yield (self.schema1, self.table1, self.schema2, self.table2) + finally: + self.pgm.put_conn(srcconn, 'src') + + def _iter_schemas_regex(self, browser, regex): + for schema in browser.list_schemas(): + if schema['system']: + continue + schemaname = schema['name'] + if re.match(regex, schemaname): + for item in self._iter_tables_regex(browser, schemaname, schemaname, ''): + yield item + + def _iter_tables_regex(self, browser, schema1, schema2, regex): + for table in browser.list_tables(schema1): + tablename = table['name'] + if re.match(regex, tablename): + yield (schema1, tablename, schema2, tablename) + diff -r bd0beda49bcb -r 98c7809af415 tablecopy.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tablecopy.py Thu May 10 08:42:21 2012 +0200 @@ -0,0 +1,56 @@ +#!/usr/bin/env python3.2 +# +# Copy data from one table to another table with same schema. +# + +import io + +from pgtoolkit import toolbase, pgmanager, pgdatacopy +from pgtoolkit.progresswrapper import ProgressWrapper + + +class TableCopyTool(toolbase.SrcDstTablesTool): + def __init__(self): + toolbase.SrcDstTablesTool.__init__(self, name='tablecopy', desc='Table copy tool.') + + self.parser.add_argument('-n', '--no-action', dest='noaction', action='store_true', + help="Do nothing, just print tables to be copied. Useful in combination with --regex.") + + self.init() + + def main(self): + srcconn = self.pgm.get_conn('src') + dstconn = self.pgm.get_conn('dst') + + dc = pgdatacopy.PgDataCopy(srcconn, dstconn) + + for srcschema, srctable, dstschema, dsttable in self.tables(): + print('Copying [%s] %s.%s --> [%s] %s.%s' % ( + self.args.src, srcschema, srctable, + self.args.dst, dstschema, dsttable)) + + if self.args.noaction: + continue + + dc.set_source(srctable, srcschema) + dc.set_destination(dsttable, dstschema) + + dc.check() + + buf = io.BytesIO() + wrapped = ProgressWrapper(buf) + dc.read(wrapped) + data = buf.getvalue() + buf.close() + + buf = io.BytesIO(data) + wrapped = ProgressWrapper(buf, len(data)) + dc.write(wrapped) + buf.close() + + dc.analyze() + + +tool = TableCopyTool() +tool.main() + diff -r bd0beda49bcb -r 98c7809af415 tablediff.py --- a/tablediff.py Wed Mar 28 17:25:18 2012 +0200 +++ b/tablediff.py Thu May 10 08:42:21 2012 +0200 @@ -8,40 +8,48 @@ # Order is not important. # -from pgtoolkit import pgmanager, pgbrowser, pgdatadiff, toolbase +from pgtoolkit import toolbase, pgmanager, pgdatadiff +from pgtoolkit.highlight import * -class TableDiffTool(toolbase.SrcDstTool): +class TableDiffTool(toolbase.SrcDstTablesTool): def __init__(self): - toolbase.SrcDstTool.__init__(self, name='tablediff', desc='Table diff.') + toolbase.SrcDstTablesTool.__init__(self, name='tablediff', desc='Table diff.') - self.parser.add_argument('srctable', metavar='srctable', - type=str, help='Source table name.') - self.parser.add_argument('--dst-table', dest='dsttable', metavar='dsttable', - type=str, default=None, help='Destination table (default=srctable).') - self.parser.add_argument('-s', '--src-schema', dest='srcschema', metavar='srcschema', - type=str, default='public', help='Schema name (default=public).') - self.parser.add_argument('--dst-schema', dest='dstschema', metavar='dstschema', - type=str, default=None, help='Destination schema name (default=srcschema).') self.parser.add_argument('--sql', action='store_true', help='Output is SQL script.') + self.parser.add_argument('--rowcount', action='store_true', help='Compare number of rows.') self.init() def main(self): - srcschema = self.args.srcschema - dstschema = self.args.dstschema if self.args.dstschema else self.args.srcschema + srcconn = self.pgm.get_conn('src') + dstconn = self.pgm.get_conn('dst') - srctable = self.args.srctable - dsttable = self.args.dsttable if self.args.dsttable else self.args.srctable + dd = pgdatadiff.PgDataDiff(srcconn, dstconn) - dd = pgdatadiff.PgDataDiff(self.pgm.get_conn('src'), self.pgm.get_conn('dst')) - dd.settable1(srctable, srcschema) - dd.settable2(dsttable, dstschema) - - if self.args.sql: - dd.print_patch() - else: - dd.print_diff() + for srcschema, srctable, dstschema, dsttable in self.tables(): + print('Diff from [%s] %s.%s to [%s] %s.%s' % ( + self.args.src, srcschema, srctable, + self.args.dst, dstschema, dsttable)) + + if self.args.rowcount: + with self.pgm.cursor('src') as curs: + curs.execute('''SELECT count(*) FROM "%s"."%s"''' % (srcschema, srctable)) + srccount = curs.fetchone()[0] + with self.pgm.cursor('dst') as curs: + curs.execute('''SELECT count(*) FROM "%s"."%s"''' % (dstschema, dsttable)) + dstcount = curs.fetchone()[0] + if srccount != dstcount: + print(highlight(1, BOLD | YELLOW), "Row count differs: src=%s dst=%s" % (srccount, dstcount), highlight(0), sep='') + continue + + dd.settable1(srctable, srcschema) + dd.settable2(dsttable, dstschema) + + if self.args.sql: + dd.print_patch() + else: + dd.print_diff() tool = TableDiffTool()