Update batchcopy: When target record exists, allow to ignore / update the error (--dst-exists parameter).
#!/usr/bin/env python3
"""
loopquery
Execute some queries in loop. Execute interval can be set.
"""
from pgtoolkit import toolbase
import logging.handlers
import time
from datetime import datetime, timedelta
class LoopQueryTool(toolbase.ToolBase):
def __init__(self):
toolbase.ToolBase.__init__(self, name='loopquery', desc='Run query in loop.')
self.parser.add_argument('target', nargs='?', metavar='target', type=str, help='Target database')
self.parser.add_argument('-c', dest='config', type=str, help='Additional config file (besides pgtoolkit.conf).')
self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.')
self.parser.add_argument('--mins', dest='delay_mins', type=int, help='Delay between queries in minutes.')
self.parser.add_argument('--secs', dest='delay_secs', type=int, help='Delay between queries in seconds.')
self.config.add_option('target', type=str, default=None)
self.config.add_option('queries', type=list, default=[])
self.config.add_option('delay_mins', type=int, default=0)
self.config.add_option('delay_secs', type=int, default=0)
self.config.add_option('log_path', type=str)
self.target_isolation_level = 'autocommit'
self.init()
def init(self):
toolbase.ToolBase.init(self)
if self.args.config:
self.config.load(self.args.config)
self.queries = self.args.queries or self.config.queries
self.delay_mins = self.args.delay_mins or self.config.delay_mins
self.delay_secs = self.args.delay_secs or self.config.delay_secs
if self.config.log_path:
self.init_file_logs(self.config.log_path)
self.prepare_conns(target = self.args.target or self.config.target)
def init_file_logs(self, path):
format = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', '%y-%m-%d %H:%M:%S')
handler = logging.handlers.TimedRotatingFileHandler(path+'/main.log', when='midnight', backupCount=5)
handler.setFormatter(format)
handler.setLevel(logging.DEBUG)
logging.getLogger('main').addHandler(handler)
format = logging.Formatter('%(asctime)s %(message)s', '%y-%m-%d %H:%M:%S')
handler = logging.handlers.TimedRotatingFileHandler(path+'/pgnotices.log', when='midnight', backupCount=5)
handler.setFormatter(format)
handler.setLevel(logging.DEBUG)
logging.getLogger('pgmanager_notices').addHandler(handler)
def main(self):
self.reset()
while True:
self.wait()
self.action()
def reset(self):
"""Check current time, set next action time."""
dt = datetime.today()
dt = dt.replace(microsecond = 0)
self.next_action_time = dt + timedelta(minutes = self.delay_mins,
seconds = self.delay_secs)
def wait(self):
"""Wait for action time, compute next action time."""
now = datetime.today()
self.log.debug('Next run %s', self.next_action_time)
if self.next_action_time > now:
td = self.next_action_time - now
self.log.debug('Sleep %ds', td.seconds + td.microseconds/1e6)
time.sleep(td.seconds + td.microseconds/1e6)
self.next_action_time += timedelta(minutes = self.delay_mins,
seconds = self.delay_secs)
# in case that action took too long and next planned time would
# be in past -> reset planner
if self.next_action_time < now:
self.reset()
def action(self):
"""Execute the queries."""
for q in self.queries:
self.log.info('%s', q)
with self.pgm.cursor('target') as curs:
curs.execute(q)
self.log.info('Done')
tool = LoopQueryTool()
tool.main()