PgManager: log connection name with queries. BatchUpdateTool: handle some possible exceptions and try reconnect to database.
--- a/batchupdate.py Fri Mar 23 14:54:04 2012 +0100
+++ b/batchupdate.py Wed Mar 28 17:25:18 2012 +0200
@@ -4,6 +4,7 @@
from pgtoolkit import toolbase
from pgtoolkit.highlight import highlight
+from pgtoolkit.pgmanager import OperationalError
class BatchUpdateTool(toolbase.SimpleTool):
@@ -15,16 +16,19 @@
def main(self):
# connect DB
- with self.pgm.cursor('target') as curs:
- rowcount = 1
- while rowcount > 0:
- print('query:', self.args.query)
- curs.execute(self.args.query, [])
- rowcount = curs.rowcount
- print('updated', rowcount)
- curs.connection.commit()
- print('sleep %s seconds' % self.args.sleep)
- time.sleep(self.args.sleep)
+ rowcount = 1
+ while rowcount > 0:
+ print('query:', self.args.query)
+ with self.pgm.cursor('target') as curs:
+ try:
+ curs.execute(self.args.query, [])
+ rowcount = curs.rowcount
+ print('updated', rowcount)
+ curs.connection.commit()
+ except (OperationalError, SystemError) as e:
+ print('Error:', str(e))
+ print('sleep %s seconds' % self.args.sleep)
+ time.sleep(self.args.sleep)
tool = BatchUpdateTool()
--- a/pgtoolkit/pgmanager.py Fri Mar 23 14:54:04 2012 +0100
+++ b/pgtoolkit/pgmanager.py Wed Mar 28 17:25:18 2012 +0200
@@ -119,15 +119,13 @@
try:
return super(Cursor, self).execute(query, args)
finally:
- if self.query:
- log_sql.info(self.query.decode('utf8'))
+ self._log_query()
def callproc(self, procname, args=None):
try:
return super(Cursor, self).callproc(procname, args)
finally:
- if self.query:
- log_sql.info(self.query.decode('utf8'))
+ self._log_query()
def row_dict(self, row, lstrip=None):
adjustname = lambda a: a
@@ -164,6 +162,10 @@
rows = super(Cursor, self).fetchall()
return [self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip) for row in rows]
+ def _log_query(self):
+ if self.query:
+ name = self.connection.name if hasattr(self.connection, 'name') else '-'
+ log_sql.info('[%s] %s' % (name, self.query.decode('utf8')))
class Connection(psycopg2.extensions.connection):
@@ -266,6 +268,8 @@
if conn is None:
ci = self.conn_known[name]
conn = self._connect(ci)
+ # add our name to connection instance (this is then logged with SQL queries)
+ conn.name = name
finally:
self.lock.release()
return conn
@@ -292,7 +296,12 @@
# connection returned to the pool must not be in transaction
if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
- conn.rollback()
+ try:
+ conn.rollback()
+ except OperationalError:
+ if not conn.closed:
+ conn.close()
+ return
self.conn_pool[name].append(conn)
finally: