# HG changeset patch # User Radek Brich # Date 1337764276 -7200 # Node ID e67101c22e83348a4536a1eab0ba679599d9cdd2 # Parent e7f79c4a27ce6945136cab53098ac65886a0da82 pgmanager: Add create_conn_listen() which should be used with wait_for_notify. Update wait_for_notify() to not use put_conn(). Add name to ConnectionInfo. Log queries before they are called. Log exceptions. Add notifyexample. diff -r e7f79c4a27ce -r e67101c22e83 notifyexample.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/notifyexample.py Wed May 23 11:11:16 2012 +0200 @@ -0,0 +1,25 @@ +#!/usr/bin/env python3.2 +# +# Call "NOTIFY notifyexample" on target DB to wake up this program. +# + +from pgtoolkit import toolbase, pgmanager + + +class NotifyExample(toolbase.SimpleTool): + def __init__(self): + toolbase.SimpleTool.__init__(self, name='notifyexample', desc='Sample program for listen/notify.') + self.init() + + def main(self): + # create another connection for notifies, copy parameters from connection 'target' + self.pgm.create_conn_listen('target_listen', channel='notifyexample', copy_dsn='target') + while True: + ev = self.pgm.wait_for_notify('target_listen', timeout=None) + if ev: + print(ev) + + +tool = NotifyExample() +tool.main() + diff -r e7f79c4a27ce -r e67101c22e83 pgtoolkit/pgmanager.py --- a/pgtoolkit/pgmanager.py Fri May 11 14:16:36 2012 +0200 +++ b/pgtoolkit/pgmanager.py Wed May 23 11:11:16 2012 +0200 @@ -7,7 +7,7 @@ # Part of pgtoolkit # http://hg.devl.cz/pgtoolkit # -# Copyright (c) 2010, 2011 Radek Brich +# Copyright (c) 2010, 2011, 2012 Radek Brich # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -63,7 +63,7 @@ The with statement obtains connection (actually connects to database when needed), then returns cursor for this connection. At the end of with statement, the connection is returned to the pool or closed (depending on number of connections -in pool and on setting of keep_open parameter). +in pool and on setting of pool_size parameter). The row returned by fetchone_dict() is special dict object, which can be accessed using item or attribute access, that is row['now'] or row.now. @@ -94,14 +94,14 @@ class ConnectionInfo: - def __init__(self, dsn, isolation_level=None, keep_alive=True, - init_statement=None, keep_open=1): - + def __init__(self, name, dsn, isolation_level=None, keep_alive=True, + init_statement=None, pool_size=1): + self.name = name # connection name is logged with SQL queries self.dsn = dsn self.isolation_level = isolation_level self.keep_alive = keep_alive self.init_statement = init_statement - self.keep_open = keep_open + self.pool_size = pool_size class RowDict(OrderedDict): @@ -116,16 +116,18 @@ class Cursor(psycopg2.extensions.cursor): def execute(self, query, args=None): + self._log_query(query, args) try: return super(Cursor, self).execute(query, args) - finally: - self._log_query() + except DatabaseError: + self._log_exception() def callproc(self, procname, args=None): + self._log_query('CALL %s(%s)' % (procname, ','.join(args))) try: return super(Cursor, self).callproc(procname, args) - finally: - self._log_query() + except DatabaseError: + self._log_exception() def row_dict(self, row, lstrip=None): adjustname = lambda a: a @@ -162,10 +164,13 @@ rows = super(Cursor, self).fetchall() return [self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip) for row in rows] - def _log_query(self): - if self.query: - name = self.connection.name if hasattr(self.connection, 'name') else '-' - log_sql.info('[%s] %s' % (name, self.query.decode('utf8'))) + def _log_query(self, query, args=None): + name = self.connection.name if hasattr(self.connection, 'name') else '-' + log_sql.info('[%s] %s' % (name, self.mogrify(query, args).decode('utf8'))) + + def _log_exception(self): + name = self.connection.name if hasattr(self.connection, 'name') else '-' + log_sql.exception('[%s] exception:' % (name,)) class Connection(psycopg2.extensions.connection): @@ -202,12 +207,14 @@ for conn in tuple(self.conn_known.keys()): self.destroy_conn(conn) - def create_conn(self, name='default', keep_open=1, isolation_level=None, keep_alive=True, dsn=None, **kw): + def create_conn(self, name='default', isolation_level=None, keep_alive=True, init_statement=None, + pool_size=1, dsn=None, **kwargs): '''Create named connection. name -- name for connection (default is "default") - keep_open -- how many connections will be kept open in pool (more connections will still be created, - but they will be closed by put_conn) + pool_size -- how many connections will be kept open in pool + (more connections will still be created but they will be closed by put_conn) + None - disable pool, always return same connection isolation_level -- "autocommit", "read_committed", "serializable" or None for driver default keep_alive -- set socket to keepalive mode dsn -- string with connection parameters (dsn means Data Source Name) @@ -219,13 +226,36 @@ raise PgManagerError('Connection name "%s" already registered.' % name) if dsn is None: - dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items() if x[1] is not None]) + dsn = ' '.join([x[0]+'='+str(x[1]) for x in kwargs.items() if x[1] is not None]) isolation_level = self._normalize_isolation_level(isolation_level) - ci = ConnectionInfo(dsn, isolation_level, keep_alive, keep_open=keep_open) + ci = ConnectionInfo(name, dsn, isolation_level, keep_alive, init_statement, pool_size) self.conn_known[name] = ci self.conn_pool[name] = [] + + def create_conn_listen(self, name, channel, dsn=None, copy_dsn=None, **kwargs): + '''Create connection listening for notifies. + + Disables pool. If you want to use pool, create other connection for that. + This connection can be used as usual: conn.cursor() etc. + Don't use PgManager's cursor() and put_conn(). + + name -- name for connection + channel -- listen on this channel + copy_dsn -- specify name of other connection and its dsn will be used + + Other parameters forwarded to create_conn(). + + ''' + if dsn is None and copy_dsn: + try: + dsn = self.conn_known[copy_dsn].dsn + except KeyError: + raise PgManagerError("Connection name '%s' not registered." % copy_dsn) + listen_query = "LISTEN " + channel + self.create_conn(name=name, pool_size=None, isolation_level='autocommit', init_statement=listen_query, + dsn=dsn, **kwargs) def close_conn(self, name='default'): '''Close all connections of given name. @@ -256,20 +286,36 @@ self._check_fork() self.lock.acquire() try: - if not name in self.conn_known: + try: + ci = self.conn_known[name] + except KeyError: raise PgManagerError("Connection name '%s' not registered." % name) - - conn = None - while len(self.conn_pool[name]) and conn is None: - conn = self.conn_pool[name].pop() - if conn.closed: + + # no pool, just one static connection + if ci.pool_size is None: + # check for existing connection + try: + conn = self.conn_pool[name][0] + if conn.closed: + conn = None + except IndexError: conn = None - - if conn is None: - ci = self.conn_known[name] - conn = self._connect(ci) - # add our name to connection instance (this is then logged with SQL queries) - conn.name = name + self.conn_pool[name].append(conn) + # if no existing connection is valid, connect new one and save it + if conn is None: + conn = self._connect(ci) + self.conn_pool[name][0] = conn + + # connection from pool + else: + conn = None + while len(self.conn_pool[name]) and conn is None: + conn = self.conn_pool[name].pop() + if conn.closed: + conn = None + + if conn is None: + conn = self._connect(ci) finally: self.lock.release() return conn @@ -286,7 +332,7 @@ if not name in self.conn_known: raise PgManagerError("Connection name '%s' not registered." % name) - if len(self.conn_pool[name]) >= self.conn_known[name].keep_open: + if len(self.conn_pool[name]) >= self.conn_known[name].pool_size: conn.close() return @@ -330,38 +376,35 @@ log_notices.info(notice.rstrip()) conn.notices[:] = [] - def wait_for_notify(self, name='default', timeout=5): + def wait_for_notify(self, name='default', timeout=5.0): '''Wait for asynchronous notifies, return the last one. + name -- name of connection, must be created using create_conn_listen() + timeout -- in seconds, floating point (None - wait forever) + Returns None on timeout. ''' conn = self.get_conn(name) - try: - # any residual notify? - # then return it, that should not break anything + # return any notifies on stack + if conn.notifies: + return conn.notifies.pop() + + if select.select([conn], [], [], timeout) == ([], [], []): + # timeout + return None + else: + conn.poll() + + # return just the last notify (we do not care for older ones) if conn.notifies: return conn.notifies.pop() - - if select.select([conn], [], [], timeout) == ([], [], []): - # timeout - return None - else: - conn.poll() - - # return just the last notify (we do not care for older ones) - if conn.notifies: - return conn.notifies.pop() - return None - finally: - # clean notifies - while conn.notifies: - conn.notifies.pop() - self.put_conn(conn, name) + return None def _connect(self, ci): conn = psycopg2.connect(ci.dsn, connection_factory=Connection) + conn.name = ci.name if ci.keep_alive: conn.keep_alive() if not ci.isolation_level is None: