Add PgDataCopy. Add TableCopyTool.Add SrcDstTablesTool class to toolbase, use in tablecopy, tablediff.
authorRadek Brich <radek.brich@devl.cz>
Thu, 10 May 2012 08:42:21 +0200
changeset 34 98c7809af415
parent 33 bd0beda49bcb
child 35 e7f79c4a27ce
Add PgDataCopy. Add TableCopyTool.Add SrcDstTablesTool class to toolbase, use in tablecopy, tablediff.
README
pgtoolkit/pgdatacopy.py
pgtoolkit/progresswrapper.py
pgtoolkit/toolbase.py
tablecopy.py
tablediff.py
--- a/README	Wed Mar 28 17:25:18 2012 +0200
+++ b/README	Thu May 10 08:42:21 2012 +0200
@@ -32,3 +32,34 @@
 
 Just use PostgreSQL with Python, it's better option in most use cases.
 
+
+Tools
+=====
+
+tablecopy.py
+------------
+
+./tablecapy.py db1 db2 -n
+  Read all tables and all schema from db1, print table names. Remove -n to copy data to db2.
+
+./tablecopy.py db1 db2 -s myschema
+  Copy all tables in "myschema" from db1 to db2.
+
+./tablecopy.py db1 db2 -s myschema --dst-schema otherschema
+  Copy all tables in "myschema" from db1 to same tables in "otherschema" in db2.
+
+./tablecopy.py db1 db2 -s ^my --regex
+  Copy all tables from all schemas beginning with "my".
+
+./tablecopy.py db1 db2 -s myschema1 --dst-schema myschema2 --regex -t ^my
+  Copy all tables beginning with "my" from myschema1 to tables of same name in myschema2.
+
+Rules:
+  If nothing is specified, everything is copied.
+  If no target schema or table is specified, data are copied to schema or table of same name.
+  If --regex is specified, it applies to table name if specified, schema name otherwise.
+  If both --regex and --src-table is specified, both source and dest schema can be named (no regex applied to them).
+  Default schema name is '', which as regex matches all schemas. Same for table name.
+  Regexes are match with source database, destination database is not checked (table may not exists, will fail when copying).
+  Directly specified table name (no --regex) is not checked (may not exists).
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pgtoolkit/pgdatacopy.py	Thu May 10 08:42:21 2012 +0200
@@ -0,0 +1,114 @@
+# -*- coding: utf-8 -*-
+#
+# PgDataCopy - copy data between tables
+#
+# Copyright (c) 2012  Radek Brich <radek.brich@devl.cz>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+
+import io
+
+
+class TargetNotEmptyError(Exception):
+    
+    pass
+
+
+class PgDataCopy:
+    
+    def __init__(self, conn1, conn2):
+        self.conn1 = conn1
+        self.conn2 = conn2
+        self.fulltable1 = None
+        self.fulltable2 = None
+   
+    def set_source(self, table, schema='public'):
+        self.schema1 = schema
+        self.table1 = table
+        self.fulltable1 = '"' + schema + '"."'+ table + '"'
+    
+    def set_destination(self, table, schema='public'):
+        self.schema2 = schema
+        self.table2 = table
+        self.fulltable2 = '"' + schema + '"."'+ table + '"'
+    
+    def copy(self):
+        self.check()
+        
+        buf = io.StringIO()
+        try:
+            self.read(buf)
+            data = buf.getvalue()
+        finally:
+            buf.close()
+        
+        buf = io.StringIO(data)
+        try:
+            self.write(buf)
+        finally:
+            buf.close()
+        
+        self.analyze()
+    
+    def check(self):
+        '''Check that target table does not contain any data (otherwise cannot copy).'''
+        q = self._compose_check(self.fulltable2)
+        curs = self.conn2.cursor()
+        curs.execute(q)
+        curs.connection.commit()
+        if curs.rowcount > 0:
+            raise TargetNotEmptyError('Target table contains data: %s' % self.fulltable2)
+        self.cols = [desc[0] for desc in curs.description]
+    
+    def read(self, tmpfile):
+        '''Read contents from source table.'''
+        q = self._compose_read(self.fulltable1, self.cols)
+        curs = self.conn1.cursor()
+        curs.copy_expert(q, tmpfile)
+        curs.connection.commit()
+    
+    def write(self, tmpfile):
+        '''Write source table contents to target table.'''
+        q = self._compose_write(self.fulltable2, self.cols)
+        curs = self.conn2.cursor()
+        curs.copy_expert(q, tmpfile)
+        curs.connection.commit()
+    
+    def analyze(self):
+        '''Analyze target table.'''
+        q = self._compose_analyze(self.fulltable2)
+        curs = self.conn2.cursor()
+        curs.execute(q)
+        curs.connection.commit()
+    
+    def _compose_check(self, table):
+        return 'SELECT * FROM %s LIMIT 1' % table
+    
+    def _compose_read(self, table, cols):
+        collist = ', '.join(['"%s"' % col for col in cols])
+        return 'COPY %s (%s) TO STDOUT' % (table, collist)
+    
+    def _compose_write(self, table, cols):
+        collist = ', '.join(['"%s"' % col for col in cols])
+        return 'COPY %s (%s) FROM STDIN' % (table, collist)
+    
+    def _compose_analyze(self, table):
+        return 'ANALYZE %s' % table
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pgtoolkit/progresswrapper.py	Thu May 10 08:42:21 2012 +0200
@@ -0,0 +1,54 @@
+
+import sys
+
+
+class ProgressWrapper:
+    
+    def __init__(self, f, size=0):
+        self.f = f
+        self.size = size
+        self.readsize = 0
+        self.cycles = 0
+        self.print_cycles = 200
+
+    def humanize(self, bytes):
+        if bytes > 1024**3:
+            return '%.1fG' % (bytes / 1024.**3)
+        if bytes > 1024**2:
+            return '%.1fM' % (bytes / 1024.**2)
+        if bytes > 1024:
+            return '%.1fk' % (bytes / 1024.)
+        return '%d' % bytes
+
+    def write(self, data):
+        self.size += len(data)
+        if self.cycles == 0:
+            print('  read %s      \r' % self.humanize(self.size), end='')
+            sys.stdout.flush()
+            self.cycles = self.print_cycles * 200
+        else:
+            self.cycles -= 1
+        return self.f.write(data)
+
+    def read(self, size):
+        self.readsize += size
+        if self.cycles == 0:
+            if self.size > 0:
+                percent = self.readsize * 100. / self.size
+            else:
+                percent = 100
+            if percent > 100:
+                percent = 100
+            print('  written %s / %s (%.1f%%)      \r' % (
+                self.humanize(self.readsize),
+                self.humanize(self.size),
+                percent), end='')
+            sys.stdout.flush()
+            self.cycles = self.print_cycles
+        else:
+            self.cycles -= 1
+        return self.f.read(size)
+
+    def close(self):
+        self.f.close()
+
--- a/pgtoolkit/toolbase.py	Wed Mar 28 17:25:18 2012 +0200
+++ b/pgtoolkit/toolbase.py	Thu May 10 08:42:21 2012 +0200
@@ -1,7 +1,8 @@
 import argparse
 import logging
+import re
 
-from pgtoolkit import pgmanager, config
+from pgtoolkit import pgmanager, pgbrowser, config
 from pgtoolkit.coloredformatter import ColoredFormatter
 from pgtoolkit.highlight import highlight
 
@@ -10,6 +11,10 @@
     pass
 
 
+class BadArgsError(Exception):    
+    pass
+
+
 class ToolBase:
     def __init__(self, name, desc):
         self.parser = argparse.ArgumentParser(description=desc)
@@ -110,3 +115,84 @@
         ToolBase.init(self)
         self.prepare_conns_from_cmdline_args('src', 'dst')
 
+
+class SrcDstTablesTool(SrcDstTool):
+    def __init__(self, name, desc):
+        SrcDstTool.__init__(self, name, desc)
+        self.parser.add_argument('-t', '--src-table', metavar='source_table',
+            dest='srctable', type=str, default='', help='Source table name.')
+        self.parser.add_argument('-s', '--src-schema', metavar='source_schema',
+            dest='srcschema', type=str, default='', help='Source schema name (default=public).')
+        self.parser.add_argument('--dst-table', metavar='destination_table',
+            dest='dsttable', type=str, default='', help='Destination table name (default=source_table).')
+        self.parser.add_argument('--dst-schema', metavar='destination_schema',
+            dest='dstschema', type=str, default='', help='Destination schema name (default=source_schema).')
+        self.parser.add_argument('--regex', action='store_true', help="Use RE in schema or table name.")
+    
+    def init(self):
+        SrcDstTool.init(self)
+        
+        self.schema1 = self.args.srcschema
+        self.table1 = self.args.srctable
+        self.schema2 = self.args.dstschema
+        self.table2 = self.args.dsttable
+        
+        # check regex - it applies to source name, dest name must not be specified
+        # applies to only one - schema or table name
+        if self.args.regex:
+            if self.table2 or (self.schema2 and not self.table1):
+                raise BadArgsError('Cannot specify both --regex and --dst-schema, --dst-table.')
+        # schema defaults to public
+        if self.table1 and not self.schema1:
+            self.schema1 = 'public'
+        # dest defaults to source
+        if not self.schema2:
+            self.schema2 = self.schema1
+        if not self.table2:
+            self.table2 = self.table1
+
+    def tables(self):
+        '''Generator. Yields schema1, table1, schema2, table2.'''
+        srcconn = self.pgm.get_conn('src')
+        try:
+            srcbrowser = pgbrowser.PgBrowser(srcconn)
+            if self.args.regex:
+                if not self.table1:
+                    # all tables from schemas defined by regex
+                    for item in self._iter_schemas_regex(srcbrowser, self.schema1):
+                        yield item
+                else:
+                    # all tables defined by regex
+                    for item in self._iter_tables_regex(srcbrowser, self.schema1, self.schema2, self.table1):
+                        yield item
+            else:
+                if not self.table1:
+                    if not self.schema1:
+                        # all tables from all schemas
+                        for item in self._iter_schemas_regex(srcbrowser, self.schema1):
+                            yield item
+                    else:
+                        # all tables from specified schema
+                        for item in self._iter_tables_regex(srcbrowser, self.schema1, self.schema2, self.table1):
+                            yield item
+                else:
+                    # one table
+                    yield (self.schema1, self.table1, self.schema2, self.table2)
+        finally:
+            self.pgm.put_conn(srcconn, 'src')
+    
+    def _iter_schemas_regex(self, browser, regex):
+        for schema in browser.list_schemas():
+            if schema['system']:
+                continue
+            schemaname = schema['name']
+            if re.match(regex, schemaname):
+                for item in self._iter_tables_regex(browser, schemaname, schemaname, ''):
+                    yield item
+    
+    def _iter_tables_regex(self, browser, schema1, schema2, regex):
+        for table in browser.list_tables(schema1):
+            tablename = table['name']
+            if re.match(regex, tablename):
+                yield (schema1, tablename, schema2, tablename)
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tablecopy.py	Thu May 10 08:42:21 2012 +0200
@@ -0,0 +1,56 @@
+#!/usr/bin/env python3.2
+#
+# Copy data from one table to another table with same schema.
+#
+
+import io
+
+from pgtoolkit import toolbase, pgmanager, pgdatacopy
+from pgtoolkit.progresswrapper import ProgressWrapper
+
+
+class TableCopyTool(toolbase.SrcDstTablesTool):
+    def __init__(self):
+        toolbase.SrcDstTablesTool.__init__(self, name='tablecopy', desc='Table copy tool.')
+        
+        self.parser.add_argument('-n', '--no-action', dest='noaction', action='store_true',
+            help="Do nothing, just print tables to be copied. Useful in combination with --regex.")
+        
+        self.init()
+
+    def main(self):
+        srcconn = self.pgm.get_conn('src')
+        dstconn = self.pgm.get_conn('dst')
+        
+        dc = pgdatacopy.PgDataCopy(srcconn, dstconn)
+        
+        for srcschema, srctable, dstschema, dsttable in self.tables():
+            print('Copying [%s] %s.%s --> [%s] %s.%s' % (
+                self.args.src, srcschema, srctable,
+                self.args.dst, dstschema, dsttable))
+            
+            if self.args.noaction:
+                continue
+            
+            dc.set_source(srctable, srcschema)
+            dc.set_destination(dsttable, dstschema)
+        
+            dc.check()
+        
+            buf = io.BytesIO()
+            wrapped = ProgressWrapper(buf)
+            dc.read(wrapped)
+            data = buf.getvalue()
+            buf.close()
+            
+            buf = io.BytesIO(data)
+            wrapped = ProgressWrapper(buf, len(data))
+            dc.write(wrapped)
+            buf.close()
+        
+            dc.analyze()
+
+
+tool = TableCopyTool()
+tool.main()
+
--- a/tablediff.py	Wed Mar 28 17:25:18 2012 +0200
+++ b/tablediff.py	Thu May 10 08:42:21 2012 +0200
@@ -8,40 +8,48 @@
 #    Order is not important.
 #
 
