# HG changeset patch # User Radek Brich # Date 1332948318 -7200 # Node ID bd0beda49bcbb6851d2dbad84506e9a05607a5c2 # Parent d59c473c9ad722edcb4f162d785b69ad1e25f76c PgManager: log connection name with queries. BatchUpdateTool: handle some possible exceptions and try reconnect to database. diff -r d59c473c9ad7 -r bd0beda49bcb batchupdate.py --- a/batchupdate.py Fri Mar 23 14:54:04 2012 +0100 +++ b/batchupdate.py Wed Mar 28 17:25:18 2012 +0200 @@ -4,6 +4,7 @@ from pgtoolkit import toolbase from pgtoolkit.highlight import highlight +from pgtoolkit.pgmanager import OperationalError class BatchUpdateTool(toolbase.SimpleTool): @@ -15,16 +16,19 @@ def main(self): # connect DB - with self.pgm.cursor('target') as curs: - rowcount = 1 - while rowcount > 0: - print('query:', self.args.query) - curs.execute(self.args.query, []) - rowcount = curs.rowcount - print('updated', rowcount) - curs.connection.commit() - print('sleep %s seconds' % self.args.sleep) - time.sleep(self.args.sleep) + rowcount = 1 + while rowcount > 0: + print('query:', self.args.query) + with self.pgm.cursor('target') as curs: + try: + curs.execute(self.args.query, []) + rowcount = curs.rowcount + print('updated', rowcount) + curs.connection.commit() + except (OperationalError, SystemError) as e: + print('Error:', str(e)) + print('sleep %s seconds' % self.args.sleep) + time.sleep(self.args.sleep) tool = BatchUpdateTool() diff -r d59c473c9ad7 -r bd0beda49bcb pgtoolkit/pgmanager.py --- a/pgtoolkit/pgmanager.py Fri Mar 23 14:54:04 2012 +0100 +++ b/pgtoolkit/pgmanager.py Wed Mar 28 17:25:18 2012 +0200 @@ -119,15 +119,13 @@ try: return super(Cursor, self).execute(query, args) finally: - if self.query: - log_sql.info(self.query.decode('utf8')) + self._log_query() def callproc(self, procname, args=None): try: return super(Cursor, self).callproc(procname, args) finally: - if self.query: - log_sql.info(self.query.decode('utf8')) + self._log_query() def row_dict(self, row, lstrip=None): adjustname = lambda a: a @@ -164,6 +162,10 @@ rows = super(Cursor, self).fetchall() return [self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip) for row in rows] + def _log_query(self): + if self.query: + name = self.connection.name if hasattr(self.connection, 'name') else '-' + log_sql.info('[%s] %s' % (name, self.query.decode('utf8'))) class Connection(psycopg2.extensions.connection): @@ -266,6 +268,8 @@ if conn is None: ci = self.conn_known[name] conn = self._connect(ci) + # add our name to connection instance (this is then logged with SQL queries) + conn.name = name finally: self.lock.release() return conn @@ -292,7 +296,12 @@ # connection returned to the pool must not be in transaction if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE: - conn.rollback() + try: + conn.rollback() + except OperationalError: + if not conn.closed: + conn.close() + return self.conn_pool[name].append(conn) finally: