Refactor ToolBase to allow tool composition. Add TableSync tool (composited). Move more tools under pgtool.
#!/usr/bin/env python3
#
# Copy data between tables with same table schema.
#
# Copies full table, target table must be empty.
# Can copy multiple tables in one run.
# Sorts the tables according to references.
#
# This may be used instead of dump/restore.
#
import io
from pgtoolkit import toolbase, pgmanager, pgbrowser, pgdatacopy
from pgtoolkit.progresswrapper import ProgressWrapper
class TableCopyTool(toolbase.SrcDstTablesTool):
def __init__(self):
toolbase.SrcDstTablesTool.__init__(self, name='tablecopy', desc='Table copy tool.')
self.parser.add_argument('-n', '--no-action', dest='noaction', action='store_true',
help="Do nothing, just print tables to be copied. Useful in combination with --regex.")
self.parser.add_argument('--no-sort', dest='nosort', action='store_true',
help="Do not sort. By default, tables are sorted by foreign key references.")
self.parser.add_argument('--disable-triggers', dest='notriggers', action='store_true',
help="Disable all triggers for the session.")
self.init()
def main(self):
self.srcconn = self.pgm.get_conn('src')
self.dstconn = self.pgm.get_conn('dst')
if self.args.notriggers:
curs = self.dstconn.cursor()
curs.execute('SET session_replication_role = replica;')
curs.close()
self.dstconn.commit()
dc = pgdatacopy.PgDataCopy(self.srcconn, self.dstconn)
if self.args.nosort:
for table in self.tables():
self.copy_table(dc, *table)
else:
# sort tables with respect to references
details = dict()
pending = set()
references = dict()
# build list of all table to be copied (pending) and references map
for table in self.tables():
srcschema, srctable, dstschema, dsttable = table
name = dstschema + '.' + dsttable
details[name] = table
pending.add(name)
references[name] = self.get_references(dstschema, dsttable)
# copy files with fulfilled references, repeat until all done
while pending:
for name in list(pending):
ok = True
for ref in references[name]:
if ref in pending:
ok = False
if ok:
table = details[name]
self.copy_table(dc, *table)
pending.remove(name)
def get_references(self, schema, table):
browser = pgbrowser.PgBrowser(self.dstconn)
cons = browser.list_constraints(table, schema)
return [con['fschema'] + '.' + con['fname'] for con in cons if con['type'] == 'f']
def copy_table(self, dc, srcschema, srctable, dstschema, dsttable):
print('Copying [%s] %s.%s --> [%s] %s.%s' % (
self.args.src, srcschema, srctable,
self.args.dst, dstschema, dsttable))
if self.args.noaction:
return
dc.set_source(srctable, srcschema)
dc.set_destination(dsttable, dstschema)
try:
dc.check()
except pgdatacopy.TargetNotEmptyError as e:
print(' - error:', str(e))
return
print(' - read ')
buf = io.BytesIO()
wrapped = ProgressWrapper(buf)
dc.read(wrapped)
data = buf.getvalue()
buf.close()
print(' - write ')
buf = io.BytesIO(data)
wrapped = ProgressWrapper(buf, len(data))
dc.write(wrapped)
buf.close()
print(' - analyze ')
dc.analyze()
tool = TableCopyTool()
tool.main()