# HG changeset patch # User Radek Brich # Date 1399394263 -7200 # Node ID 2a2d0d5df03b6683483c961670e7d81d8f34c976 # Parent d6088dba8fea81edb7add7a5caedefa19fb9df5f Refactor ToolBase to allow tool composition. Add TableSync tool (composited). Move more tools under pgtool. diff -r d6088dba8fea -r 2a2d0d5df03b analyzeall.py --- a/analyzeall.py Tue May 06 18:37:41 2014 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,51 +0,0 @@ -#!/usr/bin/env python3 -""" -analyzeall - -Analyze/vacuum all tables in selected schemas. -See also "VACUUM ANALYZE VERBOSE;" query. -Unlike that, this program skips pg_catalog etc. - -""" - -from pgtoolkit import pgbrowser, toolbase - - -class AnalyzeAllTool(toolbase.SimpleTool): - def __init__(self): - toolbase.SimpleTool.__init__(self, name='analyzeall', desc='Analyze all tables.') - self.parser.add_argument('-s', dest='schema', nargs='*', help='Schema filter') - self.parser.add_argument('--vacuum', action='store_true', help='Call VACUUM ANALYZE') - self.parser.add_argument('--vacuum-full', action='store_true', help='Call VACUUM FULL ANALYZE') - self.parser.add_argument('--reindex', action='store_true', help='Call REINDEX TABLE') - self.target_isolation_level = 'autocommit' - self.init() - - def main(self): - browser = pgbrowser.PgBrowser(self.pgm.get_conn('target')) - - query_patterns = ['ANALYZE %s.%s;'] - if self.args.vacuum: - query_patterns = ['VACUUM ANALYZE %s.%s;'] - if self.args.vacuum_full: - query_patterns = ['VACUUM FULL ANALYZE %s.%s;'] - if self.args.reindex: - query_patterns += ['REINDEX TABLE %s.%s;'] - - schema_list = self.args.schema - if not schema_list: - schema_list = [schema['name'] for schema in browser.list_schemas() if not schema['system']] - - for schema in schema_list: - tables = browser.list_tables(schema=schema) - with self.pgm.cursor('target') as curs: - for table in tables: - for query_pattern in query_patterns: - query = query_pattern % (schema, table['name']) - self.log.info(query) - curs.execute(query, []) - - -tool = AnalyzeAllTool() -tool.main() - diff -r d6088dba8fea -r 2a2d0d5df03b batchcopy.py --- a/batchcopy.py Tue May 06 18:37:41 2014 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,58 +0,0 @@ -#!/usr/bin/env python3 - -from pgtoolkit import toolbase -from pgtoolkit.pgmanager import IntegrityError - - -class BatchCopyTool(toolbase.SrcDstTablesTool): - def __init__(self): - toolbase.SrcDstTablesTool.__init__(self, name='batchcopy', desc='Copy data from one table to another.') - - self.parser.add_argument('--table-name', type=str, help='Table to be copied.') - self.parser.add_argument('--src-filter', type=str, help='WHERE condition for source query.') - self.parser.add_argument('--file-with-ids', type=str, help='Read source IDs from file (each ID on new line). Use these in --src-filter as {ids}') - self.parser.add_argument('--dst-exists', choices=['rollback', 'ignore', 'update'], default='rollback', help='What to do when destination record already exists.') - - self.init() - - def main(self): - # read list of IDs from file - ids = '' - if self.args.file_with_ids: - with open(self.args.file_with_ids, 'r') as f: - ids = ','.join(ln.rstrip() for ln in f.readlines()) - - # read source data - with self.pgm.cursor('src') as src_curs: - condition = self.args.src_filter.format(ids=ids) or 'true' - src_curs.execute('SELECT * FROM {} WHERE {}'.format(self.args.table_name, condition)) - #TODO: ORDER BY id OFFSET 0 LIMIT 100 - data = src_curs.fetchall_dict() - src_curs.connection.commit() - - with self.pgm.cursor('dst') as dst_curs: - copied = 0 - for row in data: - keys = ', '.join(row.keys()) - values_mask = ', '.join(['%s'] * len(row)) - query = 'INSERT INTO {} ({}) VALUES ({})'.format(self.args.table_name, keys, values_mask) - try: - dst_curs.execute('SAVEPOINT the_query;') - dst_curs.execute(query, list(row.values())) - dst_curs.execute('RELEASE SAVEPOINT the_query;') - copied += 1 - except IntegrityError: - if self.args.dst_exists == 'rollback': - dst_curs.connection.rollback() - break - elif self.args.dst_exists == 'ignore': - dst_curs.execute('ROLLBACK TO SAVEPOINT the_query;') - elif self.args.dst_exists == 'update': - raise NotImplementedError() - dst_curs.connection.commit() - - self.log.info('Copied %s rows.', copied) - - -tool = BatchCopyTool() -tool.main() diff -r d6088dba8fea -r 2a2d0d5df03b batchquery.py --- a/batchquery.py Tue May 06 18:37:41 2014 +0200 +++ b/batchquery.py Tue May 06 18:37:43 2014 +0200 @@ -12,7 +12,6 @@ self.parser.add_argument('--output', dest='output', type=str, help='File name for results.') self.parser.add_argument('--outputfunc', dest='outputfunc', type=str, help='Python function which will format results (format_row(args, rows)).') self.parser.add_argument('--header', dest='header', action='store_true', help='First line of CSV is header with names for columns. These name can be used in query.') - self.init() def _split_line(self, line): return [x.strip() for x in line.split(',')] @@ -65,5 +64,6 @@ tool = BatchQueryTool() +tool.setup() tool.main() diff -r d6088dba8fea -r 2a2d0d5df03b bigtables.py --- a/bigtables.py Tue May 06 18:37:41 2014 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,49 +0,0 @@ -#!/usr/bin/env python3 - -from pgtoolkit import pgbrowser, toolbase -from pycolib.prettysize import prettysize_short -from pycolib.ansicolor import highlight - - -class BigTablesTool(toolbase.SimpleTool): - def __init__(self): - toolbase.SimpleTool.__init__(self, name='bigtables', desc='List largest tables.') - self.parser.add_argument('-n', '--limit', metavar='NUM', dest='limit', type=int, default=5, help='Show NUM biggest tables.') - self.parser.add_argument('-v', '--details', dest='details', action='store_true', help='Show sizes of data and individual indexes.') - self.init() - - def main(self): - browser = pgbrowser.PgBrowser(self.pgm.get_conn('target')) - - # scan all tables from all shemas, remember names and sizes - all_tables = [] - all_indexes = [] - schemas = browser.list_schemas() - for schema in schemas: - tables = browser.list_tables(schema['name']) - for table in tables: - table_name = '%s.%s' % (schema['name'], table['name']) - indexes = browser.list_indexes(table['name'], schema['name']) - for index in indexes: - all_indexes.append({'name': index['name'], 'table': table_name, 'size': index['size']}) - all_tables.append({'name': table_name, 'size': table['size'], 'indexes': indexes}) - - # print names and sizes of 20 largest tables - for table in sorted(all_tables, reverse=True, key=lambda x: x['size'])[:self.args.limit]: - size_of_indexes = sum(index['size'] for index in table['indexes']) - print(highlight(1) + prettysize_short(table['size'] + size_of_indexes, trailing_zeros=True).rjust(8) + highlight(0), - '(total)'.ljust(8), - highlight(1) + table['name'] + highlight(0), sep=' ') - if self.args.details: - print(prettysize_short(table['size'], trailing_zeros=True).rjust(8), - '(data)'.ljust(8), sep=' ') - for index in sorted(table['indexes'], reverse=True, key=lambda x: x['size']): - print(prettysize_short(index['size'], trailing_zeros=True).rjust(8), - '(index)'.ljust(8), index['name'], sep=' ') - print() - - -if __name__ == '__main__': - tool = BigTablesTool() - tool.main() - diff -r d6088dba8fea -r 2a2d0d5df03b listdepends.py --- a/listdepends.py Tue May 06 18:37:41 2014 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,25 +0,0 @@ -#!/usr/bin/env python3 - -from pgtoolkit import pgbrowser, toolbase - - -class ListDependsTool(toolbase.SimpleTool): - def __init__(self): - toolbase.SimpleTool.__init__(self, name='listdepends', desc='List column dependencies.') - self.parser.add_argument('table', metavar='table', type=str, help='Table name.') - self.parser.add_argument('column', metavar='column', type=str, help='Column name.') - self.parser.add_argument('-s', '--schema', dest='schema', metavar='schema', - type=str, default='public', help='Schema name (default=public).') - self.init() - - def main(self): - browser = pgbrowser.PgBrowser(self.pgm.get_conn('target')) - - objects = browser.list_column_usage(self.args.table, self.args.column, schema=self.args.schema) - for obj in sorted(objects, key=lambda x: (x['type'], x['schema'], x['name'])): - print(obj['type'], ' ', obj['schema'], '.', obj['name'], sep='') - - -tool = ListDependsTool() -tool.main() - diff -r d6088dba8fea -r 2a2d0d5df03b listserial.py --- a/listserial.py Tue May 06 18:37:41 2014 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,59 +0,0 @@ -#!/usr/bin/env python3 - -from pgtoolkit import toolbase, pgbrowser -from pycolib.ansicolor import highlight, WHITE, YELLOW, RED, BOLD - - -class ListSerialTool(toolbase.SimpleTool): - max_int = 2147483647 - - def __init__(self): - toolbase.SimpleTool.__init__(self, name='listserial', desc='List sequences attached to column of type integer with dangerous last_value.') - self.init() - - def main(self): - conn = self.pgm.get_conn('target') - browser = pgbrowser.PgBrowser(conn) - rows = browser.list_sequences() - sequences = [] - for row in rows: - if row['related_column_type'] == 'integer': - # read sequence attributes like last_value - q = 'SELECT * FROM "%s"."%s"' % (row['sequence_schema'], row['sequence_name']) - curs = conn.cursor() - curs.execute(q) - attrs = curs.fetchone_dict() - # skip this sequence if its cycled and has safe max_value - if attrs['is_cycled'] and attrs['max_value'] <= self.max_int: - continue - # skip sequences with last_value not yet in half of max_int - if attrs['last_value'] < self.max_int / 2: - continue - # remember rest of sequences - row['attrs'] = attrs - sequences.append(row) - # sort most dangerous on top - sequences.sort(key=lambda x: x['attrs']['last_value'], reverse=True) - # print out what we've found - for seq in sequences: - print('Sequence:', seq['sequence_schema'] + '.' + seq['sequence_name']) - print(' Related:', seq['sequence_schema'] + '.' + seq['related_table'], seq['related_column'], '(' + seq['related_column_type'] + ')') - print(' integer max', '2147483647') - # colorize last value - last_val = seq['attrs']['last_value'] - col = WHITE + BOLD - if last_val > self.max_int * 0.9: - # near max - col = YELLOW + BOLD - if last_val > self.max_int: - # over max - col = RED + BOLD - print(' last_value', highlight(1, col) + str(last_val) + highlight(0)) - for key in ('min_value', 'max_value', 'is_cycled'): - print(' ', key, seq['attrs'][key]) - print() - - -tool = ListSerialTool() -tool.main() - diff -r d6088dba8fea -r 2a2d0d5df03b listtables.py --- a/listtables.py Tue May 06 18:37:41 2014 +0200 +++ b/listtables.py Tue May 06 18:37:43 2014 +0200 @@ -7,7 +7,6 @@ def __init__(self): toolbase.SimpleTool.__init__(self, name='listtables', desc='List tables in database.') self.parser.add_argument('-o', dest='options', type=str, nargs='*', help='Filter by options (eg. -o autovacuum_enabled=false).') - self.init() def main(self): browser = pgbrowser.PgBrowser(self.pgm.get_conn('target')) @@ -31,5 +30,6 @@ tool = ListTablesTool() +tool.setup() tool.main() diff -r d6088dba8fea -r 2a2d0d5df03b longqueries.py --- a/longqueries.py Tue May 06 18:37:41 2014 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,29 +0,0 @@ -#!/usr/bin/env python3 - -from pgtoolkit import pgstats, toolbase -from pycolib.ansicolor import highlight, YELLOW, BOLD - - -class LongQueriesTool(toolbase.SimpleTool): - def __init__(self): - toolbase.SimpleTool.__init__(self, name='longqueries', desc='List long queries.') - self.parser.add_argument('--age', default='1m', help='How long must be the query running to be listed.') - self.init() - - def main(self): - stats = pgstats.PgStats(self.pgm.get_conn('target')) - - for ln in stats.list_long_queries(self.args.age): - print(highlight(1), - 'backend PID: ', ln['procpid'], - ', query_start: ', ln['query_start'].strftime('%F %T'), - ', client IP: ', ln['client_addr'], - ln['waiting'] and ', ' + highlight(1, YELLOW|BOLD) + 'waiting' or '', - highlight(0), sep='') - print(ln['query']) - print() - - -tool = LongQueriesTool() -tool.main() - diff -r d6088dba8fea -r 2a2d0d5df03b loopquery.py --- a/loopquery.py Tue May 06 18:37:41 2014 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,98 +0,0 @@ -#!/usr/bin/env python3 -""" -loopquery - -Execute some queries in loop. Execute interval can be set. - -""" - -from pgtoolkit import toolbase - -import logging.handlers -import time -from datetime import datetime, timedelta - - -class LoopQueryTool(toolbase.ToolBase): - def __init__(self): - toolbase.ToolBase.__init__(self, name='loopquery', desc='Run query in loop.') - self.parser.add_argument('target', nargs='?', metavar='target', type=str, help='Target database') - self.parser.add_argument('-c', dest='config', type=str, help='Additional config file (besides pgtoolkit.conf).') - self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.') - self.parser.add_argument('--mins', dest='delay_mins', type=int, help='Delay between queries in minutes.') - self.parser.add_argument('--secs', dest='delay_secs', type=int, help='Delay between queries in seconds.') - - self.config.add_option('target', type=str, default=None) - self.config.add_option('queries', type=list, default=[]) - self.config.add_option('delay_mins', type=int, default=0) - self.config.add_option('delay_secs', type=int, default=0) - self.config.add_option('log_path', type=str) - - self.target_isolation_level = 'autocommit' - - self.init() - - def init(self): - toolbase.ToolBase.init(self) - if self.args.config: - self.config.load(self.args.config) - self.queries = self.args.queries or self.config.queries - self.delay_mins = self.args.delay_mins or self.config.delay_mins - self.delay_secs = self.args.delay_secs or self.config.delay_secs - if self.config.log_path: - self.init_file_logs(self.config.log_path) - self.prepare_conns(target = self.args.target or self.config.target) - - def init_file_logs(self, path): - format = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', '%y-%m-%d %H:%M:%S') - handler = logging.handlers.TimedRotatingFileHandler(path+'/main.log', when='midnight', backupCount=5) - handler.setFormatter(format) - handler.setLevel(logging.DEBUG) - logging.getLogger('main').addHandler(handler) - - format = logging.Formatter('%(asctime)s %(message)s', '%y-%m-%d %H:%M:%S') - handler = logging.handlers.TimedRotatingFileHandler(path+'/pgnotices.log', when='midnight', backupCount=5) - handler.setFormatter(format) - handler.setLevel(logging.DEBUG) - logging.getLogger('pgmanager_notices').addHandler(handler) - - def main(self): - self.reset() - while True: - self.wait() - self.action() - - def reset(self): - """Check current time, set next action time.""" - dt = datetime.today() - dt = dt.replace(microsecond = 0) - self.next_action_time = dt + timedelta(minutes = self.delay_mins, - seconds = self.delay_secs) - - def wait(self): - """Wait for action time, compute next action time.""" - now = datetime.today() - self.log.debug('Next run %s', self.next_action_time) - if self.next_action_time > now: - td = self.next_action_time - now - self.log.debug('Sleep %ds', td.seconds + td.microseconds/1e6) - time.sleep(td.seconds + td.microseconds/1e6) - self.next_action_time += timedelta(minutes = self.delay_mins, - seconds = self.delay_secs) - # in case that action took too long and next planned time would - # be in past -> reset planner - if self.next_action_time < now: - self.reset() - - def action(self): - """Execute the queries.""" - for q in self.queries: - self.log.info('%s', q) - with self.pgm.cursor('target') as curs: - curs.execute(q) - self.log.info('Done') - - -tool = LoopQueryTool() -tool.main() - diff -r d6088dba8fea -r 2a2d0d5df03b pgtool --- a/pgtool Tue May 06 18:37:41 2014 +0200 +++ b/pgtool Tue May 06 18:37:43 2014 +0200 @@ -24,21 +24,35 @@ from importlib import import_module -if len(sys.argv) < 2: - print(__doc__, end='') - sys.exit() +def print_tool_with_short_desc(name): + module = import_module('pgtoolkit.tools.' + tool) + short_desc = module.cls.__doc__.lstrip().splitlines()[0] + print(name.ljust(15), '-', short_desc) -if sys.argv[1] == '--list': - for tool in pgtoolkit.tools.__all__: - print(tool) - sys.exit() + +if __name__ == '__main__': + if len(sys.argv) < 2: + print(__doc__, end='') + sys.exit() -tool = sys.argv[1] -tool_args = sys.argv[2:] + if sys.argv[1].startswith('--'): + if sys.argv[1] == '--list': + for tool in pgtoolkit.tools.__all__: + print_tool_with_short_desc(tool) + else: + print(__doc__, end='') + sys.exit() -module = import_module('pgtoolkit.tools.' + tool) + tool = sys.argv[1] + tool_args = sys.argv[2:] -tool = module.cls() -tool.init(tool_args) -tool.main() + if tool not in pgtoolkit.tools.__all__: + print('Unknown tool "%s".\n\nCall "pgtool --list" to get list of all available tools.' % tool) + sys.exit() + module = import_module('pgtoolkit.tools.' + tool) + + tool = module.cls() + tool.setup(tool_args) + tool.main() + diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/pgdatadiff.py --- a/pgtoolkit/pgdatadiff.py Tue May 06 18:37:41 2014 +0200 +++ b/pgtoolkit/pgdatadiff.py Tue May 06 18:37:43 2014 +0200 @@ -28,6 +28,8 @@ from pgtoolkit import pgbrowser from pycolib.ansicolor import * +import sys + class DiffData: COLORS = { @@ -38,14 +40,14 @@ 'K' : BOLD | BLUE} def __init__(self, change, cols1, cols2, key=None): - ''' + """ change - one of '+', '-', '*' (add, remove, update) cols1 - original column values (OrderedDict) cols2 - new column values (OrderedDict) key - primary key columns (OrderedDict) - ''' + """ self.change = change self.cols1 = cols1 self.cols2 = cols2 @@ -155,11 +157,11 @@ self.fulltable2 = '"' + schema + '"."'+ table + '"' def iter_diff(self): - '''Return differencies between data of two tables. + """Return differencies between data of two tables. Yields one line at the time. - ''' + """ curs1, curs2 = self._select() row1 = curs1.fetchone_dict() @@ -186,25 +188,25 @@ curs1.close() curs2.close() - def print_diff(self): - '''Print differencies between data of two tables. + def print_diff(self, file=sys.stdout): + """Print differencies between data of two tables. The output is in human readable form. Set allowcolor=True of PgDataDiff instance to get colored output. - ''' + """ for ln in self.iter_diff(): - print(ln.format()) + print(ln.format(), file=file) - def print_patch(self): - '''Print SQL script usable as patch for destination table. + def print_patch(self, file=sys.stdout): + """Print SQL script usable as patch for destination table. Supports INSERT, DELETE and UPDATE operations. - ''' + """ for ln in self.iter_diff(): - print(ln.format_patch(self.fulltable1)) + print(ln.format_patch(self.fulltable1), file=file) def _select(self): browser = pgbrowser.PgBrowser(self.conn1) diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/pgmanager.py --- a/pgtoolkit/pgmanager.py Tue May 06 18:37:41 2014 +0200 +++ b/pgtoolkit/pgmanager.py Tue May 06 18:37:43 2014 +0200 @@ -334,6 +334,9 @@ del self.conn_known[name] del self.conn_pool[name] + def knows_conn(self, name='default'): + return name in self.conn_known + def get_conn(self, name='default'): '''Get connection of name 'name' from pool.''' self._check_fork() diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/toolbase.py --- a/pgtoolkit/toolbase.py Tue May 06 18:37:41 2014 +0200 +++ b/pgtoolkit/toolbase.py Tue May 06 18:37:43 2014 +0200 @@ -26,24 +26,29 @@ class ToolBase: - def __init__(self, name, desc, **kwargs): - self.parser = argparse.ArgumentParser(prog=name, description=desc, + + def __init__(self, name, desc=None, **kwargs): + self.config = ConfigParser() + self.parser = argparse.ArgumentParser(prog=name, description=desc or self.__doc__, formatter_class=ToolDescriptionFormatter) - self.parser.add_argument('-d', dest='debug', action='store_true', - help='Debug mode - print database queries.') + self.pgm = pgmanager.get_instance() + self.target_isolation_level = None - self.config = ConfigParser() + def setup(self, args=None): + self.specify_args() + self.load_args(args) + self.init_logging() + + def specify_args(self): self.config.add_option('databases', dict) self.config.add_option('meta_db') self.config.add_option('meta_query') - - self.pgm = pgmanager.get_instance() - self.target_isolation_level = None + self.parser.add_argument('-Q', dest='queries', action='store_true', + help='Print database queries.') - def init(self, args=None): - self.config.load('pgtoolkit.conf') + def load_args(self, args=None, config_file=None): + self.config.load(config_file or 'pgtoolkit.conf') self.args = self.parser.parse_args(args) - self.init_logging() def init_logging(self): # logging @@ -59,18 +64,20 @@ log_notices.addHandler(handler) log_notices.setLevel(logging.DEBUG) - if self.args.debug: + if self.args.queries: log_sql = logging.getLogger('pgmanager_sql') log_sql.addHandler(handler) log_sql.setLevel(logging.DEBUG) def prepare_conn_from_metadb(self, name, lookup_name): - '''Create connection in pgmanager using meta DB. + """Create connection in pgmanager using meta DB. name -- Name for connection in pgmanager. lookup_name -- Name of connection in meta DB. - ''' + """ + if not self.pgm.knows_conn('meta'): + self.pgm.create_conn(name='meta', dsn=self.config.meta_db) with self.pgm.cursor('meta') as curs: curs.execute(self.config.meta_query, [lookup_name]) row = curs.fetchone_dict() @@ -80,9 +87,10 @@ isolation_level=self.target_isolation_level, **row) return True + self.pgm.close_conn('meta') def prepare_conn_from_config(self, name, lookup_name): - '''Create connection in pgmanager using info in config.databases.''' + """Create connection in pgmanager using info in config.databases.""" if self.config.databases: if lookup_name in self.config.databases: dsn = self.config.databases[lookup_name] @@ -99,9 +107,6 @@ value: connection name in config or meta DB """ - if self.config.meta_db: - self.pgm.create_conn(name='meta', dsn=self.config.meta_db) - for name in kwargs: lookup_name = kwargs[name] found = self.prepare_conn_from_config(name, lookup_name) @@ -110,41 +115,52 @@ if not found: raise ConnectionInfoNotFound('Connection name "%s" not found in config nor in meta DB.' % lookup_name) - if self.config.meta_db: - self.pgm.close_conn('meta') - class SimpleTool(ToolBase): - def __init__(self, name, desc, **kwargs): + + def __init__(self, name, desc=None, **kwargs): ToolBase.__init__(self, name, desc, **kwargs) + + def specify_args(self): + ToolBase.specify_args(self) self.parser.add_argument('target', metavar='target', type=str, help='Target database') - def init(self, args=None): - ToolBase.init(self, args) + def setup(self, args=None): + ToolBase.setup(self, args) self.prepare_conns(target=self.args.target) class SrcDstTool(ToolBase): - def __init__(self, name, desc, **kwargs): + + def __init__(self, name, desc=None, *, allow_reverse=False, force_reverse=False, **kwargs): ToolBase.__init__(self, name, desc, **kwargs) + self.allow_reverse = allow_reverse + self.force_reverse = force_reverse + + def specify_args(self): + ToolBase.specify_args(self) self.parser.add_argument('src', metavar='source', type=str, help='Source database') self.parser.add_argument('dst', metavar='destination', type=str, help='Destination database') - if 'allow_reverse' in kwargs and kwargs['allow_reverse']: + if self.allow_reverse: self.parser.add_argument('-r', '--reverse', action='store_true', help='Reverse operation. Swap source and destination.') - def init(self, args=None): - ToolBase.init(self, args) + def load_args(self, args=None, config_file=None): + ToolBase.load_args(self, args, config_file) if self.is_reversed(): self.args.src, self.args.dst = self.args.dst, self.args.src + + def setup(self, args=None): + ToolBase.setup(self, args) self.prepare_conns(src=self.args.src, dst=self.args.dst) def is_reversed(self): - return 'reverse' in self.args and self.args.reverse + return ('reverse' in self.args and self.args.reverse) or self.force_reverse class SrcDstTablesTool(SrcDstTool): - def __init__(self, name, desc, **kwargs): - SrcDstTool.__init__(self, name, desc, **kwargs) + + def specify_args(self): + SrcDstTool.specify_args(self) self.parser.add_argument('-t', '--src-table', metavar='source_table', dest='srctable', type=str, default='', help='Source table name.') self.parser.add_argument('-s', '--src-schema', metavar='source_schema', @@ -155,9 +171,11 @@ dest='dstschema', type=str, default='', help='Destination schema name (default=source_schema).') self.parser.add_argument('--regex', action='store_true', help="Use RE in schema or table name.") - def init(self, args=None): - SrcDstTool.init(self, args) + def load_args(self, args=None, config_file=None): + SrcDstTool.load_args(self, args, config_file) + self.load_table_names() + def load_table_names(self): self.schema1 = self.args.srcschema self.table1 = self.args.srctable self.schema2 = self.args.dstschema diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/tools/__init__.py --- a/pgtoolkit/tools/__init__.py Tue May 06 18:37:41 2014 +0200 +++ b/pgtoolkit/tools/__init__.py Tue May 06 18:37:43 2014 +0200 @@ -1,1 +1,3 @@ -__all__ = ['tablediff'] +__all__ = ['analyzeall', 'batchcopy', 'bigtables', 'listdepends', + 'listserial', 'longqueries', 'loopquery', + 'runquery', 'schemadiff', 'tablediff', 'tablesync'] diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/tools/analyzeall.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtoolkit/tools/analyzeall.py Tue May 06 18:37:43 2014 +0200 @@ -0,0 +1,52 @@ +from pgtoolkit.toolbase import SimpleTool +from pgtoolkit import pgbrowser + + +class AnalyzeAllTool(SimpleTool): + + """ + Analyze/vacuum all tables in selected schemas. + + Partially emulates VACUUM ANALYZE VERBOSE query. + But this program is more configurable, skips pg_catalog etc. + + """ + + def __init__(self): + SimpleTool.__init__(self, name='analyzeall') + self.target_isolation_level = 'autocommit' + + def specify_args(self): + SimpleTool.specify_args(self) + self.parser.add_argument('-s', dest='schema', nargs='*', help='Schema filter') + self.parser.add_argument('--vacuum', action='store_true', help='Call VACUUM ANALYZE') + self.parser.add_argument('--vacuum-full', action='store_true', help='Call VACUUM FULL ANALYZE') + self.parser.add_argument('--reindex', action='store_true', help='Call REINDEX TABLE') + + def main(self): + browser = pgbrowser.PgBrowser(self.pgm.get_conn('target')) + + query_patterns = ['ANALYZE %s.%s;'] + if self.args.vacuum: + query_patterns = ['VACUUM ANALYZE %s.%s;'] + if self.args.vacuum_full: + query_patterns = ['VACUUM FULL ANALYZE %s.%s;'] + if self.args.reindex: + query_patterns += ['REINDEX TABLE %s.%s;'] + + schema_list = self.args.schema + if not schema_list: + schema_list = [schema['name'] for schema in browser.list_schemas() if not schema['system']] + + for schema in schema_list: + tables = browser.list_tables(schema=schema) + with self.pgm.cursor('target') as curs: + for table in tables: + for query_pattern in query_patterns: + query = query_pattern % (schema, table['name']) + self.log.info(query) + curs.execute(query, []) + + +cls = AnalyzeAllTool + diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/tools/batchcopy.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtoolkit/tools/batchcopy.py Tue May 06 18:37:43 2014 +0200 @@ -0,0 +1,61 @@ +from pgtoolkit.toolbase import SrcDstTablesTool +from pgtoolkit.pgmanager import IntegrityError + + +class BatchCopyTool(SrcDstTablesTool): + + """ + Copy data from one table to another, filtering by specified condition. + + """ + + def __init__(self): + SrcDstTablesTool.__init__(self, name='batchcopy', desc='') + + def specify_args(self): + SrcDstTablesTool.specify_args(self) + self.parser.add_argument('--table-name', type=str, help='Table to be copied.') + self.parser.add_argument('--src-filter', type=str, help='WHERE condition for source query.') + self.parser.add_argument('--file-with-ids', type=str, help='Read source IDs from file (each ID on new line). Use these in --src-filter as {ids}') + self.parser.add_argument('--dst-exists', choices=['rollback', 'ignore', 'update'], default='rollback', help='What to do when destination record already exists.') + + def main(self): + # read list of IDs from file + ids = '' + if self.args.file_with_ids: + with open(self.args.file_with_ids, 'r') as f: + ids = ','.join(ln.rstrip() for ln in f.readlines()) + + # read source data + with self.pgm.cursor('src') as src_curs: + condition = self.args.src_filter.format(ids=ids) or 'true' + src_curs.execute('SELECT * FROM {} WHERE {}'.format(self.args.table_name, condition)) + #TODO: ORDER BY id OFFSET 0 LIMIT 100 + data = src_curs.fetchall_dict() + src_curs.connection.commit() + + with self.pgm.cursor('dst') as dst_curs: + copied = 0 + for row in data: + keys = ', '.join(row.keys()) + values_mask = ', '.join(['%s'] * len(row)) + query = 'INSERT INTO {} ({}) VALUES ({})'.format(self.args.table_name, keys, values_mask) + try: + dst_curs.execute('SAVEPOINT the_query;') + dst_curs.execute(query, list(row.values())) + dst_curs.execute('RELEASE SAVEPOINT the_query;') + copied += 1 + except IntegrityError: + if self.args.dst_exists == 'rollback': + dst_curs.connection.rollback() + break + elif self.args.dst_exists == 'ignore': + dst_curs.execute('ROLLBACK TO SAVEPOINT the_query;') + elif self.args.dst_exists == 'update': + raise NotImplementedError() + dst_curs.connection.commit() + + self.log.info('Copied %s rows.', copied) + + +cls = BatchCopyTool diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/tools/bigtables.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtoolkit/tools/bigtables.py Tue May 06 18:37:43 2014 +0200 @@ -0,0 +1,55 @@ +from pgtoolkit.toolbase import SimpleTool +from pgtoolkit import pgbrowser +from pycolib.prettysize import prettysize_short +from pycolib.ansicolor import highlight + + +class BigTablesTool(SimpleTool): + + """List largest tables. + + Reads size statistics of tables and indexes from pgcatalog. + + """ + + def __init__(self): + SimpleTool.__init__(self, name='bigtables') + + def specify_args(self): + SimpleTool.specify_args(self) + self.parser.add_argument('-n', '--limit', metavar='NUM', dest='limit', type=int, default=5, help='Show NUM biggest tables.') + self.parser.add_argument('-v', '--details', dest='details', action='store_true', help='Show sizes of data and individual indexes.') + + def main(self): + browser = pgbrowser.PgBrowser(self.pgm.get_conn('target')) + + # scan all tables from all shemas, remember names and sizes + all_tables = [] + all_indexes = [] + schemas = browser.list_schemas() + for schema in schemas: + tables = browser.list_tables(schema['name']) + for table in tables: + table_name = '%s.%s' % (schema['name'], table['name']) + indexes = browser.list_indexes(table['name'], schema['name']) + for index in indexes: + all_indexes.append({'name': index['name'], 'table': table_name, 'size': index['size']}) + all_tables.append({'name': table_name, 'size': table['size'], 'indexes': indexes}) + + # print names and sizes of 20 largest tables + for table in sorted(all_tables, reverse=True, key=lambda x: x['size'])[:self.args.limit]: + size_of_indexes = sum(index['size'] for index in table['indexes']) + print(highlight(1) + prettysize_short(table['size'] + size_of_indexes, trailing_zeros=True).rjust(8) + highlight(0), + '(total)'.ljust(8), + highlight(1) + table['name'] + highlight(0), sep=' ') + if self.args.details: + print(prettysize_short(table['size'], trailing_zeros=True).rjust(8), + '(data)'.ljust(8), sep=' ') + for index in sorted(table['indexes'], reverse=True, key=lambda x: x['size']): + print(prettysize_short(index['size'], trailing_zeros=True).rjust(8), + '(index)'.ljust(8), index['name'], sep=' ') + print() + + +cls = BigTablesTool + diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/tools/listdepends.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtoolkit/tools/listdepends.py Tue May 06 18:37:43 2014 +0200 @@ -0,0 +1,31 @@ +from pgtoolkit.toolbase import SimpleTool +from pgtoolkit import pgbrowser + + +class ListDependsTool(SimpleTool): + + """ + List column dependencies. + + """ + + def __init__(self): + SimpleTool.__init__(self, name='listdepends') + + def specify_args(self): + SimpleTool.specify_args(self) + self.parser.add_argument('table', metavar='table', type=str, help='Table name.') + self.parser.add_argument('column', metavar='column', type=str, help='Column name.') + self.parser.add_argument('-s', '--schema', dest='schema', metavar='schema', + type=str, default='public', help='Schema name (default=public).') + + def main(self): + browser = pgbrowser.PgBrowser(self.pgm.get_conn('target')) + + objects = browser.list_column_usage(self.args.table, self.args.column, schema=self.args.schema) + for obj in sorted(objects, key=lambda x: (x['type'], x['schema'], x['name'])): + print(obj['type'], ' ', obj['schema'], '.', obj['name'], sep='') + + +cls = ListDependsTool + diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/tools/listserial.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtoolkit/tools/listserial.py Tue May 06 18:37:43 2014 +0200 @@ -0,0 +1,69 @@ +from pgtoolkit.toolbase import SimpleTool +from pgtoolkit import pgbrowser +from pycolib.ansicolor import highlight, WHITE, YELLOW, RED, BOLD + + +class ListSerialTool(SimpleTool): + + """List sequences near to overflow. + + Checks all sequences attached to a column of type integer. + + Highlight dangerous values of sequence: + * Yellow - near overflow (90%) + * Red - already over... + + Does not list sequences with value under 50% of range. + + """ + + max_int = 2147483647 + + def __init__(self): + SimpleTool.__init__(self, name='listserial') + + def main(self): + conn = self.pgm.get_conn('target') + browser = pgbrowser.PgBrowser(conn) + rows = browser.list_sequences() + sequences = [] + for row in rows: + if row['related_column_type'] == 'integer': + # read sequence attributes like last_value + q = 'SELECT * FROM "%s"."%s"' % (row['sequence_schema'], row['sequence_name']) + curs = conn.cursor() + curs.execute(q) + attrs = curs.fetchone_dict() + # skip this sequence if its cycled and has safe max_value + if attrs['is_cycled'] and attrs['max_value'] <= self.max_int: + continue + # skip sequences with last_value not yet in half of max_int + if attrs['last_value'] < self.max_int / 2: + continue + # remember rest of sequences + row['attrs'] = attrs + sequences.append(row) + # sort most dangerous on top + sequences.sort(key=lambda x: x['attrs']['last_value'], reverse=True) + # print out what we've found + for seq in sequences: + print('Sequence:', seq['sequence_schema'] + '.' + seq['sequence_name']) + print(' Related:', seq['sequence_schema'] + '.' + seq['related_table'], seq['related_column'], '(' + seq['related_column_type'] + ')') + print(' integer max', '2147483647') + # colorize last value + last_val = seq['attrs']['last_value'] + col = WHITE + BOLD + if last_val > self.max_int * 0.9: + # near max + col = YELLOW + BOLD + if last_val > self.max_int: + # over max + col = RED + BOLD + print(' last_value', highlight(1, col) + str(last_val) + highlight(0)) + for key in ('min_value', 'max_value', 'is_cycled'): + print(' ', key, seq['attrs'][key]) + print() + + +cls = ListSerialTool + diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/tools/longqueries.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtoolkit/tools/longqueries.py Tue May 06 18:37:43 2014 +0200 @@ -0,0 +1,34 @@ +from pgtoolkit.toolbase import SimpleTool +from pgtoolkit import pgstats +from pycolib.ansicolor import highlight, YELLOW, BOLD + + +class LongQueriesTool(SimpleTool): + + """ + List long running queries. + """ + + def __init__(self): + SimpleTool.__init__(self, name='longqueries') + + def specify_args(self): + SimpleTool.specify_args(self) + self.parser.add_argument('--age', default='1m', help='How long must be the query running to be listed.') + + def main(self): + stats = pgstats.PgStats(self.pgm.get_conn('target')) + + for ln in stats.list_long_queries(self.args.age): + print(highlight(1), + 'backend PID: ', ln['procpid'], + ', query_start: ', ln['query_start'].strftime('%F %T'), + ', client IP: ', ln['client_addr'], + ln['waiting'] and ', ' + highlight(1, YELLOW|BOLD) + 'waiting' or '', + highlight(0), sep='') + print(ln['query']) + print() + + +cls = LongQueriesTool + diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/tools/loopquery.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtoolkit/tools/loopquery.py Tue May 06 18:37:43 2014 +0200 @@ -0,0 +1,101 @@ +from pgtoolkit.toolbase import ToolBase + +import logging.handlers +import time +from datetime import datetime, timedelta + + +class LoopQueryTool(ToolBase): + + """ + Execute queries in loop, with configurable interval. + + """ + + def __init__(self): + ToolBase.__init__(self, name='loopquery') + self.target_isolation_level = 'autocommit' + + def specify_args(self): + ToolBase.specify_args(self) + self.parser.add_argument('target', nargs='?', metavar='target', type=str, help='Target database') + self.parser.add_argument('-c', dest='config', type=str, help='Additional config file (besides pgtoolkit.conf).') + self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.') + self.parser.add_argument('--mins', dest='delay_mins', type=int, help='Delay between queries in minutes.') + self.parser.add_argument('--secs', dest='delay_secs', type=int, help='Delay between queries in seconds.') + + self.config.add_option('target', type=str, default=None) + self.config.add_option('queries', type=list, default=[]) + self.config.add_option('delay_mins', type=int, default=0) + self.config.add_option('delay_secs', type=int, default=0) + self.config.add_option('log_path', type=str) + + def load_args(self, args=None, config_file=None): + ToolBase.load_args(self, args, config_file) + if self.args.config: + self.config.load(self.args.config) + self.queries = self.args.queries or self.config.queries + self.delay_mins = self.args.delay_mins or self.config.delay_mins + self.delay_secs = self.args.delay_secs or self.config.delay_secs + + def init_logging(self): + ToolBase.init_logging(self) + if self.config.log_path: + self.init_file_logs(self.config.log_path) + + def setup(self, args=None): + ToolBase.setup(self, args) + self.prepare_conns(target=self.args.target or self.config.target) + + def init_file_logs(self, path): + format = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', '%y-%m-%d %H:%M:%S') + handler = logging.handlers.TimedRotatingFileHandler(path+'/main.log', when='midnight', backupCount=5) + handler.setFormatter(format) + handler.setLevel(logging.DEBUG) + logging.getLogger('main').addHandler(handler) + + format = logging.Formatter('%(asctime)s %(message)s', '%y-%m-%d %H:%M:%S') + handler = logging.handlers.TimedRotatingFileHandler(path+'/pgnotices.log', when='midnight', backupCount=5) + handler.setFormatter(format) + handler.setLevel(logging.DEBUG) + logging.getLogger('pgmanager_notices').addHandler(handler) + + def main(self): + self.reset() + while True: + self.wait() + self.action() + + def reset(self): + """Check current time, set next action time.""" + dt = datetime.today() + dt = dt.replace(microsecond = 0) + self.next_action_time = dt + timedelta(minutes = self.delay_mins, + seconds = self.delay_secs) + + def wait(self): + """Wait for action time, compute next action time.""" + now = datetime.today() + self.log.debug('Next run %s', self.next_action_time) + if self.next_action_time > now: + td = self.next_action_time - now + self.log.debug('Sleep %ds', td.seconds + td.microseconds/1e6) + time.sleep(td.seconds + td.microseconds/1e6) + self.next_action_time += timedelta(minutes = self.delay_mins, + seconds = self.delay_secs) + # in case that action took too long and next planned time would + # be in past -> reset planner + if self.next_action_time < now: + self.reset() + + def action(self): + """Execute the queries.""" + for q in self.queries: + self.log.info('%s', q) + with self.pgm.cursor('target') as curs: + curs.execute(q) + self.log.info('Done') + + +cls = LoopQueryTool + diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/tools/runquery.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtoolkit/tools/runquery.py Tue May 06 18:37:43 2014 +0200 @@ -0,0 +1,106 @@ +from pgtoolkit.toolbase import ToolBase + +import logging.handlers +import time +from datetime import datetime, timedelta +from psycopg2 import ProgrammingError + + +class RunQueryTool(ToolBase): + + """ + Execute configured queries in target database. + """ + + def __init__(self): + ToolBase.__init__(self, name='runquery') + self.target_isolation_level = 'autocommit' + + def specify_args(self): + ToolBase.specify_args(self) + self.parser.add_argument('target', nargs='?', metavar='target', type=str, help='Target database') + self.parser.add_argument('-c', dest='config', type=str, help='Additional config file (besides pgtoolkit.conf).') + self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.') + self.parser.add_argument('-f', dest='file', metavar='FILE', help='Read query from file.') + self.parser.add_argument('--one-query-per-line', action='store_true', help='When reading queries from file, consider each line as separate query.') + self.parser.add_argument('-p', '--parameter', dest='parameters', metavar='PARAM=VALUE', nargs='*', + help="If query should be used as format template, these parameters will be substituted.") + self.parser.add_argument('--output-file', dest='output_file', metavar='OUTPUT_FILE', help='Write query result in specified file.') + self.parser.add_argument('--format', dest='format', metavar='FORMAT', help='Format string for each line in output file (using Python\'s format()).') + + self.config.add_option('target', type=str, default=None) + self.config.add_option('queries', type=list, default=[]) + self.config.add_option('log_path', type=str) + + def setup(self, args=None): + ToolBase.setup(self, args) + self.prepare_conns(target=self.args.target or self.config.target) + + def load_args(self, args=None, config_file=None): + ToolBase.load_args(self, args, config_file) + if self.args.config: + self.config.load(self.args.config) + self.queries = self.args.queries or self.config.queries + # read query from file + if self.args.file: + with open(self.args.file, 'r', encoding='utf8') as f: + data = f.read() + if self.args.one_query_per_line: + file_queries = [ln for ln in data.splitlines() if not ln.lstrip().startswith('--')] + self.queries = file_queries + self.queries + else: + self.queries.insert(0, data) + # prepare parameters + self._prepare_parameters(self.args.parameters) + + def init_logging(self): + ToolBase.init_logging(self) + if self.config.log_path: + self.init_file_logs(self.config.log_path) + + def init_file_logs(self, path): + format = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', '%y-%m-%d %H:%M:%S') + handler = logging.handlers.TimedRotatingFileHandler(path+'/main.log', when='midnight', backupCount=5) + handler.setFormatter(format) + handler.setLevel(logging.DEBUG) + logging.getLogger('main').addHandler(handler) + + format = logging.Formatter('%(asctime)s %(message)s', '%y-%m-%d %H:%M:%S') + handler = logging.handlers.TimedRotatingFileHandler(path+'/pgnotices.log', when='midnight', backupCount=5) + handler.setFormatter(format) + handler.setLevel(logging.DEBUG) + logging.getLogger('pgmanager_notices').addHandler(handler) + + def main(self): + """Execute the queries.""" + print(self.queries) + for q in self.queries: + if self.parameters: + q = q.format(**self.parameters) + self.log.info('%s', q if len(q) < 100 else q[:100]+'...') + with self.pgm.cursor('target') as curs: + curs.execute(q) + self.log.info('Rows affected: %d', curs.rowcount) + try: + rows = curs.fetchall_dict() + self._write_output_file(rows) + except ProgrammingError: + pass + self.log.info('Done') + + def _write_output_file(self, rows): + if not self.args.output_file: + return + with open(self.args.output_file, 'w', encoding='utf8') as f: + for row in rows: + print(self.args.format.format(row), file=f) + + def _prepare_parameters(self, parameters): + self.parameters = {} + for parameter in parameters or (): + name, value = parameter.split('=', 1) + self.parameters[name] = value + + +cls = RunQueryTool + diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/tools/schemadiff.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtoolkit/tools/schemadiff.py Tue May 06 18:37:43 2014 +0200 @@ -0,0 +1,51 @@ +from pgtoolkit.toolbase import SrcDstTool +from pgtoolkit import pgmanager, pgbrowser, pgdiff, toolbase + + +class SchemaDiffTool(SrcDstTool): + + """ + Print differences in database schema. + + Prints changes from source to destination. + SQL patch updates source database schema to destination schema. + + """ + + def __init__(self): + SrcDstTool.__init__(self, name='schemadiff', allow_reverse=True) + + def specify_args(self): + SrcDstTool.specify_args(self) + self.parser.add_argument('-s', dest='schema', nargs='*', help='Schema filter') + self.parser.add_argument('-t', dest='table', nargs='*', help='Table filter') + self.parser.add_argument('-f', dest='function', type=str, help='Function filter (regex)') + self.parser.add_argument('--sql', action='store_true', help='Output is SQL script.') + self.parser.add_argument('--body', action='store_true', help='Output diff for function bodies.') + + def main(self): + srcbrowser = pgbrowser.PgBrowser(self.pgm.get_conn('src')) + dstbrowser = pgbrowser.PgBrowser(self.pgm.get_conn('dst')) + + pgd = pgdiff.PgDiff(srcbrowser, dstbrowser) + + try: + if self.args.schema: + pgd.filter_schemas(include=self.args.schema) + if self.args.table: + pgd.filter_tables(include=self.args.table) + if self.args.function: + pgd.filter_functions(self.args.function) + if self.args.body: + pgd.function_body_diff = True + + if self.args.sql: + pgd.print_patch() + else: + pgd.print_diff() + except pgdiff.PgDiffError as e: + print('PgDiff error:', str(e)) + + +cls = SchemaDiffTool + diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/tools/tablediff.py --- a/pgtoolkit/tools/tablediff.py Tue May 06 18:37:41 2014 +0200 +++ b/pgtoolkit/tools/tablediff.py Tue May 06 18:37:43 2014 +0200 @@ -2,11 +2,13 @@ from pgtoolkit.toolbase import SrcDstTablesTool from pycolib.ansicolor import highlight, BOLD, YELLOW +import sys -class TableDiffTool(toolbase.SrcDstTablesTool): + +class TableDiffTool(SrcDstTablesTool): """ - Print differencies between data in tables. + Print differences between data in tables. Requirements: * Source table must have defined PRIMARY KEY. @@ -17,19 +19,29 @@ def __init__(self): SrcDstTablesTool.__init__(self, name='tablediff', desc=self.__doc__, allow_reverse=True) + + def specify_args(self): + SrcDstTablesTool.specify_args(self) self.parser.add_argument('--sql', action='store_true', help='Output is SQL script.') self.parser.add_argument('--rowcount', action='store_true', help='Compare number of rows.') + self.parser.add_argument('-o', '--output-file', help='Output file for sql queries.') def main(self): srcconn = self.pgm.get_conn('src') dstconn = self.pgm.get_conn('dst') + if self.args.output_file: + output_file = open(self.args.output_file, 'w') + else: + output_file = sys.stdout + dd = pgdatadiff.PgDataDiff(srcconn, dstconn) for srcschema, srctable, dstschema, dsttable in self.tables(): print('-- Diff from [%s] %s.%s to [%s] %s.%s' % ( self.args.src, srcschema, srctable, - self.args.dst, dstschema, dsttable)) + self.args.dst, dstschema, dsttable), + file=output_file) if self.args.rowcount: with self.pgm.cursor('src') as curs: @@ -39,16 +51,18 @@ curs.execute('''SELECT count(*) FROM "%s"."%s"''' % (dstschema, dsttable)) dstcount = curs.fetchone()[0] if srccount != dstcount: - print(highlight(1, BOLD | YELLOW), "Row count differs: src=%s dst=%s" % (srccount, dstcount), highlight(0), sep='') + print(highlight(1, BOLD | YELLOW), + "Row count differs: src=%s dst=%s" % (srccount, dstcount), + highlight(0), sep='', file=output_file) continue dd.settable1(srctable, srcschema) dd.settable2(dsttable, dstschema) if self.args.sql: - dd.print_patch() + dd.print_patch(file=output_file) else: - dd.print_diff() + dd.print_diff(file=output_file) cls = TableDiffTool diff -r d6088dba8fea -r 2a2d0d5df03b pgtoolkit/tools/tablesync.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtoolkit/tools/tablesync.py Tue May 06 18:37:43 2014 +0200 @@ -0,0 +1,60 @@ +from pgtoolkit.toolbase import SrcDstTool +from pgtoolkit.tools.tablediff import TableDiffTool +from pgtoolkit.tools.runquery import RunQueryTool + + +class TableSyncTool(SrcDstTool): + + """ + Synchronize tables between two databases (tablediff + runquery). + + This will essentially call following commands on each table from list: + * pgtool tablediff -r -s -t --sql -o /tmp/diff.sql + * pgtool runquery -f /tmp/diff.sql + + """ + + def __init__(self): + SrcDstTool.__init__(self, name='tablesync', force_reverse=True) + self.tablediff = TableDiffTool() + self.tablediff.specify_args() + self.runquery = RunQueryTool() + self.runquery.specify_args() + + def specify_args(self): + SrcDstTool.specify_args(self) + self.parser.add_argument('-t', dest='tables', metavar='table', nargs='*', + help="Tables to be synchronized.") + self.parser.add_argument('-s', '--schema', metavar='default_schema', + dest='schema', type=str, default='public', help='Default schema name.') + + def init_logging(self): + SrcDstTool.init_logging(self) + self.runquery.log = self.log + + def setup(self, args=None): + SrcDstTool.setup(self, args) + self.target_isolation_level = 'autocommit' + self.prepare_conns(target=self.args.src) + + def main(self): + for table in self.args.tables: + self.sync(table) + + def sync(self, table): + if '.' in table: + schema, table = table.split('.', 1) + else: + schema = self.args.schema + # Call tablediff + self.tablediff.load_args([self.args.src, self.args.dst, + '-r', '-s', schema, '-t', table, '--sql', '-o', '/tmp/diff.sql']) + self.tablediff.main() + # Call runquery + self.runquery.load_args([self.args.src, '--one-query-per-line', + '-f', '/tmp/diff.sql']) + self.runquery.main() + + +cls = TableSyncTool + diff -r d6088dba8fea -r 2a2d0d5df03b runquery.py --- a/runquery.py Tue May 06 18:37:41 2014 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,105 +0,0 @@ -#!/usr/bin/env python3 -""" -runquery - -Execute configured queries once. - -""" - -from pgtoolkit import toolbase - -import logging.handlers -import time -from datetime import datetime, timedelta - -from psycopg2 import ProgrammingError - - -class RunQueryTool(toolbase.ToolBase): - - def __init__(self): - toolbase.ToolBase.__init__(self, name='runquery', desc='Run configured queries.') - self.parser.add_argument('target', nargs='?', metavar='target', type=str, help='Target database') - self.parser.add_argument('-c', dest='config', type=str, help='Additional config file (besides pgtoolkit.conf).') - self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.') - self.parser.add_argument('-f', dest='file', metavar='FILE', help='Read query from file.') - self.parser.add_argument('--one-query-per-line', action='store_true', help='When reading queries from file, consider each line as separate query.') - self.parser.add_argument('-p', '--parameter', dest='parameters', metavar='PARAM=VALUE', nargs='*', - help="If query should be used as format template, these parameters will be substituted.") - self.parser.add_argument('--output-file', dest='output_file', metavar='OUTPUT_FILE', help='Write query result in specified file.') - self.parser.add_argument('--format', dest='format', metavar='FORMAT', help='Format string for each line in output file (using Python\'s format()).') - - self.config.add_option('target', type=str, default=None) - self.config.add_option('queries', type=list, default=[]) - self.config.add_option('log_path', type=str) - - self.target_isolation_level = 'autocommit' - - self.init() - - def init(self): - toolbase.ToolBase.init(self) - if self.args.config: - self.config.load(self.args.config) - self.queries = self.args.queries or self.config.queries - # read query from file - if self.args.file: - with open(self.args.file, 'r', encoding='utf8') as f: - data = f.read() - if self.args.one_query_per_line: - file_queries = [ln for ln in data.splitlines() if not ln.lstrip().startswith('--')] - self.queries = file_queries + self.queries - else: - self.queries.insert(0, data) - # prepare parameters - self._prepare_parameters(self.args.parameters) - if self.config.log_path: - self.init_file_logs(self.config.log_path) - self.prepare_conns(target = self.args.target or self.config.target) - - def init_file_logs(self, path): - format = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', '%y-%m-%d %H:%M:%S') - handler = logging.handlers.TimedRotatingFileHandler(path+'/main.log', when='midnight', backupCount=5) - handler.setFormatter(format) - handler.setLevel(logging.DEBUG) - logging.getLogger('main').addHandler(handler) - - format = logging.Formatter('%(asctime)s %(message)s', '%y-%m-%d %H:%M:%S') - handler = logging.handlers.TimedRotatingFileHandler(path+'/pgnotices.log', when='midnight', backupCount=5) - handler.setFormatter(format) - handler.setLevel(logging.DEBUG) - logging.getLogger('pgmanager_notices').addHandler(handler) - - def main(self): - """Execute the queries.""" - for q in self.queries: - if self.parameters: - q = q.format(**self.parameters) - self.log.info('%s', q if len(q) < 100 else q[:100]+'...') - with self.pgm.cursor('target') as curs: - curs.execute(q) - self.log.info('Rows affected: %d', curs.rowcount) - try: - rows = curs.fetchall_dict() - self._write_output_file(rows) - except ProgrammingError: - pass - self.log.info('Done') - - def _write_output_file(self, rows): - if not self.args.output_file: - return - with open(self.args.output_file, 'w', encoding='utf8') as f: - for row in rows: - print(self.args.format.format(row), file=f) - - def _prepare_parameters(self, parameters): - self.parameters = {} - for parameter in parameters or (): - name, value = parameter.split('=', 1) - self.parameters[name] = value - - -tool = RunQueryTool() -tool.main() - diff -r d6088dba8fea -r 2a2d0d5df03b schemadiff.py --- a/schemadiff.py Tue May 06 18:37:41 2014 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,49 +0,0 @@ -#!/usr/bin/env python3 -# -# Print differences in database schema. -# -# Prints changes from source to destination. -# SQL patch updates source database schema to destination schema. -# - -from pgtoolkit import pgmanager, pgbrowser, pgdiff, toolbase - - -class SchemaDiffTool(toolbase.SrcDstTool): - def __init__(self): - toolbase.SrcDstTool.__init__(self, name='schemadiff', desc='Database schema diff.', allow_reverse = True) - - self.parser.add_argument('-s', dest='schema', nargs='*', help='Schema filter') - self.parser.add_argument('-t', dest='table', nargs='*', help='Table filter') - self.parser.add_argument('-f', dest='function', type=str, help='Function filter (regex)') - self.parser.add_argument('--sql', action='store_true', help='Output is SQL script.') - self.parser.add_argument('--body', action='store_true', help='Output diff for function bodies.') - - self.init() - - def main(self): - srcbrowser = pgbrowser.PgBrowser(self.pgm.get_conn('src')) - dstbrowser = pgbrowser.PgBrowser(self.pgm.get_conn('dst')) - - pgd = pgdiff.PgDiff(srcbrowser, dstbrowser) - - try: - if self.args.schema: - pgd.filter_schemas(include=self.args.schema) - if self.args.table: - pgd.filter_tables(include=self.args.table) - if self.args.function: - pgd.filter_functions(self.args.function) - if self.args.body: - pgd.function_body_diff = True - - if self.args.sql: - pgd.print_patch() - else: - pgd.print_diff() - except pgdiff.PgDiffError as e: - print('PgDiff error:', str(e)) - - -tool = SchemaDiffTool() -tool.main() diff -r d6088dba8fea -r 2a2d0d5df03b setup.py --- a/setup.py Tue May 06 18:37:41 2014 +0200 +++ b/setup.py Tue May 06 18:37:43 2014 +0200 @@ -4,14 +4,13 @@ setup( name='pgtoolkit', - version='0.3.1', + version='0.4.0', description='Postgres utilities, build on top of psycopg2', author='Radek Brich', author_email='radek.brich@devl.cz', url='http://hg.devl.cz/pgtoolkit', keywords=['postgresql', 'psycopg2', 'pool', 'keepalive'], - packages=['pgtoolkit', 'mytoolkit'], - scripts=['analyzeall.py', 'bigtables.py', 'listdepends.py', 'listserial.py', - 'longqueries.py', 'loopquery.py', 'runquery.py', 'schemadiff.py', 'tablediff.py'], - ) + packages=['pgtoolkit', 'pgtoolkit.tools', 'mytoolkit'], + scripts=['pgtool'], +)