-from pgtoolkit import pgmanager, pgbrowser, pgdatadiff, toolbase
+from pgtoolkit import toolbase, pgmanager, pgdatadiff
+from pgtoolkit.highlight import *
 
 
-class TableDiffTool(toolbase.SrcDstTool):
+class TableDiffTool(toolbase.SrcDstTablesTool):
     def __init__(self):
-        toolbase.SrcDstTool.__init__(self, name='tablediff', desc='Table diff.')
+        toolbase.SrcDstTablesTool.__init__(self, name='tablediff', desc='Table diff.')
         
-        self.parser.add_argument('srctable', metavar='srctable',
-            type=str, help='Source table name.')
-        self.parser.add_argument('--dst-table', dest='dsttable', metavar='dsttable',
-            type=str, default=None, help='Destination table (default=srctable).')
-        self.parser.add_argument('-s', '--src-schema', dest='srcschema', metavar='srcschema',
-            type=str, default='public', help='Schema name (default=public).')
-        self.parser.add_argument('--dst-schema', dest='dstschema', metavar='dstschema',
-            type=str, default=None, help='Destination schema name (default=srcschema).')
         self.parser.add_argument('--sql', action='store_true', help='Output is SQL script.')
+        self.parser.add_argument('--rowcount', action='store_true', help='Compare number of rows.')
         
         self.init()
 
     def main(self):
