PgManager: log connection name with queries. BatchUpdateTool: handle some possible exceptions and try reconnect to database.
authorRadek Brich <radek.brich@devl.cz>
Wed, 28 Mar 2012 17:25:18 +0200
changeset 33 bd0beda49bcb
parent 32 d59c473c9ad7
child 34 98c7809af415
PgManager: log connection name with queries. BatchUpdateTool: handle some possible exceptions and try reconnect to database.
batchupdate.py
pgtoolkit/pgmanager.py
--- a/batchupdate.py	Fri Mar 23 14:54:04 2012 +0100
+++ b/batchupdate.py	Wed Mar 28 17:25:18 2012 +0200
@@ -4,6 +4,7 @@
 
 from pgtoolkit import toolbase
 from pgtoolkit.highlight import highlight
+from pgtoolkit.pgmanager import OperationalError
 
 
 class BatchUpdateTool(toolbase.SimpleTool):
@@ -15,16 +16,19 @@
 
     def main(self):
         # connect DB
-        with self.pgm.cursor('target') as curs:
-            rowcount = 1
-            while rowcount > 0:
-                print('query:', self.args.query)
-                curs.execute(self.args.query, [])
-                rowcount = curs.rowcount
-                print('updated', rowcount)
-                curs.connection.commit()
-                print('sleep %s seconds' % self.args.sleep)
-                time.sleep(self.args.sleep)
+        rowcount = 1
+        while rowcount > 0:
+            print('query:', self.args.query)
+            with self.pgm.cursor('target') as curs:
+                try:
+                    curs.execute(self.args.query, [])
+                    rowcount = curs.rowcount
+                    print('updated', rowcount)
+                    curs.connection.commit()
+                except (OperationalError, SystemError) as e:
+                    print('Error:', str(e))
+            print('sleep %s seconds' % self.args.sleep)
+            time.sleep(self.args.sleep)
 
 
 tool = BatchUpdateTool()
--- a/pgtoolkit/pgmanager.py	Fri Mar 23 14:54:04 2012 +0100
+++ b/pgtoolkit/pgmanager.py	Wed Mar 28 17:25:18 2012 +0200
@@ -119,15 +119,13 @@
         try:
             return super(Cursor, self).execute(query, args)
         finally:
-            if self.query:
-                log_sql.info(self.query.decode('utf8'))
+            self._log_query()
 
     def callproc(self, procname, args=None):
         try:
             return super(Cursor, self).callproc(procname, args)
         finally:
-            if self.query:
-                log_sql.info(self.query.decode('utf8'))
+            self._log_query()
 
     def row_dict(self, row, lstrip=None):
         adjustname = lambda a: a
@@ -164,6 +162,10 @@
         rows = super(Cursor, self).fetchall()
         return [self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip) for row in rows]
 
+    def _log_query(self):
+        if self.query:
+            name = self.connection.name if hasattr(self.connection, 'name') else '-'
+            log_sql.info('[%s] %s' % (name, self.query.decode('utf8')))
 
 class Connection(psycopg2.extensions.connection):
 
@@ -266,6 +268,8 @@
             if conn is None:
                 ci = self.conn_known[name]
                 conn = self._connect(ci)
+                # add our name to connection instance (this is then logged with SQL queries)
+                conn.name = name
         finally:
             self.lock.release()
         return conn
@@ -292,7 +296,12 @@
 
             # connection returned to the pool must not be in transaction
             if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
-                conn.rollback()
+                try:
+                    conn.rollback()
+                except OperationalError:
+                    if not conn.closed:
+                        conn.close()
+                    return
 
             self.conn_pool[name].append(conn)
         finally: