Update runquery: Add parameter to read queries from file.
import argparse
import logging
import re
from pycolib.configparser import ConfigParser
from pycolib.coloredformatter import ColoredFormatter
from pycolib.ansicolor import highlight
from pgtoolkit import pgmanager, pgbrowser
class ConnectionInfoNotFound(Exception):
pass
class BadArgsError(Exception):
pass
class ToolBase:
def __init__(self, name, desc, **kwargs):
self.parser = argparse.ArgumentParser(description=desc)
self.parser.add_argument('-d', dest='debug', action='store_true',
help='Debug mode - print database queries.')
self.config = ConfigParser()
self.config.add_option('databases', dict)
self.config.add_option('meta_db')
self.config.add_option('meta_query')
self.pgm = pgmanager.get_instance()
self.target_isolation_level = None
def init(self):
self.config.load('pgtoolkit.conf')
self.args = self.parser.parse_args()
self.init_logging()
def init_logging(self):
# logging
format = ColoredFormatter(highlight(1,7,0)+'%(asctime)s %(levelname)-5s'+highlight(0)+' %(message)s', '%H:%M:%S')
handler = logging.StreamHandler()
handler.setFormatter(format)
handler.setLevel(logging.DEBUG)
self.log = logging.getLogger('main')
self.log.addHandler(handler)
self.log.setLevel(logging.DEBUG)
log_notices = logging.getLogger('pgmanager_notices')
log_notices.addHandler(handler)
log_notices.setLevel(logging.DEBUG)
if self.args.debug:
log_sql = logging.getLogger('pgmanager_sql')
log_sql.addHandler(handler)
log_sql.setLevel(logging.DEBUG)
def prepare_conn_from_metadb(self, name, lookup_name):
'''Create connection in pgmanager using meta DB.
name -- Name for connection in pgmanager.
lookup_name -- Name of connection in meta DB.
'''
with self.pgm.cursor('meta') as curs:
curs.execute(self.config.meta_query, [lookup_name])
row = curs.fetchone_dict()
curs.connection.commit()
if row:
self.pgm.create_conn(name=name,
isolation_level=self.target_isolation_level,
**row)
return True
def prepare_conn_from_config(self, name, lookup_name):
'''Create connection in pgmanager using info in config.databases.'''
if self.config.databases:
if lookup_name in self.config.databases:
dsn = self.config.databases[lookup_name]
self.pgm.create_conn(name=name,
isolation_level=self.target_isolation_level,
dsn=dsn)
return True
def prepare_conns(self, **kwargs):
"""Create connections in PgManager.
Keyword arguments meaning:
key: connection name for use in PgManager
value: connection name in config or meta DB
"""
if self.config.meta_db:
self.pgm.create_conn(name='meta', dsn=self.config.meta_db)
for name in kwargs:
lookup_name = kwargs[name]
found = self.prepare_conn_from_config(name, lookup_name)
if not found and self.config.meta_db:
found = self.prepare_conn_from_metadb(name, lookup_name)
if not found:
raise ConnectionInfoNotFound('Connection name "%s" not found in config nor in meta DB.' % lookup_name)
if self.config.meta_db:
self.pgm.close_conn('meta')
class SimpleTool(ToolBase):
def __init__(self, name, desc, **kwargs):
ToolBase.__init__(self, name, desc, **kwargs)
self.parser.add_argument('target', metavar='target', type=str, help='Target database')
def init(self):
ToolBase.init(self)
self.prepare_conns(target=self.args.target)
class SrcDstTool(ToolBase):
def __init__(self, name, desc, **kwargs):
ToolBase.__init__(self, name, desc, **kwargs)
self.parser.add_argument('src', metavar='source', type=str, help='Source database')
self.parser.add_argument('dst', metavar='destination', type=str, help='Destination database')
if 'allow_reverse' in kwargs and kwargs['allow_reverse']:
self.parser.add_argument('-r', '--reverse', action='store_true', help='Reverse operation. Swap source and destination.')
def init(self):
ToolBase.init(self)
if self.is_reversed():
self.args.src, self.args.dst = self.args.dst, self.args.src
self.prepare_conns(src=self.args.src, dst=self.args.dst)
def is_reversed(self):
return 'reverse' in self.args and self.args.reverse
class SrcDstTablesTool(SrcDstTool):
def __init__(self, name, desc, **kwargs):
SrcDstTool.__init__(self, name, desc, **kwargs)
self.parser.add_argument('-t', '--src-table', metavar='source_table',
dest='srctable', type=str, default='', help='Source table name.')
self.parser.add_argument('-s', '--src-schema', metavar='source_schema',
dest='srcschema', type=str, default='', help='Source schema name (default=public).')
self.parser.add_argument('--dst-table', metavar='destination_table',
dest='dsttable', type=str, default='', help='Destination table name (default=source_table).')
self.parser.add_argument('--dst-schema', metavar='destination_schema',
dest='dstschema', type=str, default='', help='Destination schema name (default=source_schema).')
self.parser.add_argument('--regex', action='store_true', help="Use RE in schema or table name.")
def init(self):
SrcDstTool.init(self)
self.schema1 = self.args.srcschema
self.table1 = self.args.srctable
self.schema2 = self.args.dstschema
self.table2 = self.args.dsttable
# check regex - it applies to source name, dest name must not be specified
# applies to only one - schema or table name
if self.args.regex:
if self.table2 or (self.schema2 and not self.table1):
raise BadArgsError('Cannot specify both --regex and --dst-schema, --dst-table.')
# schema defaults to public
if self.table1 and not self.schema1:
self.schema1 = 'public'
# dest defaults to source
if not self.schema2:
self.schema2 = self.schema1
if not self.table2:
self.table2 = self.table1
# swap src, dst when in reverse mode
if self.is_reversed():
self.schema1, self.schema2 = self.schema2, self.schema1
self.table1, self.table2 = self.table2, self.table1
def tables(self):
'''Generator. Yields schema1, table1, schema2, table2.'''
srcconn = self.pgm.get_conn('src')
try:
srcbrowser = pgbrowser.PgBrowser(srcconn)
if self.args.regex:
if not self.table1:
# all tables from schemas defined by regex
for item in self._iter_schemas_regex(srcbrowser, self.schema1):
yield item
else:
# all tables defined by regex
for item in self._iter_tables_regex(srcbrowser, self.schema1, self.schema2, self.table1):
yield item
else:
if not self.table1:
if not self.schema1:
# all tables from all schemas
for item in self._iter_schemas_regex(srcbrowser, self.schema1):
yield item
else:
# all tables from specified schema
for item in self._iter_tables_regex(srcbrowser, self.schema1, self.schema2, self.table1):
yield item
else:
# one table
yield (self.schema1, self.table1, self.schema2, self.table2)
finally:
self.pgm.put_conn(srcconn, 'src')
def _iter_schemas_regex(self, browser, regex):
for schema in browser.list_schemas():
if schema['system']:
continue
schemaname = schema['name']
if re.match(regex, schemaname):
for item in self._iter_tables_regex(browser, schemaname, schemaname, ''):
yield item
def _iter_tables_regex(self, browser, schema1, schema2, regex):
for table in browser.list_tables(schema1):
tablename = table['name']
if re.match(regex, tablename):
yield (schema1, tablename, schema2, tablename)