-        srcschema = self.args.srcschema
-        dstschema = self.args.dstschema if self.args.dstschema else self.args.srcschema
+        srcconn = self.pgm.get_conn('src')
+        dstconn = self.pgm.get_conn('dst')
         
-        srctable = self.args.srctable
-        dsttable = self.args.dsttable if self.args.dsttable else self.args.srctable
+        dd = pgdatadiff.PgDataDiff(srcconn, dstconn)
         
-        dd = pgdatadiff.PgDataDiff(self.pgm.get_conn('src'), self.pgm.get_conn('dst'))
-        dd.settable1(srctable, srcschema)
-        dd.settable2(dsttable, dstschema)
-        
-        if self.args.sql:
-            dd.print_patch()
-        else:
-            dd.print_diff()
+        for srcschema, srctable, dstschema, dsttable in self.tables():
+            print('Diff from [%s] %s.%s to [%s] %s.%s' % (
+                self.args.src, srcschema, srctable,
+                self.args.dst, dstschema, dsttable))
+            
+            if self.args.rowcount:
+                with self.pgm.cursor('src') as curs:
+                    curs.execute('''SELECT count(*) FROM "%s"."%s"''' % (srcschema, srctable))
+                    srccount = curs.fetchone()[0]
+                with self.pgm.cursor('dst') as curs:
+                    curs.execute('''SELECT count(*) FROM "%s"."%s"''' % (dstschema, dsttable))
+                    dstcount = curs.fetchone()[0]
+                if srccount != dstcount:
+                    print(highlight(1, BOLD | YELLOW), "Row count differs: src=%s dst=%s" % (srccount, dstcount), highlight(0), sep='')
+                continue
+            
+            dd.settable1(srctable, srcschema)
+            dd.settable2(dsttable, dstschema)
+            
+            if self.args.sql:
+                dd.print_patch()
+            else:
+                dd.print_diff()
 
 
 tool = TableDiffTool()