# Refactor ToolBase to allow tool composition. Add TableSync tool (composed). Move more tools under pgtool.
from pgtoolkit.toolbase import ToolBase
import logging.handlers
import time
from datetime import datetime, timedelta
from psycopg2 import ProgrammingError
class RunQueryTool(ToolBase):
    """
    Execute configured queries in target database.

    Queries come from the ``-q`` command line argument or, when none are
    given there, from the ``queries`` config option. Queries read from a
    file (``-f``) are placed in front of them. Each query is optionally used
    as a ``str.format`` template for the ``-p NAME=VALUE`` parameters.
    """

    def __init__(self):
        ToolBase.__init__(self, name='runquery')
        # Read by code outside this class (not visible here).
        self.target_isolation_level = 'autocommit'

    def specify_args(self):
        """Declare command line arguments and config options of this tool."""
        ToolBase.specify_args(self)
        self.parser.add_argument('target', nargs='?', metavar='target', type=str, help='Target database')
        self.parser.add_argument('-c', dest='config', type=str, help='Additional config file (besides pgtoolkit.conf).')
        self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.')
        self.parser.add_argument('-f', dest='file', metavar='FILE', help='Read query from file.')
        self.parser.add_argument('--one-query-per-line', action='store_true', help='When reading queries from file, consider each line as separate query.')
        self.parser.add_argument('-p', '--parameter', dest='parameters', metavar='PARAM=VALUE', nargs='*',
            help="If query should be used as format template, these parameters will be substituted.")
        self.parser.add_argument('--output-file', dest='output_file', metavar='OUTPUT_FILE', help='Write query result in specified file.')
        self.parser.add_argument('--format', dest='format', metavar='FORMAT', help='Format string for each line in output file (using Python\'s format()).')

        self.config.add_option('target', type=str, default=None)
        self.config.add_option('queries', type=list, default=[])
        self.config.add_option('log_path', type=str)

    def setup(self, args=None):
        """Run the base setup, then prepare the connection to the target.

        The target given on the command line wins over the configured one.
        """
        ToolBase.setup(self, args)
        self.prepare_conns(target=self.args.target or self.config.target)

    def load_args(self, args=None, config_file=None):
        """Load arguments and config, then build ``self.queries`` and ``self.parameters``."""
        ToolBase.load_args(self, args, config_file)
        if self.args.config:
            self.config.load(self.args.config)
        # Copy the list: prepending file queries must not mutate the list
        # object owned by the parsed arguments or by the config.
        self.queries = list(self.args.queries or self.config.queries or ())
        # read query from file
        if self.args.file:
            file_queries = self._read_queries_from_file(self.args.file, self.args.one_query_per_line)
            self.queries = file_queries + self.queries
        # prepare parameters
        self._prepare_parameters(self.args.parameters)

    @staticmethod
    def _read_queries_from_file(path, one_query_per_line):
        """Return the list of queries stored in the UTF-8 file `path`.

        With `one_query_per_line`, every line is a separate query; blank
        lines and lines starting with ``--`` are skipped. Otherwise the
        whole file content is returned as a single query.
        """
        with open(path, 'r', encoding='utf8') as f:
            data = f.read()
        if not one_query_per_line:
            return [data]
        return [
            ln for ln in data.splitlines()
            # A blank line is not a query; keeping it would send an empty statement.
            if ln.strip() and not ln.lstrip().startswith('--')
        ]

    def init_logging(self):
        """Initialize base logging and add file logs when ``log_path`` is configured."""
        ToolBase.init_logging(self)
        if self.config.log_path:
            self.init_file_logs(self.config.log_path)

    def init_file_logs(self, path):
        """Attach rotating file handlers under directory `path`.

        The 'main' logger writes to ``main.log``, the 'pgmanager_notices'
        logger writes to ``pgnotices.log``.
        """
        self._add_rotating_file_log('main', path+'/main.log', '%(asctime)s %(levelname)-5s %(message)s')
        self._add_rotating_file_log('pgmanager_notices', path+'/pgnotices.log', '%(asctime)s %(message)s')

    @staticmethod
    def _add_rotating_file_log(logger_name, filename, line_format):
        """Add a DEBUG-level handler to `logger_name`, rotated at midnight, keeping 5 backups."""
        formatter = logging.Formatter(line_format, '%y-%m-%d %H:%M:%S')
        handler = logging.handlers.TimedRotatingFileHandler(filename, when='midnight', backupCount=5)
        handler.setFormatter(formatter)
        handler.setLevel(logging.DEBUG)
        logging.getLogger(logger_name).addHandler(handler)

    def main(self):
        """Execute the queries."""
        self.log.debug('Queries: %r', self.queries)
        for q in self.queries:
            if self.parameters:
                q = q.format(**self.parameters)
            # Log only the beginning of long queries.
            self.log.info('%s', q if len(q) < 100 else q[:100]+'...')
            with self.pgm.cursor('target') as curs:
                curs.execute(q)
                self.log.info('Rows affected: %d', curs.rowcount)
                try:
                    rows = curs.fetchall_dict()
                except ProgrammingError:
                    # Presumably raised when the statement produced no result
                    # set to fetch -- nothing to write in that case.
                    pass
                else:
                    self._write_output_file(rows)
        self.log.info('Done')

    def _write_output_file(self, rows):
        """Write `rows` to the ``--output-file``, one formatted line per row.

        Does nothing when no output file was requested. Each row is passed
        as the single positional argument to the ``--format`` template;
        without ``--format`` the plain string form of the row is written.

        The file is opened in 'w' mode on every call, so a later result set
        replaces the content written for an earlier one.
        """
        if not self.args.output_file:
            return
        # Without --format the template would be None; fall back to str(row).
        line_format = self.args.format or '{}'
        with open(self.args.output_file, 'w', encoding='utf8') as f:
            for row in rows:
                print(line_format.format(row), file=f)

    def _prepare_parameters(self, parameters):
        """Parse ``NAME=VALUE`` strings into the ``self.parameters`` dict.

        `parameters` may be None or empty, resulting in an empty dict.
        Only the first '=' separates the name from the value.

        Raises ValueError when an item contains no '='.
        """
        self.parameters = {}
        for parameter in parameters or ():
            name, separator, value = parameter.partition('=')
            if not separator:
                raise ValueError('Invalid parameter %r, expected PARAM=VALUE' % parameter)
            self.parameters[name] = value
# Module-level alias of the tool class defined in this file.
# NOTE(review): presumably looked up by a generic launcher to find the tool -- confirm against the caller.
cls = RunQueryTool