Refactor ToolBase to allow tool composition. Add TableSync tool (composed of TableDiffTool and RunQueryTool). Move more tools under pgtool.
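
The refactor splits the old one-shot init() into separate steps so that one tool can drive another: __init__() now only builds the parser and config, specify_args() declares options, load_args() parses them (and accepts a synthetic argv), and setup() chains both plus init_logging(). A rough sketch of the two entry paths, using a hypothetical SomeTool subclass:

    # standalone use (what the pgtool dispatcher below does)
    tool = SomeTool()
    tool.setup(argv)    # specify_args() + load_args(argv) + init_logging()
    tool.main()

    # composition (what TableSyncTool does with TableDiffTool and RunQueryTool)
    child = TableDiffTool()
    child.specify_args()
    child.load_args(['srcdb', 'dstdb', '-t', 'users', '--sql'])
    child.main()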
--- a/analyzeall.py Tue May 06 18:37:41 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,51 +0,0 @@
-#!/usr/bin/env python3
-"""
-analyzeall
-
-Analyze/vacuum all tables in selected schemas.
-See also "VACUUM ANALYZE VERBOSE;" query.
-Unlike that, this program skips pg_catalog etc.
-
-"""
-
-from pgtoolkit import pgbrowser, toolbase
-
-
-class AnalyzeAllTool(toolbase.SimpleTool):
- def __init__(self):
- toolbase.SimpleTool.__init__(self, name='analyzeall', desc='Analyze all tables.')
- self.parser.add_argument('-s', dest='schema', nargs='*', help='Schema filter')
- self.parser.add_argument('--vacuum', action='store_true', help='Call VACUUM ANALYZE')
- self.parser.add_argument('--vacuum-full', action='store_true', help='Call VACUUM FULL ANALYZE')
- self.parser.add_argument('--reindex', action='store_true', help='Call REINDEX TABLE')
- self.target_isolation_level = 'autocommit'
- self.init()
-
- def main(self):
- browser = pgbrowser.PgBrowser(self.pgm.get_conn('target'))
-
- query_patterns = ['ANALYZE %s.%s;']
- if self.args.vacuum:
- query_patterns = ['VACUUM ANALYZE %s.%s;']
- if self.args.vacuum_full:
- query_patterns = ['VACUUM FULL ANALYZE %s.%s;']
- if self.args.reindex:
- query_patterns += ['REINDEX TABLE %s.%s;']
-
- schema_list = self.args.schema
- if not schema_list:
- schema_list = [schema['name'] for schema in browser.list_schemas() if not schema['system']]
-
- for schema in schema_list:
- tables = browser.list_tables(schema=schema)
- with self.pgm.cursor('target') as curs:
- for table in tables:
- for query_pattern in query_patterns:
- query = query_pattern % (schema, table['name'])
- self.log.info(query)
- curs.execute(query, [])
-
-
-tool = AnalyzeAllTool()
-tool.main()
-
--- a/batchcopy.py Tue May 06 18:37:41 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,58 +0,0 @@
-#!/usr/bin/env python3
-
-from pgtoolkit import toolbase
-from pgtoolkit.pgmanager import IntegrityError
-
-
-class BatchCopyTool(toolbase.SrcDstTablesTool):
- def __init__(self):
- toolbase.SrcDstTablesTool.__init__(self, name='batchcopy', desc='Copy data from one table to another.')
-
- self.parser.add_argument('--table-name', type=str, help='Table to be copied.')
- self.parser.add_argument('--src-filter', type=str, help='WHERE condition for source query.')
- self.parser.add_argument('--file-with-ids', type=str, help='Read source IDs from file (each ID on new line). Use these in --src-filter as {ids}')
- self.parser.add_argument('--dst-exists', choices=['rollback', 'ignore', 'update'], default='rollback', help='What to do when destination record already exists.')
-
- self.init()
-
- def main(self):
- # read list of IDs from file
- ids = '<no IDs read>'
- if self.args.file_with_ids:
- with open(self.args.file_with_ids, 'r') as f:
- ids = ','.join(ln.rstrip() for ln in f.readlines())
-
- # read source data
- with self.pgm.cursor('src') as src_curs:
- condition = self.args.src_filter.format(ids=ids) or 'true'
- src_curs.execute('SELECT * FROM {} WHERE {}'.format(self.args.table_name, condition))
- #TODO: ORDER BY id OFFSET 0 LIMIT 100
- data = src_curs.fetchall_dict()
- src_curs.connection.commit()
-
- with self.pgm.cursor('dst') as dst_curs:
- copied = 0
- for row in data:
- keys = ', '.join(row.keys())
- values_mask = ', '.join(['%s'] * len(row))
- query = 'INSERT INTO {} ({}) VALUES ({})'.format(self.args.table_name, keys, values_mask)
- try:
- dst_curs.execute('SAVEPOINT the_query;')
- dst_curs.execute(query, list(row.values()))
- dst_curs.execute('RELEASE SAVEPOINT the_query;')
- copied += 1
- except IntegrityError:
- if self.args.dst_exists == 'rollback':
- dst_curs.connection.rollback()
- break
- elif self.args.dst_exists == 'ignore':
- dst_curs.execute('ROLLBACK TO SAVEPOINT the_query;')
- elif self.args.dst_exists == 'update':
- raise NotImplementedError()
- dst_curs.connection.commit()
-
- self.log.info('Copied %s rows.', copied)
-
-
-tool = BatchCopyTool()
-tool.main()
--- a/batchquery.py Tue May 06 18:37:41 2014 +0200
+++ b/batchquery.py Tue May 06 18:37:43 2014 +0200
@@ -12,7 +12,6 @@
self.parser.add_argument('--output', dest='output', type=str, help='File name for results.')
self.parser.add_argument('--outputfunc', dest='outputfunc', type=str, help='Python function which will format results (format_row(args, rows)).')
self.parser.add_argument('--header', dest='header', action='store_true', help='First line of CSV is header with names for columns. These name can be used in query.')
- self.init()
def _split_line(self, line):
return [x.strip() for x in line.split(',')]
@@ -65,5 +64,6 @@
tool = BatchQueryTool()
+tool.setup()
tool.main()
--- a/bigtables.py Tue May 06 18:37:41 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,49 +0,0 @@
-#!/usr/bin/env python3
-
-from pgtoolkit import pgbrowser, toolbase
-from pycolib.prettysize import prettysize_short
-from pycolib.ansicolor import highlight
-
-
-class BigTablesTool(toolbase.SimpleTool):
- def __init__(self):
- toolbase.SimpleTool.__init__(self, name='bigtables', desc='List largest tables.')
- self.parser.add_argument('-n', '--limit', metavar='NUM', dest='limit', type=int, default=5, help='Show NUM biggest tables.')
- self.parser.add_argument('-v', '--details', dest='details', action='store_true', help='Show sizes of data and individual indexes.')
- self.init()
-
- def main(self):
- browser = pgbrowser.PgBrowser(self.pgm.get_conn('target'))
-
- # scan all tables from all shemas, remember names and sizes
- all_tables = []
- all_indexes = []
- schemas = browser.list_schemas()
- for schema in schemas:
- tables = browser.list_tables(schema['name'])
- for table in tables:
- table_name = '%s.%s' % (schema['name'], table['name'])
- indexes = browser.list_indexes(table['name'], schema['name'])
- for index in indexes:
- all_indexes.append({'name': index['name'], 'table': table_name, 'size': index['size']})
- all_tables.append({'name': table_name, 'size': table['size'], 'indexes': indexes})
-
- # print names and sizes of 20 largest tables
- for table in sorted(all_tables, reverse=True, key=lambda x: x['size'])[:self.args.limit]:
- size_of_indexes = sum(index['size'] for index in table['indexes'])
- print(highlight(1) + prettysize_short(table['size'] + size_of_indexes, trailing_zeros=True).rjust(8) + highlight(0),
- '(total)'.ljust(8),
- highlight(1) + table['name'] + highlight(0), sep=' ')
- if self.args.details:
- print(prettysize_short(table['size'], trailing_zeros=True).rjust(8),
- '(data)'.ljust(8), sep=' ')
- for index in sorted(table['indexes'], reverse=True, key=lambda x: x['size']):
- print(prettysize_short(index['size'], trailing_zeros=True).rjust(8),
- '(index)'.ljust(8), index['name'], sep=' ')
- print()
-
-
-if __name__ == '__main__':
- tool = BigTablesTool()
- tool.main()
-
--- a/listdepends.py Tue May 06 18:37:41 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,25 +0,0 @@
-#!/usr/bin/env python3
-
-from pgtoolkit import pgbrowser, toolbase
-
-
-class ListDependsTool(toolbase.SimpleTool):
- def __init__(self):
- toolbase.SimpleTool.__init__(self, name='listdepends', desc='List column dependencies.')
- self.parser.add_argument('table', metavar='table', type=str, help='Table name.')
- self.parser.add_argument('column', metavar='column', type=str, help='Column name.')
- self.parser.add_argument('-s', '--schema', dest='schema', metavar='schema',
- type=str, default='public', help='Schema name (default=public).')
- self.init()
-
- def main(self):
- browser = pgbrowser.PgBrowser(self.pgm.get_conn('target'))
-
- objects = browser.list_column_usage(self.args.table, self.args.column, schema=self.args.schema)
- for obj in sorted(objects, key=lambda x: (x['type'], x['schema'], x['name'])):
- print(obj['type'], ' ', obj['schema'], '.', obj['name'], sep='')
-
-
-tool = ListDependsTool()
-tool.main()
-
--- a/listserial.py Tue May 06 18:37:41 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,59 +0,0 @@
-#!/usr/bin/env python3
-
-from pgtoolkit import toolbase, pgbrowser
-from pycolib.ansicolor import highlight, WHITE, YELLOW, RED, BOLD
-
-
-class ListSerialTool(toolbase.SimpleTool):
- max_int = 2147483647
-
- def __init__(self):
- toolbase.SimpleTool.__init__(self, name='listserial', desc='List sequences attached to column of type integer with dangerous last_value.')
- self.init()
-
- def main(self):
- conn = self.pgm.get_conn('target')
- browser = pgbrowser.PgBrowser(conn)
- rows = browser.list_sequences()
- sequences = []
- for row in rows:
- if row['related_column_type'] == 'integer':
- # read sequence attributes like last_value
- q = 'SELECT * FROM "%s"."%s"' % (row['sequence_schema'], row['sequence_name'])
- curs = conn.cursor()
- curs.execute(q)
- attrs = curs.fetchone_dict()
- # skip this sequence if its cycled and has safe max_value
- if attrs['is_cycled'] and attrs['max_value'] <= self.max_int:
- continue
- # skip sequences with last_value not yet in half of max_int
- if attrs['last_value'] < self.max_int / 2:
- continue
- # remember rest of sequences
- row['attrs'] = attrs
- sequences.append(row)
- # sort most dangerous on top
- sequences.sort(key=lambda x: x['attrs']['last_value'], reverse=True)
- # print out what we've found
- for seq in sequences:
- print('Sequence:', seq['sequence_schema'] + '.' + seq['sequence_name'])
- print(' Related:', seq['sequence_schema'] + '.' + seq['related_table'], seq['related_column'], '(' + seq['related_column_type'] + ')')
- print(' integer max', '2147483647')
- # colorize last value
- last_val = seq['attrs']['last_value']
- col = WHITE + BOLD
- if last_val > self.max_int * 0.9:
- # near max
- col = YELLOW + BOLD
- if last_val > self.max_int:
- # over max
- col = RED + BOLD
- print(' last_value', highlight(1, col) + str(last_val) + highlight(0))
- for key in ('min_value', 'max_value', 'is_cycled'):
- print(' ', key, seq['attrs'][key])
- print()
-
-
-tool = ListSerialTool()
-tool.main()
-
--- a/listtables.py Tue May 06 18:37:41 2014 +0200
+++ b/listtables.py Tue May 06 18:37:43 2014 +0200
@@ -7,7 +7,6 @@
def __init__(self):
toolbase.SimpleTool.__init__(self, name='listtables', desc='List tables in database.')
self.parser.add_argument('-o', dest='options', type=str, nargs='*', help='Filter by options (eg. -o autovacuum_enabled=false).')
- self.init()
def main(self):
browser = pgbrowser.PgBrowser(self.pgm.get_conn('target'))
@@ -31,5 +30,6 @@
tool = ListTablesTool()
+tool.setup()
tool.main()
--- a/longqueries.py Tue May 06 18:37:41 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,29 +0,0 @@
-#!/usr/bin/env python3
-
-from pgtoolkit import pgstats, toolbase
-from pycolib.ansicolor import highlight, YELLOW, BOLD
-
-
-class LongQueriesTool(toolbase.SimpleTool):
- def __init__(self):
- toolbase.SimpleTool.__init__(self, name='longqueries', desc='List long queries.')
- self.parser.add_argument('--age', default='1m', help='How long must be the query running to be listed.')
- self.init()
-
- def main(self):
- stats = pgstats.PgStats(self.pgm.get_conn('target'))
-
- for ln in stats.list_long_queries(self.args.age):
- print(highlight(1),
- 'backend PID: ', ln['procpid'],
- ', query_start: ', ln['query_start'].strftime('%F %T'),
- ', client IP: ', ln['client_addr'],
- ln['waiting'] and ', ' + highlight(1, YELLOW|BOLD) + 'waiting' or '',
- highlight(0), sep='')
- print(ln['query'])
- print()
-
-
-tool = LongQueriesTool()
-tool.main()
-
--- a/loopquery.py Tue May 06 18:37:41 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,98 +0,0 @@
-#!/usr/bin/env python3
-"""
-loopquery
-
-Execute some queries in loop. Execute interval can be set.
-
-"""
-
-from pgtoolkit import toolbase
-
-import logging.handlers
-import time
-from datetime import datetime, timedelta
-
-
-class LoopQueryTool(toolbase.ToolBase):
- def __init__(self):
- toolbase.ToolBase.__init__(self, name='loopquery', desc='Run query in loop.')
- self.parser.add_argument('target', nargs='?', metavar='target', type=str, help='Target database')
- self.parser.add_argument('-c', dest='config', type=str, help='Additional config file (besides pgtoolkit.conf).')
- self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.')
- self.parser.add_argument('--mins', dest='delay_mins', type=int, help='Delay between queries in minutes.')
- self.parser.add_argument('--secs', dest='delay_secs', type=int, help='Delay between queries in seconds.')
-
- self.config.add_option('target', type=str, default=None)
- self.config.add_option('queries', type=list, default=[])
- self.config.add_option('delay_mins', type=int, default=0)
- self.config.add_option('delay_secs', type=int, default=0)
- self.config.add_option('log_path', type=str)
-
- self.target_isolation_level = 'autocommit'
-
- self.init()
-
- def init(self):
- toolbase.ToolBase.init(self)
- if self.args.config:
- self.config.load(self.args.config)
- self.queries = self.args.queries or self.config.queries
- self.delay_mins = self.args.delay_mins or self.config.delay_mins
- self.delay_secs = self.args.delay_secs or self.config.delay_secs
- if self.config.log_path:
- self.init_file_logs(self.config.log_path)
- self.prepare_conns(target = self.args.target or self.config.target)
-
- def init_file_logs(self, path):
- format = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', '%y-%m-%d %H:%M:%S')
- handler = logging.handlers.TimedRotatingFileHandler(path+'/main.log', when='midnight', backupCount=5)
- handler.setFormatter(format)
- handler.setLevel(logging.DEBUG)
- logging.getLogger('main').addHandler(handler)
-
- format = logging.Formatter('%(asctime)s %(message)s', '%y-%m-%d %H:%M:%S')
- handler = logging.handlers.TimedRotatingFileHandler(path+'/pgnotices.log', when='midnight', backupCount=5)
- handler.setFormatter(format)
- handler.setLevel(logging.DEBUG)
- logging.getLogger('pgmanager_notices').addHandler(handler)
-
- def main(self):
- self.reset()
- while True:
- self.wait()
- self.action()
-
- def reset(self):
- """Check current time, set next action time."""
- dt = datetime.today()
- dt = dt.replace(microsecond = 0)
- self.next_action_time = dt + timedelta(minutes = self.delay_mins,
- seconds = self.delay_secs)
-
- def wait(self):
- """Wait for action time, compute next action time."""
- now = datetime.today()
- self.log.debug('Next run %s', self.next_action_time)
- if self.next_action_time > now:
- td = self.next_action_time - now
- self.log.debug('Sleep %ds', td.seconds + td.microseconds/1e6)
- time.sleep(td.seconds + td.microseconds/1e6)
- self.next_action_time += timedelta(minutes = self.delay_mins,
- seconds = self.delay_secs)
- # in case that action took too long and next planned time would
- # be in past -> reset planner
- if self.next_action_time < now:
- self.reset()
-
- def action(self):
- """Execute the queries."""
- for q in self.queries:
- self.log.info('%s', q)
- with self.pgm.cursor('target') as curs:
- curs.execute(q)
- self.log.info('Done')
-
-
-tool = LoopQueryTool()
-tool.main()
-
--- a/pgtool Tue May 06 18:37:41 2014 +0200
+++ b/pgtool Tue May 06 18:37:43 2014 +0200
@@ -24,21 +24,35 @@
from importlib import import_module
-if len(sys.argv) < 2:
- print(__doc__, end='')
- sys.exit()
+def print_tool_with_short_desc(name):
+ module = import_module('pgtoolkit.tools.' + name)
+ short_desc = module.cls.__doc__.lstrip().splitlines()[0]
+ print(name.ljust(15), '-', short_desc)
-if sys.argv[1] == '--list':
- for tool in pgtoolkit.tools.__all__:
- print(tool)
- sys.exit()
+
+if __name__ == '__main__':
+ if len(sys.argv) < 2:
+ print(__doc__, end='')
+ sys.exit()
-tool = sys.argv[1]
-tool_args = sys.argv[2:]
+ if sys.argv[1].startswith('--'):
+ if sys.argv[1] == '--list':
+ for tool in pgtoolkit.tools.__all__:
+ print_tool_with_short_desc(tool)
+ else:
+ print(__doc__, end='')
+ sys.exit()
-module = import_module('pgtoolkit.tools.' + tool)
+ tool = sys.argv[1]
+ tool_args = sys.argv[2:]
-tool = module.cls()
-tool.init(tool_args)
-tool.main()
+ if tool not in pgtoolkit.tools.__all__:
+ print('Unknown tool "%s".\n\nCall "pgtool --list" to get a list of all available tools.' % tool)
+ sys.exit()
+ module = import_module('pgtoolkit.tools.' + tool)
+
+ tool = module.cls()
+ tool.setup(tool_args)
+ tool.main()
+
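
All tools are now invoked through the single pgtool entry point; for example (database names are illustrative):

    pgtool --list
    pgtool bigtables mydb -n 10
    pgtool tablediff srcdb dstdb -s public -t users --sql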
--- a/pgtoolkit/pgdatadiff.py Tue May 06 18:37:41 2014 +0200
+++ b/pgtoolkit/pgdatadiff.py Tue May 06 18:37:43 2014 +0200
@@ -28,6 +28,8 @@
from pgtoolkit import pgbrowser
from pycolib.ansicolor import *
+import sys
+
class DiffData:
COLORS = {
@@ -38,14 +40,14 @@
'K' : BOLD | BLUE}
def __init__(self, change, cols1, cols2, key=None):
- '''
+ """
change - one of '+', '-', '*' (add, remove, update)
cols1 - original column values (OrderedDict)
cols2 - new column values (OrderedDict)
key - primary key columns (OrderedDict)
- '''
+ """
self.change = change
self.cols1 = cols1
self.cols2 = cols2
@@ -155,11 +157,11 @@
self.fulltable2 = '"' + schema + '"."'+ table + '"'
def iter_diff(self):
- '''Return differencies between data of two tables.
+ """Return differencies between data of two tables.
Yields one line at the time.
- '''
+ """
curs1, curs2 = self._select()
row1 = curs1.fetchone_dict()
@@ -186,25 +188,25 @@
curs1.close()
curs2.close()
- def print_diff(self):
- '''Print differencies between data of two tables.
+ def print_diff(self, file=sys.stdout):
+ """Print differencies between data of two tables.
The output is in human readable form.
Set allowcolor=True of PgDataDiff instance to get colored output.
- '''
+ """
for ln in self.iter_diff():
- print(ln.format())
+ print(ln.format(), file=file)
- def print_patch(self):
- '''Print SQL script usable as patch for destination table.
+ def print_patch(self, file=sys.stdout):
+ """Print SQL script usable as patch for destination table.
Supports INSERT, DELETE and UPDATE operations.
- '''
+ """
for ln in self.iter_diff():
- print(ln.format_patch(self.fulltable1))
+ print(ln.format_patch(self.fulltable1), file=file)
def _select(self):
browser = pgbrowser.PgBrowser(self.conn1)
--- a/pgtoolkit/pgmanager.py Tue May 06 18:37:41 2014 +0200
+++ b/pgtoolkit/pgmanager.py Tue May 06 18:37:43 2014 +0200
@@ -334,6 +334,9 @@
del self.conn_known[name]
del self.conn_pool[name]
+ def knows_conn(self, name='default'):
+ return name in self.conn_known
+
def get_conn(self, name='default'):
'''Get connection of name 'name' from pool.'''
self._check_fork()
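
The new knows_conn() allows creating the 'meta' connection lazily instead of opening and closing it around every prepare_conns() call; a minimal sketch of the guard pattern (as used in toolbase.py below):

    pgm = pgmanager.get_instance()
    if not pgm.knows_conn('meta'):
        pgm.create_conn(name='meta', dsn=config.meta_db)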
--- a/pgtoolkit/toolbase.py Tue May 06 18:37:41 2014 +0200
+++ b/pgtoolkit/toolbase.py Tue May 06 18:37:43 2014 +0200
@@ -26,24 +26,29 @@
class ToolBase:
- def __init__(self, name, desc, **kwargs):
- self.parser = argparse.ArgumentParser(prog=name, description=desc,
+
+ def __init__(self, name, desc=None, **kwargs):
+ self.config = ConfigParser()
+ self.parser = argparse.ArgumentParser(prog=name, description=desc or self.__doc__,
formatter_class=ToolDescriptionFormatter)
- self.parser.add_argument('-d', dest='debug', action='store_true',
- help='Debug mode - print database queries.')
+ self.pgm = pgmanager.get_instance()
+ self.target_isolation_level = None
- self.config = ConfigParser()
+ def setup(self, args=None):
+ self.specify_args()
+ self.load_args(args)
+ self.init_logging()
+
+ def specify_args(self):
self.config.add_option('databases', dict)
self.config.add_option('meta_db')
self.config.add_option('meta_query')
-
- self.pgm = pgmanager.get_instance()
- self.target_isolation_level = None
+ self.parser.add_argument('-Q', dest='queries', action='store_true',
+ help='Print database queries.')
- def init(self, args=None):
- self.config.load('pgtoolkit.conf')
+ def load_args(self, args=None, config_file=None):
+ self.config.load(config_file or 'pgtoolkit.conf')
self.args = self.parser.parse_args(args)
- self.init_logging()
def init_logging(self):
# logging
@@ -59,18 +64,20 @@
log_notices.addHandler(handler)
log_notices.setLevel(logging.DEBUG)
- if self.args.debug:
+ if self.args.queries:
log_sql = logging.getLogger('pgmanager_sql')
log_sql.addHandler(handler)
log_sql.setLevel(logging.DEBUG)
def prepare_conn_from_metadb(self, name, lookup_name):
- '''Create connection in pgmanager using meta DB.
+ """Create connection in pgmanager using meta DB.
name -- Name for connection in pgmanager.
lookup_name -- Name of connection in meta DB.
- '''
+ """
+ if not self.pgm.knows_conn('meta'):
+ self.pgm.create_conn(name='meta', dsn=self.config.meta_db)
with self.pgm.cursor('meta') as curs:
curs.execute(self.config.meta_query, [lookup_name])
row = curs.fetchone_dict()
@@ -80,9 +87,10 @@
isolation_level=self.target_isolation_level,
**row)
return True
+ self.pgm.close_conn('meta')
def prepare_conn_from_config(self, name, lookup_name):
- '''Create connection in pgmanager using info in config.databases.'''
+ """Create connection in pgmanager using info in config.databases."""
if self.config.databases:
if lookup_name in self.config.databases:
dsn = self.config.databases[lookup_name]
@@ -99,9 +107,6 @@
value: connection name in config or meta DB
"""
- if self.config.meta_db:
- self.pgm.create_conn(name='meta', dsn=self.config.meta_db)
-
for name in kwargs:
lookup_name = kwargs[name]
found = self.prepare_conn_from_config(name, lookup_name)
@@ -110,41 +115,52 @@
if not found:
raise ConnectionInfoNotFound('Connection name "%s" not found in config nor in meta DB.' % lookup_name)
- if self.config.meta_db:
- self.pgm.close_conn('meta')
-
class SimpleTool(ToolBase):
- def __init__(self, name, desc, **kwargs):
+
+ def __init__(self, name, desc=None, **kwargs):
ToolBase.__init__(self, name, desc, **kwargs)
+
+ def specify_args(self):
+ ToolBase.specify_args(self)
self.parser.add_argument('target', metavar='target', type=str, help='Target database')
- def init(self, args=None):
- ToolBase.init(self, args)
+ def setup(self, args=None):
+ ToolBase.setup(self, args)
self.prepare_conns(target=self.args.target)
class SrcDstTool(ToolBase):
- def __init__(self, name, desc, **kwargs):
+
+ def __init__(self, name, desc=None, *, allow_reverse=False, force_reverse=False, **kwargs):
ToolBase.__init__(self, name, desc, **kwargs)
+ self.allow_reverse = allow_reverse
+ self.force_reverse = force_reverse
+
+ def specify_args(self):
+ ToolBase.specify_args(self)
self.parser.add_argument('src', metavar='source', type=str, help='Source database')
self.parser.add_argument('dst', metavar='destination', type=str, help='Destination database')
- if 'allow_reverse' in kwargs and kwargs['allow_reverse']:
+ if self.allow_reverse:
self.parser.add_argument('-r', '--reverse', action='store_true', help='Reverse operation. Swap source and destination.')
- def init(self, args=None):
- ToolBase.init(self, args)
+ def load_args(self, args=None, config_file=None):
+ ToolBase.load_args(self, args, config_file)
if self.is_reversed():
self.args.src, self.args.dst = self.args.dst, self.args.src
+
+ def setup(self, args=None):
+ ToolBase.setup(self, args)
self.prepare_conns(src=self.args.src, dst=self.args.dst)
def is_reversed(self):
- return 'reverse' in self.args and self.args.reverse
+ return ('reverse' in self.args and self.args.reverse) or self.force_reverse
class SrcDstTablesTool(SrcDstTool):
- def __init__(self, name, desc, **kwargs):
- SrcDstTool.__init__(self, name, desc, **kwargs)
+
+ def specify_args(self):
+ SrcDstTool.specify_args(self)
self.parser.add_argument('-t', '--src-table', metavar='source_table',
dest='srctable', type=str, default='', help='Source table name.')
self.parser.add_argument('-s', '--src-schema', metavar='source_schema',
@@ -155,9 +171,11 @@
dest='dstschema', type=str, default='', help='Destination schema name (default=source_schema).')
self.parser.add_argument('--regex', action='store_true', help="Use RE in schema or table name.")
- def init(self, args=None):
- SrcDstTool.init(self, args)
+ def load_args(self, args=None, config_file=None):
+ SrcDstTool.load_args(self, args, config_file)
+ self.load_table_names()
+ def load_table_names(self):
self.schema1 = self.args.srcschema
self.table1 = self.args.srctable
self.schema2 = self.args.dstschema
--- a/pgtoolkit/tools/__init__.py Tue May 06 18:37:41 2014 +0200
+++ b/pgtoolkit/tools/__init__.py Tue May 06 18:37:43 2014 +0200
@@ -1,1 +1,3 @@
-__all__ = ['tablediff']
+__all__ = ['analyzeall', 'batchcopy', 'bigtables', 'listdepends',
+ 'listserial', 'longqueries', 'loopquery',
+ 'runquery', 'schemadiff', 'tablediff', 'tablesync']
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pgtoolkit/tools/analyzeall.py Tue May 06 18:37:43 2014 +0200
@@ -0,0 +1,52 @@
+from pgtoolkit.toolbase import SimpleTool
+from pgtoolkit import pgbrowser
+
+
+class AnalyzeAllTool(SimpleTool):
+
+ """
+ Analyze/vacuum all tables in selected schemas.
+
+ Partially emulates the VACUUM ANALYZE VERBOSE query,
+ but is more configurable and skips pg_catalog etc.
+
+ """
+
+ def __init__(self):
+ SimpleTool.__init__(self, name='analyzeall')
+ self.target_isolation_level = 'autocommit'
+
+ def specify_args(self):
+ SimpleTool.specify_args(self)
+ self.parser.add_argument('-s', dest='schema', nargs='*', help='Schema filter')
+ self.parser.add_argument('--vacuum', action='store_true', help='Call VACUUM ANALYZE')
+ self.parser.add_argument('--vacuum-full', action='store_true', help='Call VACUUM FULL ANALYZE')
+ self.parser.add_argument('--reindex', action='store_true', help='Call REINDEX TABLE')
+
+ def main(self):
+ browser = pgbrowser.PgBrowser(self.pgm.get_conn('target'))
+
+ query_patterns = ['ANALYZE %s.%s;']
+ if self.args.vacuum:
+ query_patterns = ['VACUUM ANALYZE %s.%s;']
+ if self.args.vacuum_full:
+ query_patterns = ['VACUUM FULL ANALYZE %s.%s;']
+ if self.args.reindex:
+ query_patterns += ['REINDEX TABLE %s.%s;']
+
+ schema_list = self.args.schema
+ if not schema_list:
+ schema_list = [schema['name'] for schema in browser.list_schemas() if not schema['system']]
+
+ for schema in schema_list:
+ tables = browser.list_tables(schema=schema)
+ with self.pgm.cursor('target') as curs:
+ for table in tables:
+ for query_pattern in query_patterns:
+ query = query_pattern % (schema, table['name'])
+ self.log.info(query)
+ curs.execute(query, [])
+
+
+cls = AnalyzeAllTool
+
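
Note how the flags combine: --vacuum-full takes precedence over --vacuum, and --reindex appends a second statement per table. For example (schema name illustrative):

    pgtool analyzeall mydb --vacuum --reindex -s public
    # runs for each table in schema public:
    #   VACUUM ANALYZE public.<table>;
    #   REINDEX TABLE public.<table>;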
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pgtoolkit/tools/batchcopy.py Tue May 06 18:37:43 2014 +0200
@@ -0,0 +1,61 @@
+from pgtoolkit.toolbase import SrcDstTablesTool
+from pgtoolkit.pgmanager import IntegrityError
+
+
+class BatchCopyTool(SrcDstTablesTool):
+
+ """
+ Copy data from one table to another, filtering by a specified condition.
+
+ """
+
+ def __init__(self):
+ SrcDstTablesTool.__init__(self, name='batchcopy')
+
+ def specify_args(self):
+ SrcDstTablesTool.specify_args(self)
+ self.parser.add_argument('--table-name', type=str, help='Table to be copied.')
+ self.parser.add_argument('--src-filter', type=str, help='WHERE condition for source query.')
+ self.parser.add_argument('--file-with-ids', type=str, help='Read source IDs from file (each ID on new line). Use these in --src-filter as {ids}')
+ self.parser.add_argument('--dst-exists', choices=['rollback', 'ignore', 'update'], default='rollback', help='What to do when destination record already exists.')
+
+ def main(self):
+ # read list of IDs from file
+ ids = '<no IDs read>'
+ if self.args.file_with_ids:
+ with open(self.args.file_with_ids, 'r') as f:
+ ids = ','.join(ln.rstrip() for ln in f.readlines())
+
+ # read source data
+ with self.pgm.cursor('src') as src_curs:
+ condition = self.args.src_filter.format(ids=ids) if self.args.src_filter else 'true'
+ src_curs.execute('SELECT * FROM {} WHERE {}'.format(self.args.table_name, condition))
+ #TODO: ORDER BY id OFFSET 0 LIMIT 100
+ data = src_curs.fetchall_dict()
+ src_curs.connection.commit()
+
+ with self.pgm.cursor('dst') as dst_curs:
+ copied = 0
+ for row in data:
+ keys = ', '.join(row.keys())
+ values_mask = ', '.join(['%s'] * len(row))
+ query = 'INSERT INTO {} ({}) VALUES ({})'.format(self.args.table_name, keys, values_mask)
+ try:
+ dst_curs.execute('SAVEPOINT the_query;')
+ dst_curs.execute(query, list(row.values()))
+ dst_curs.execute('RELEASE SAVEPOINT the_query;')
+ copied += 1
+ except IntegrityError:
+ if self.args.dst_exists == 'rollback':
+ dst_curs.connection.rollback()
+ break
+ elif self.args.dst_exists == 'ignore':
+ dst_curs.execute('ROLLBACK TO SAVEPOINT the_query;')
+ elif self.args.dst_exists == 'update':
+ raise NotImplementedError()
+ dst_curs.connection.commit()
+
+ self.log.info('Copied %s rows.', copied)
+
+
+cls = BatchCopyTool
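
A hypothetical invocation that wires --file-with-ids into --src-filter through the {ids} placeholder (all names illustrative):

    pgtool batchcopy srcdb dstdb --table-name public.users \
        --file-with-ids ids.txt --src-filter "id IN ({ids})" --dst-exists ignore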
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pgtoolkit/tools/bigtables.py Tue May 06 18:37:43 2014 +0200
@@ -0,0 +1,55 @@
+from pgtoolkit.toolbase import SimpleTool
+from pgtoolkit import pgbrowser
+from pycolib.prettysize import prettysize_short
+from pycolib.ansicolor import highlight
+
+
+class BigTablesTool(SimpleTool):
+
+ """List largest tables.
+
+ Reads size statistics of tables and indexes from pg_catalog.
+
+ """
+
+ def __init__(self):
+ SimpleTool.__init__(self, name='bigtables')
+
+ def specify_args(self):
+ SimpleTool.specify_args(self)
+ self.parser.add_argument('-n', '--limit', metavar='NUM', dest='limit', type=int, default=5, help='Show NUM biggest tables.')
+ self.parser.add_argument('-v', '--details', dest='details', action='store_true', help='Show sizes of data and individual indexes.')
+
+ def main(self):
+ browser = pgbrowser.PgBrowser(self.pgm.get_conn('target'))
+
+ # scan all tables from all schemas, remember names and sizes
+ all_tables = []
+ all_indexes = []
+ schemas = browser.list_schemas()
+ for schema in schemas:
+ tables = browser.list_tables(schema['name'])
+ for table in tables:
+ table_name = '%s.%s' % (schema['name'], table['name'])
+ indexes = browser.list_indexes(table['name'], schema['name'])
+ for index in indexes:
+ all_indexes.append({'name': index['name'], 'table': table_name, 'size': index['size']})
+ all_tables.append({'name': table_name, 'size': table['size'], 'indexes': indexes})
+
+ # print names and sizes of the largest tables (up to --limit)
+ for table in sorted(all_tables, reverse=True, key=lambda x: x['size'])[:self.args.limit]:
+ size_of_indexes = sum(index['size'] for index in table['indexes'])
+ print(highlight(1) + prettysize_short(table['size'] + size_of_indexes, trailing_zeros=True).rjust(8) + highlight(0),
+ '(total)'.ljust(8),
+ highlight(1) + table['name'] + highlight(0), sep=' ')
+ if self.args.details:
+ print(prettysize_short(table['size'], trailing_zeros=True).rjust(8),
+ '(data)'.ljust(8), sep=' ')
+ for index in sorted(table['indexes'], reverse=True, key=lambda x: x['size']):
+ print(prettysize_short(index['size'], trailing_zeros=True).rjust(8),
+ '(index)'.ljust(8), index['name'], sep=' ')
+ print()
+
+
+cls = BigTablesTool
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pgtoolkit/tools/listdepends.py Tue May 06 18:37:43 2014 +0200
@@ -0,0 +1,31 @@
+from pgtoolkit.toolbase import SimpleTool
+from pgtoolkit import pgbrowser
+
+
+class ListDependsTool(SimpleTool):
+
+ """
+ List column dependencies.
+
+ """
+
+ def __init__(self):
+ SimpleTool.__init__(self, name='listdepends')
+
+ def specify_args(self):
+ SimpleTool.specify_args(self)
+ self.parser.add_argument('table', metavar='table', type=str, help='Table name.')
+ self.parser.add_argument('column', metavar='column', type=str, help='Column name.')
+ self.parser.add_argument('-s', '--schema', dest='schema', metavar='schema',
+ type=str, default='public', help='Schema name (default=public).')
+
+ def main(self):
+ browser = pgbrowser.PgBrowser(self.pgm.get_conn('target'))
+
+ objects = browser.list_column_usage(self.args.table, self.args.column, schema=self.args.schema)
+ for obj in sorted(objects, key=lambda x: (x['type'], x['schema'], x['name'])):
+ print(obj['type'], ' ', obj['schema'], '.', obj['name'], sep='')
+
+
+cls = ListDependsTool
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pgtoolkit/tools/listserial.py Tue May 06 18:37:43 2014 +0200
@@ -0,0 +1,69 @@
+from pgtoolkit.toolbase import SimpleTool
+from pgtoolkit import pgbrowser
+from pycolib.ansicolor import highlight, WHITE, YELLOW, RED, BOLD
+
+
+class ListSerialTool(SimpleTool):
+
+ """List sequences near to overflow.
+
+ Checks all sequences attached to a column of type integer.
+
+ Highlights dangerous sequence values:
+ * Yellow - near overflow (90%)
+ * Red - already over...
+
+ Does not list sequences with a value under 50% of the range.
+
+ """
+
+ max_int = 2147483647
+
+ def __init__(self):
+ SimpleTool.__init__(self, name='listserial')
+
+ def main(self):
+ conn = self.pgm.get_conn('target')
+ browser = pgbrowser.PgBrowser(conn)
+ rows = browser.list_sequences()
+ sequences = []
+ for row in rows:
+ if row['related_column_type'] == 'integer':
+ # read sequence attributes like last_value
+ q = 'SELECT * FROM "%s"."%s"' % (row['sequence_schema'], row['sequence_name'])
+ curs = conn.cursor()
+ curs.execute(q)
+ attrs = curs.fetchone_dict()
+ # skip this sequence if it's cycled and has a safe max_value
+ if attrs['is_cycled'] and attrs['max_value'] <= self.max_int:
+ continue
+ # skip sequences with last_value not yet in half of max_int
+ if attrs['last_value'] < self.max_int / 2:
+ continue
+ # remember rest of sequences
+ row['attrs'] = attrs
+ sequences.append(row)
+ # sort most dangerous on top
+ sequences.sort(key=lambda x: x['attrs']['last_value'], reverse=True)
+ # print out what we've found
+ for seq in sequences:
+ print('Sequence:', seq['sequence_schema'] + '.' + seq['sequence_name'])
+ print(' Related:', seq['sequence_schema'] + '.' + seq['related_table'], seq['related_column'], '(' + seq['related_column_type'] + ')')
+ print(' integer max', '2147483647')
+ # colorize last value
+ last_val = seq['attrs']['last_value']
+ col = WHITE + BOLD
+ if last_val > self.max_int * 0.9:
+ # near max
+ col = YELLOW + BOLD
+ if last_val > self.max_int:
+ # over max
+ col = RED + BOLD
+ print(' last_value', highlight(1, col) + str(last_val) + highlight(0))
+ for key in ('min_value', 'max_value', 'is_cycled'):
+ print(' ', key, seq['attrs'][key])
+ print()
+
+
+cls = ListSerialTool
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pgtoolkit/tools/longqueries.py Tue May 06 18:37:43 2014 +0200
@@ -0,0 +1,34 @@
+from pgtoolkit.toolbase import SimpleTool
+from pgtoolkit import pgstats
+from pycolib.ansicolor import highlight, YELLOW, BOLD
+
+
+class LongQueriesTool(SimpleTool):
+
+ """
+ List long-running queries.
+ """
+
+ def __init__(self):
+ SimpleTool.__init__(self, name='longqueries')
+
+ def specify_args(self):
+ SimpleTool.specify_args(self)
+ self.parser.add_argument('--age', default='1m', help='How long the query must have been running to be listed.')
+
+ def main(self):
+ stats = pgstats.PgStats(self.pgm.get_conn('target'))
+
+ for ln in stats.list_long_queries(self.args.age):
+ print(highlight(1),
+ 'backend PID: ', ln['procpid'],
+ ', query_start: ', ln['query_start'].strftime('%F %T'),
+ ', client IP: ', ln['client_addr'],
+ ln['waiting'] and ', ' + highlight(1, YELLOW|BOLD) + 'waiting' or '',
+ highlight(0), sep='')
+ print(ln['query'])
+ print()
+
+
+cls = LongQueriesTool
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pgtoolkit/tools/loopquery.py Tue May 06 18:37:43 2014 +0200
@@ -0,0 +1,101 @@
+from pgtoolkit.toolbase import ToolBase
+
+import logging.handlers
+import time
+from datetime import datetime, timedelta
+
+
+class LoopQueryTool(ToolBase):
+
+ """
+ Execute queries in a loop, with a configurable interval.
+
+ """
+
+ def __init__(self):
+ ToolBase.__init__(self, name='loopquery')
+ self.target_isolation_level = 'autocommit'
+
+ def specify_args(self):
+ ToolBase.specify_args(self)
+ self.parser.add_argument('target', nargs='?', metavar='target', type=str, help='Target database')
+ self.parser.add_argument('-c', dest='config', type=str, help='Additional config file (besides pgtoolkit.conf).')
+ self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.')
+ self.parser.add_argument('--mins', dest='delay_mins', type=int, help='Delay between queries in minutes.')
+ self.parser.add_argument('--secs', dest='delay_secs', type=int, help='Delay between queries in seconds.')
+
+ self.config.add_option('target', type=str, default=None)
+ self.config.add_option('queries', type=list, default=[])
+ self.config.add_option('delay_mins', type=int, default=0)
+ self.config.add_option('delay_secs', type=int, default=0)
+ self.config.add_option('log_path', type=str)
+
+ def load_args(self, args=None, config_file=None):
+ ToolBase.load_args(self, args, config_file)
+ if self.args.config:
+ self.config.load(self.args.config)
+ self.queries = self.args.queries or self.config.queries
+ self.delay_mins = self.args.delay_mins or self.config.delay_mins
+ self.delay_secs = self.args.delay_secs or self.config.delay_secs
+
+ def init_logging(self):
+ ToolBase.init_logging(self)
+ if self.config.log_path:
+ self.init_file_logs(self.config.log_path)
+
+ def setup(self, args=None):
+ ToolBase.setup(self, args)
+ self.prepare_conns(target=self.args.target or self.config.target)
+
+ def init_file_logs(self, path):
+ format = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', '%y-%m-%d %H:%M:%S')
+ handler = logging.handlers.TimedRotatingFileHandler(path+'/main.log', when='midnight', backupCount=5)
+ handler.setFormatter(format)
+ handler.setLevel(logging.DEBUG)
+ logging.getLogger('main').addHandler(handler)
+
+ format = logging.Formatter('%(asctime)s %(message)s', '%y-%m-%d %H:%M:%S')
+ handler = logging.handlers.TimedRotatingFileHandler(path+'/pgnotices.log', when='midnight', backupCount=5)
+ handler.setFormatter(format)
+ handler.setLevel(logging.DEBUG)
+ logging.getLogger('pgmanager_notices').addHandler(handler)
+
+ def main(self):
+ self.reset()
+ while True:
+ self.wait()
+ self.action()
+
+ def reset(self):
+ """Check current time, set next action time."""
+ dt = datetime.today()
+ dt = dt.replace(microsecond = 0)
+ self.next_action_time = dt + timedelta(minutes = self.delay_mins,
+ seconds = self.delay_secs)
+
+ def wait(self):
+ """Wait for action time, compute next action time."""
+ now = datetime.today()
+ self.log.debug('Next run %s', self.next_action_time)
+ if self.next_action_time > now:
+ td = self.next_action_time - now
+ self.log.debug('Sleep %ds', td.seconds + td.microseconds/1e6)
+ time.sleep(td.seconds + td.microseconds/1e6)
+ self.next_action_time += timedelta(minutes = self.delay_mins,
+ seconds = self.delay_secs)
+ # in case that action took too long and next planned time would
+ # be in past -> reset planner
+ if self.next_action_time < now:
+ self.reset()
+
+ def action(self):
+ """Execute the queries."""
+ for q in self.queries:
+ self.log.info('%s', q)
+ with self.pgm.cursor('target') as curs:
+ curs.execute(q)
+ self.log.info('Done')
+
+
+cls = LoopQueryTool
+
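
The --mins and --secs delays add up, and wait() resets the schedule whenever a run overshoots the next planned slot. An illustrative invocation (query and interval are examples):

    pgtool loopquery mydb -q "SELECT 1" --mins 5 --secs 30    # runs every 5.5 minutes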
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pgtoolkit/tools/runquery.py Tue May 06 18:37:43 2014 +0200
@@ -0,0 +1,106 @@
+from pgtoolkit.toolbase import ToolBase
+
+import logging.handlers
+import time
+from datetime import datetime, timedelta
+from psycopg2 import ProgrammingError
+
+
+class RunQueryTool(ToolBase):
+
+ """
+ Execute configured queries in the target database.
+ """
+
+ def __init__(self):
+ ToolBase.__init__(self, name='runquery')
+ self.target_isolation_level = 'autocommit'
+
+ def specify_args(self):
+ ToolBase.specify_args(self)
+ self.parser.add_argument('target', nargs='?', metavar='target', type=str, help='Target database')
+ self.parser.add_argument('-c', dest='config', type=str, help='Additional config file (besides pgtoolkit.conf).')
+ self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.')
+ self.parser.add_argument('-f', dest='file', metavar='FILE', help='Read query from file.')
+ self.parser.add_argument('--one-query-per-line', action='store_true', help='When reading queries from file, treat each line as a separate query.')
+ self.parser.add_argument('-p', '--parameter', dest='parameters', metavar='PARAM=VALUE', nargs='*',
+ help="If query should be used as format template, these parameters will be substituted.")
+ self.parser.add_argument('--output-file', dest='output_file', metavar='OUTPUT_FILE', help='Write query result in specified file.')
+ self.parser.add_argument('--format', dest='format', metavar='FORMAT', help='Format string for each line in output file (using Python\'s format()).')
+
+ self.config.add_option('target', type=str, default=None)
+ self.config.add_option('queries', type=list, default=[])
+ self.config.add_option('log_path', type=str)
+
+ def setup(self, args=None):
+ ToolBase.setup(self, args)
+ self.prepare_conns(target=self.args.target or self.config.target)
+
+ def load_args(self, args=None, config_file=None):
+ ToolBase.load_args(self, args, config_file)
+ if self.args.config:
+ self.config.load(self.args.config)
+ self.queries = self.args.queries or self.config.queries
+ # read query from file
+ if self.args.file:
+ with open(self.args.file, 'r', encoding='utf8') as f:
+ data = f.read()
+ if self.args.one_query_per_line:
+ file_queries = [ln for ln in data.splitlines() if not ln.lstrip().startswith('--')]
+ self.queries = file_queries + self.queries
+ else:
+ self.queries.insert(0, data)
+ # prepare parameters
+ self._prepare_parameters(self.args.parameters)
+
+ def init_logging(self):
+ ToolBase.init_logging(self)
+ if self.config.log_path:
+ self.init_file_logs(self.config.log_path)
+
+ def init_file_logs(self, path):
+ format = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', '%y-%m-%d %H:%M:%S')
+ handler = logging.handlers.TimedRotatingFileHandler(path+'/main.log', when='midnight', backupCount=5)
+ handler.setFormatter(format)
+ handler.setLevel(logging.DEBUG)
+ logging.getLogger('main').addHandler(handler)
+
+ format = logging.Formatter('%(asctime)s %(message)s', '%y-%m-%d %H:%M:%S')
+ handler = logging.handlers.TimedRotatingFileHandler(path+'/pgnotices.log', when='midnight', backupCount=5)
+ handler.setFormatter(format)
+ handler.setLevel(logging.DEBUG)
+ logging.getLogger('pgmanager_notices').addHandler(handler)
+
+ def main(self):
+ """Execute the queries."""
+ self.log.debug('Queries: %s', self.queries)
+ for q in self.queries:
+ if self.parameters:
+ q = q.format(**self.parameters)
+ self.log.info('%s', q if len(q) < 100 else q[:100]+'...')
+ with self.pgm.cursor('target') as curs:
+ curs.execute(q)
+ self.log.info('Rows affected: %d', curs.rowcount)
+ try:
+ rows = curs.fetchall_dict()
+ self._write_output_file(rows)
+ except ProgrammingError:
+ pass
+ self.log.info('Done')
+
+ def _write_output_file(self, rows):
+ if not self.args.output_file:
+ return
+ with open(self.args.output_file, 'w', encoding='utf8') as f:
+ for row in rows:
+ print(self.args.format.format(row), file=f)
+
+ def _prepare_parameters(self, parameters):
+ self.parameters = {}
+ for parameter in parameters or ():
+ name, value = parameter.split('=', 1)
+ self.parameters[name] = value
+
+
+cls = RunQueryTool
+
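
Queries pass through str.format(), so -p values are substituted by name; --format receives the row dict as the first positional argument, hence the {0[...]} style. An illustrative run (all names are examples):

    pgtool runquery mydb -q "SELECT id, name FROM {tbl} LIMIT 10" -p tbl=users \
        --output-file out.txt --format "{0[id]} {0[name]}"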
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pgtoolkit/tools/schemadiff.py Tue May 06 18:37:43 2014 +0200
@@ -0,0 +1,51 @@
+from pgtoolkit.toolbase import SrcDstTool
+from pgtoolkit import pgmanager, pgbrowser, pgdiff, toolbase
+
+
+class SchemaDiffTool(SrcDstTool):
+
+ """
+ Print differences in database schema.
+
+ Prints changes from source to destination.
+ SQL patch updates source database schema to destination schema.
+
+ """
+
+ def __init__(self):
+ SrcDstTool.__init__(self, name='schemadiff', allow_reverse=True)
+
+ def specify_args(self):
+ SrcDstTool.specify_args(self)
+ self.parser.add_argument('-s', dest='schema', nargs='*', help='Schema filter')
+ self.parser.add_argument('-t', dest='table', nargs='*', help='Table filter')
+ self.parser.add_argument('-f', dest='function', type=str, help='Function filter (regex)')
+ self.parser.add_argument('--sql', action='store_true', help='Output is SQL script.')
+ self.parser.add_argument('--body', action='store_true', help='Output diff for function bodies.')
+
+ def main(self):
+ srcbrowser = pgbrowser.PgBrowser(self.pgm.get_conn('src'))
+ dstbrowser = pgbrowser.PgBrowser(self.pgm.get_conn('dst'))
+
+ pgd = pgdiff.PgDiff(srcbrowser, dstbrowser)
+
+ try:
+ if self.args.schema:
+ pgd.filter_schemas(include=self.args.schema)
+ if self.args.table:
+ pgd.filter_tables(include=self.args.table)
+ if self.args.function:
+ pgd.filter_functions(self.args.function)
+ if self.args.body:
+ pgd.function_body_diff = True
+
+ if self.args.sql:
+ pgd.print_patch()
+ else:
+ pgd.print_diff()
+ except pgdiff.PgDiffError as e:
+ print('PgDiff error:', str(e))
+
+
+cls = SchemaDiffTool
+
--- a/pgtoolkit/tools/tablediff.py Tue May 06 18:37:41 2014 +0200
+++ b/pgtoolkit/tools/tablediff.py Tue May 06 18:37:43 2014 +0200
@@ -2,11 +2,13 @@
from pgtoolkit.toolbase import SrcDstTablesTool
from pycolib.ansicolor import highlight, BOLD, YELLOW
+import sys
-class TableDiffTool(toolbase.SrcDstTablesTool):
+
+class TableDiffTool(SrcDstTablesTool):
"""
- Print differencies between data in tables.
+ Print differences between data in tables.
Requirements:
* Source table must have defined PRIMARY KEY.
@@ -17,19 +19,29 @@
def __init__(self):
SrcDstTablesTool.__init__(self, name='tablediff', desc=self.__doc__, allow_reverse=True)
+
+ def specify_args(self):
+ SrcDstTablesTool.specify_args(self)
self.parser.add_argument('--sql', action='store_true', help='Output is SQL script.')
self.parser.add_argument('--rowcount', action='store_true', help='Compare number of rows.')
+ self.parser.add_argument('-o', '--output-file', help='Output file for sql queries.')
def main(self):
srcconn = self.pgm.get_conn('src')
dstconn = self.pgm.get_conn('dst')
+ if self.args.output_file:
+ output_file = open(self.args.output_file, 'w')
+ else:
+ output_file = sys.stdout
+
dd = pgdatadiff.PgDataDiff(srcconn, dstconn)
for srcschema, srctable, dstschema, dsttable in self.tables():
print('-- Diff from [%s] %s.%s to [%s] %s.%s' % (
self.args.src, srcschema, srctable,
- self.args.dst, dstschema, dsttable))
+ self.args.dst, dstschema, dsttable),
+ file=output_file)
if self.args.rowcount:
with self.pgm.cursor('src') as curs:
@@ -39,16 +51,18 @@
curs.execute('''SELECT count(*) FROM "%s"."%s"''' % (dstschema, dsttable))
dstcount = curs.fetchone()[0]
if srccount != dstcount:
- print(highlight(1, BOLD | YELLOW), "Row count differs: src=%s dst=%s" % (srccount, dstcount), highlight(0), sep='')
+ print(highlight(1, BOLD | YELLOW),
+ "Row count differs: src=%s dst=%s" % (srccount, dstcount),
+ highlight(0), sep='', file=output_file)
continue
dd.settable1(srctable, srcschema)
dd.settable2(dsttable, dstschema)
if self.args.sql:
- dd.print_patch()
+ dd.print_patch(file=output_file)
else:
- dd.print_diff()
+ dd.print_diff(file=output_file)
cls = TableDiffTool
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pgtoolkit/tools/tablesync.py Tue May 06 18:37:43 2014 +0200
@@ -0,0 +1,60 @@
+from pgtoolkit.toolbase import SrcDstTool
+from pgtoolkit.tools.tablediff import TableDiffTool
+from pgtoolkit.tools.runquery import RunQueryTool
+
+
+class TableSyncTool(SrcDstTool):
+
+ """
+ Synchronize tables between two databases (tablediff + runquery).
+
+ This essentially calls the following commands for each table in the list:
+ * pgtool tablediff <source> <target> -r -s <schema> -t <table> --sql -o /tmp/diff.sql
+ * pgtool runquery <target> -f /tmp/diff.sql
+
+ """
+
+ def __init__(self):
+ SrcDstTool.__init__(self, name='tablesync', force_reverse=True)
+ self.tablediff = TableDiffTool()
+ self.tablediff.specify_args()
+ self.runquery = RunQueryTool()
+ self.runquery.specify_args()
+
+ def specify_args(self):
+ SrcDstTool.specify_args(self)
+ self.parser.add_argument('-t', dest='tables', metavar='table', nargs='*',
+ help="Tables to be synchronized.")
+ self.parser.add_argument('-s', '--schema', metavar='default_schema',
+ dest='schema', type=str, default='public', help='Default schema name.')
+
+ def init_logging(self):
+ SrcDstTool.init_logging(self)
+ self.runquery.log = self.log
+
+ def setup(self, args=None):
+ SrcDstTool.setup(self, args)
+ self.target_isolation_level = 'autocommit'
+ self.prepare_conns(target=self.args.src)
+
+ def main(self):
+ for table in self.args.tables:
+ self.sync(table)
+
+ def sync(self, table):
+ if '.' in table:
+ schema, table = table.split('.', 1)
+ else:
+ schema = self.args.schema
+ # Call tablediff
+ self.tablediff.load_args([self.args.src, self.args.dst,
+ '-r', '-s', schema, '-t', table, '--sql', '-o', '/tmp/diff.sql'])
+ self.tablediff.main()
+ # Call runquery
+ self.runquery.load_args([self.args.src, '--one-query-per-line',
+ '-f', '/tmp/diff.sql'])
+ self.runquery.main()
+
+
+cls = TableSyncTool
+
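
End to end, a hypothetical sync of two tables (database and table names illustrative):

    pgtool tablesync proddb testdb -s public -t users orders

Each table is diffed into /tmp/diff.sql and the resulting patch is replayed on the target database, exactly as the docstring above outlines.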
--- a/runquery.py Tue May 06 18:37:41 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,105 +0,0 @@
-#!/usr/bin/env python3
-"""
-runquery
-
-Execute configured queries once.
-
-"""
-
-from pgtoolkit import toolbase
-
-import logging.handlers
-import time
-from datetime import datetime, timedelta
-
-from psycopg2 import ProgrammingError
-
-
-class RunQueryTool(toolbase.ToolBase):
-
- def __init__(self):
- toolbase.ToolBase.__init__(self, name='runquery', desc='Run configured queries.')
- self.parser.add_argument('target', nargs='?', metavar='target', type=str, help='Target database')
- self.parser.add_argument('-c', dest='config', type=str, help='Additional config file (besides pgtoolkit.conf).')
- self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.')
- self.parser.add_argument('-f', dest='file', metavar='FILE', help='Read query from file.')
- self.parser.add_argument('--one-query-per-line', action='store_true', help='When reading queries from file, consider each line as separate query.')
- self.parser.add_argument('-p', '--parameter', dest='parameters', metavar='PARAM=VALUE', nargs='*',
- help="If query should be used as format template, these parameters will be substituted.")
- self.parser.add_argument('--output-file', dest='output_file', metavar='OUTPUT_FILE', help='Write query result in specified file.')
- self.parser.add_argument('--format', dest='format', metavar='FORMAT', help='Format string for each line in output file (using Python\'s format()).')
-
- self.config.add_option('target', type=str, default=None)
- self.config.add_option('queries', type=list, default=[])
- self.config.add_option('log_path', type=str)
-
- self.target_isolation_level = 'autocommit'
-
- self.init()
-
- def init(self):
- toolbase.ToolBase.init(self)
- if self.args.config:
- self.config.load(self.args.config)
- self.queries = self.args.queries or self.config.queries
- # read query from file
- if self.args.file:
- with open(self.args.file, 'r', encoding='utf8') as f:
- data = f.read()
- if self.args.one_query_per_line:
- file_queries = [ln for ln in data.splitlines() if not ln.lstrip().startswith('--')]
- self.queries = file_queries + self.queries
- else:
- self.queries.insert(0, data)
- # prepare parameters
- self._prepare_parameters(self.args.parameters)
- if self.config.log_path:
- self.init_file_logs(self.config.log_path)
- self.prepare_conns(target = self.args.target or self.config.target)
-
- def init_file_logs(self, path):
- format = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', '%y-%m-%d %H:%M:%S')
- handler = logging.handlers.TimedRotatingFileHandler(path+'/main.log', when='midnight', backupCount=5)
- handler.setFormatter(format)
- handler.setLevel(logging.DEBUG)
- logging.getLogger('main').addHandler(handler)
-
- format = logging.Formatter('%(asctime)s %(message)s', '%y-%m-%d %H:%M:%S')
- handler = logging.handlers.TimedRotatingFileHandler(path+'/pgnotices.log', when='midnight', backupCount=5)
- handler.setFormatter(format)
- handler.setLevel(logging.DEBUG)
- logging.getLogger('pgmanager_notices').addHandler(handler)
-
- def main(self):
- """Execute the queries."""
- for q in self.queries:
- if self.parameters:
- q = q.format(**self.parameters)
- self.log.info('%s', q if len(q) < 100 else q[:100]+'...')
- with self.pgm.cursor('target') as curs:
- curs.execute(q)
- self.log.info('Rows affected: %d', curs.rowcount)
- try:
- rows = curs.fetchall_dict()
- self._write_output_file(rows)
- except ProgrammingError:
- pass
- self.log.info('Done')
-
- def _write_output_file(self, rows):
- if not self.args.output_file:
- return
- with open(self.args.output_file, 'w', encoding='utf8') as f:
- for row in rows:
- print(self.args.format.format(row), file=f)
-
- def _prepare_parameters(self, parameters):
- self.parameters = {}
- for parameter in parameters or ():
- name, value = parameter.split('=', 1)
- self.parameters[name] = value
-
-
-tool = RunQueryTool()
-tool.main()
-
--- a/schemadiff.py Tue May 06 18:37:41 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,49 +0,0 @@
-#!/usr/bin/env python3
-#
-# Print differences in database schema.
-#
-# Prints changes from source to destination.
-# SQL patch updates source database schema to destination schema.
-#
-
-from pgtoolkit import pgmanager, pgbrowser, pgdiff, toolbase
-
-
-class SchemaDiffTool(toolbase.SrcDstTool):
- def __init__(self):
- toolbase.SrcDstTool.__init__(self, name='schemadiff', desc='Database schema diff.', allow_reverse = True)
-
- self.parser.add_argument('-s', dest='schema', nargs='*', help='Schema filter')
- self.parser.add_argument('-t', dest='table', nargs='*', help='Table filter')
- self.parser.add_argument('-f', dest='function', type=str, help='Function filter (regex)')
- self.parser.add_argument('--sql', action='store_true', help='Output is SQL script.')
- self.parser.add_argument('--body', action='store_true', help='Output diff for function bodies.')
-
- self.init()
-
- def main(self):
- srcbrowser = pgbrowser.PgBrowser(self.pgm.get_conn('src'))
- dstbrowser = pgbrowser.PgBrowser(self.pgm.get_conn('dst'))
-
- pgd = pgdiff.PgDiff(srcbrowser, dstbrowser)
-
- try:
- if self.args.schema:
- pgd.filter_schemas(include=self.args.schema)
- if self.args.table:
- pgd.filter_tables(include=self.args.table)
- if self.args.function:
- pgd.filter_functions(self.args.function)
- if self.args.body:
- pgd.function_body_diff = True
-
- if self.args.sql:
- pgd.print_patch()
- else:
- pgd.print_diff()
- except pgdiff.PgDiffError as e:
- print('PgDiff error:', str(e))
-
-
-tool = SchemaDiffTool()
-tool.main()
--- a/setup.py Tue May 06 18:37:41 2014 +0200
+++ b/setup.py Tue May 06 18:37:43 2014 +0200
@@ -4,14 +4,13 @@
setup(
name='pgtoolkit',
- version='0.3.1',
+ version='0.4.0',
description='Postgres utilities, build on top of psycopg2',
author='Radek Brich',
author_email='radek.brich@devl.cz',
url='http://hg.devl.cz/pgtoolkit',
keywords=['postgresql', 'psycopg2', 'pool', 'keepalive'],
- packages=['pgtoolkit', 'mytoolkit'],
- scripts=['analyzeall.py', 'bigtables.py', 'listdepends.py', 'listserial.py',
- 'longqueries.py', 'loopquery.py', 'runquery.py', 'schemadiff.py', 'tablediff.py'],
- )
+ packages=['pgtoolkit', 'pgtoolkit.tools', 'mytoolkit'],
+ scripts=['pgtool'],
+)