|
1 from pgtoolkit.toolbase import ToolBase |
|
2 |
|
3 import logging.handlers |
|
4 import time |
|
5 from datetime import datetime, timedelta |
|
6 |
|
7 |
|
class LoopQueryTool(ToolBase):
    """
    Execute queries in loop, with configurable interval.

    The interval is the sum of ``delay_mins`` and ``delay_secs``; each value
    comes from the command line (``--mins`` / ``--secs``) or, when the option
    is not given, from the config file.  Queries come from ``-q`` or from the
    ``queries`` config option.

    Note: when both delays are 0 the planned time is never in the future, so
    the loop executes the queries back-to-back without sleeping.

    Attributes such as ``parser``, ``config``, ``args``, ``log`` and ``pgm``
    are not defined here -- presumably they are provided by ``ToolBase``
    (TODO confirm against pgtoolkit.toolbase).
    """

    def __init__(self):
        ToolBase.__init__(self, name='loopquery')
        self.target_isolation_level = 'autocommit'

    def specify_args(self):
        """Declare command line arguments and config file options."""
        ToolBase.specify_args(self)
        self.parser.add_argument('target', nargs='?', metavar='target', type=str, help='Target database')
        self.parser.add_argument('-c', dest='config', type=str, help='Additional config file (besides pgtoolkit.conf).')
        self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.')
        self.parser.add_argument('--mins', dest='delay_mins', type=int, help='Delay between queries in minutes.')
        self.parser.add_argument('--secs', dest='delay_secs', type=int, help='Delay between queries in seconds.')

        self.config.add_option('target', type=str, default=None)
        self.config.add_option('queries', type=list, default=[])
        self.config.add_option('delay_mins', type=int, default=0)
        self.config.add_option('delay_secs', type=int, default=0)
        self.config.add_option('log_path', type=str)

    def load_args(self, args=None, config_file=None):
        """Parse arguments, load the optional extra config, resolve settings.

        Command line values take precedence over config values.
        """
        ToolBase.load_args(self, args, config_file)
        if self.args.config:
            self.config.load(self.args.config)
        self.queries = self.args.queries or self.config.queries
        # Compare against None instead of relying on truthiness, so that an
        # explicit `--mins 0` / `--secs 0` overrides a non-zero config value.
        self.delay_mins = self._first_not_none(self.args.delay_mins, self.config.delay_mins)
        self.delay_secs = self._first_not_none(self.args.delay_secs, self.config.delay_secs)

    def init_logging(self):
        """Set up base logging, plus file logs when ``log_path`` is configured."""
        ToolBase.init_logging(self)
        if self.config.log_path:
            self.init_file_logs(self.config.log_path)

    def setup(self, args=None):
        """Run base setup and prepare the connection to the target database."""
        ToolBase.setup(self, args)
        self.prepare_conns(target=self.args.target or self.config.target)

    def init_file_logs(self, path):
        """Attach daily-rotated file handlers under directory *path*.

        ``main.log`` receives records of the 'main' logger and
        ``pgnotices.log`` those of the 'pgmanager_notices' logger.
        """
        self._add_rotating_file_handler(
            'main', path + '/main.log',
            '%(asctime)s %(levelname)-5s %(message)s')
        self._add_rotating_file_handler(
            'pgmanager_notices', path + '/pgnotices.log',
            '%(asctime)s %(message)s')

    def main(self):
        """Run forever: wait for the planned time, then execute the queries."""
        self.reset()
        while True:
            self.wait()
            self.action()

    def reset(self):
        """Check current time, set next action time."""
        start = datetime.today().replace(microsecond=0)
        self.next_action_time = start + self._interval()

    def wait(self):
        """Wait for action time, compute next action time."""
        now = datetime.today()
        self.log.debug('Next run %s', self.next_action_time)
        if self.next_action_time > now:
            # total_seconds() also accounts for whole days; summing only
            # .seconds and .microseconds would cut short intervals >= 24h.
            sleep_secs = (self.next_action_time - now).total_seconds()
            self.log.debug('Sleep %ds', sleep_secs)
            time.sleep(sleep_secs)
        self.next_action_time += self._interval()
        # in case that action took too long and next planned time would
        # be in past -> reset planner
        if self.next_action_time < now:
            self.reset()

    def action(self):
        """Execute the queries."""
        for q in self.queries:
            self.log.info('%s', q)
            with self.pgm.cursor('target') as curs:
                curs.execute(q)
        self.log.info('Done')

    def _interval(self):
        """Return the configured delay between runs as a timedelta."""
        return timedelta(minutes=self.delay_mins, seconds=self.delay_secs)

    @staticmethod
    def _first_not_none(value, fallback):
        """Return *value* unless it is None, in which case return *fallback*."""
        return fallback if value is None else value

    @staticmethod
    def _add_rotating_file_handler(logger_name, filename, fmt):
        """Add a midnight-rotated DEBUG file handler (5 backups) to a logger."""
        handler = logging.handlers.TimedRotatingFileHandler(filename, when='midnight', backupCount=5)
        handler.setFormatter(logging.Formatter(fmt, '%y-%m-%d %H:%M:%S'))
        handler.setLevel(logging.DEBUG)
        logging.getLogger(logger_name).addHandler(handler)
|
98 |
|
99 |
|
# Module-level alias for the tool class defined in this file.
# NOTE(review): presumably looked up by a generic pgtoolkit launcher -- confirm against caller.
cls = LoopQueryTool
|
101 |