pgtoolkit/tools/loopquery.py
author Radek Brich <brich.radek@ifortuna.cz>
Tue, 06 May 2014 18:37:43 +0200
changeset 101 2a2d0d5df03b
parent 93 loopquery.py@b72591087495
child 102 fda45bdfd68d
permissions -rw-r--r--
Refactor ToolBase to allow tool composition. Add TableSync tool (composited). Move more tools under pgtool.

from pgtoolkit.toolbase import ToolBase

import logging.handlers
import time
from datetime import datetime, timedelta


class LoopQueryTool(ToolBase):

    """
    Execute queries in loop, with configurable interval.

    """

    def __init__(self):
        ToolBase.__init__(self, name='loopquery')
        self.target_isolation_level = 'autocommit'

    def specify_args(self):
        ToolBase.specify_args(self)
        self.parser.add_argument('target', nargs='?', metavar='target', type=str, help='Target database')
        self.parser.add_argument('-c', dest='config', type=str, help='Additional config file (besides pgtoolkit.conf).')
        self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.')
        self.parser.add_argument('--mins', dest='delay_mins', type=int, help='Delay between queries in minutes.')
        self.parser.add_argument('--secs', dest='delay_secs', type=int, help='Delay between queries in seconds.')

        self.config.add_option('target', type=str, default=None)
        self.config.add_option('queries', type=list, default=[])
        self.config.add_option('delay_mins', type=int, default=0)
        self.config.add_option('delay_secs', type=int, default=0)
        self.config.add_option('log_path', type=str)

    def load_args(self, args=None, config_file=None):
        ToolBase.load_args(self, args, config_file)
        if self.args.config:
            self.config.load(self.args.config)
        self.queries = self.args.queries or self.config.queries
        self.delay_mins = self.args.delay_mins or self.config.delay_mins
        self.delay_secs = self.args.delay_secs or self.config.delay_secs

    def init_logging(self):
        ToolBase.init_logging(self)
        if self.config.log_path:
            self.init_file_logs(self.config.log_path)

    def setup(self, args=None):
        ToolBase.setup(self, args)
        self.prepare_conns(target=self.args.target or self.config.target)

    def init_file_logs(self, path):
        format = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', '%y-%m-%d %H:%M:%S')
        handler = logging.handlers.TimedRotatingFileHandler(path+'/main.log', when='midnight', backupCount=5)
        handler.setFormatter(format)
        handler.setLevel(logging.DEBUG)
        logging.getLogger('main').addHandler(handler)

        format = logging.Formatter('%(asctime)s %(message)s', '%y-%m-%d %H:%M:%S')
        handler = logging.handlers.TimedRotatingFileHandler(path+'/pgnotices.log', when='midnight', backupCount=5)
        handler.setFormatter(format)
        handler.setLevel(logging.DEBUG)
        logging.getLogger('pgmanager_notices').addHandler(handler)

    def main(self):
        self.reset()
        while True:
            self.wait()
            self.action()

    def reset(self):
        """Check current time, set next action time."""
        dt = datetime.today()
        dt = dt.replace(microsecond = 0)
        self.next_action_time = dt + timedelta(minutes = self.delay_mins,
                                               seconds = self.delay_secs)

    def wait(self):
        """Wait for action time, compute next action time."""
        now = datetime.today()
        self.log.debug('Next run %s', self.next_action_time)
        if self.next_action_time > now:
            td = self.next_action_time - now
            self.log.debug('Sleep %ds', td.seconds + td.microseconds/1e6)
            time.sleep(td.seconds + td.microseconds/1e6)
        self.next_action_time += timedelta(minutes = self.delay_mins,
                                           seconds = self.delay_secs)
        # in case that action took too long and next planned time would
        # be in past -> reset planner
        if self.next_action_time < now:
            self.reset()

    def action(self):
        """Execute the queries."""
        for q in self.queries:
            self.log.info('%s', q)
            with self.pgm.cursor('target') as curs:
                curs.execute(q)
        self.log.info('Done')


cls = LoopQueryTool