pgtoolkit/pgmanager.py
changeset 36 e67101c22e83
parent 33 bd0beda49bcb
child 37 5b0eb4b11940
--- a/pgtoolkit/pgmanager.py	Fri May 11 14:16:36 2012 +0200
+++ b/pgtoolkit/pgmanager.py	Wed May 23 11:11:16 2012 +0200
@@ -7,7 +7,7 @@
 # Part of pgtoolkit
 # http://hg.devl.cz/pgtoolkit
 #
-# Copyright (c) 2010, 2011  Radek Brich <radek.brich@devl.cz>
+# Copyright (c) 2010, 2011, 2012  Radek Brich <radek.brich@devl.cz>
 #
 # Permission is hereby granted, free of charge, to any person obtaining a copy
 # of this software and associated documentation files (the "Software"), to deal
@@ -63,7 +63,7 @@
 The with statement obtains connection (actually connects to database when needed),
 then returns cursor for this connection. At the end of with statement,
 the connection is returned to the pool or closed (depending on number of connections
-in pool and on setting of keep_open parameter).
+in pool and on setting of pool_size parameter).
 
 The row returned by fetchone_dict() is special dict object, which can be accessed
 using item or attribute access, that is row['now'] or row.now.
@@ -94,14 +94,14 @@
 
 class ConnectionInfo:
 
-    def __init__(self, dsn, isolation_level=None, keep_alive=True,
-        init_statement=None, keep_open=1):
-        
+    def __init__(self, name, dsn, isolation_level=None, keep_alive=True,
+                 init_statement=None, pool_size=1):
+        self.name = name  # connection name is logged with SQL queries
         self.dsn = dsn
         self.isolation_level = isolation_level
         self.keep_alive = keep_alive
         self.init_statement = init_statement
-        self.keep_open = keep_open
+        self.pool_size = pool_size
 
 
 class RowDict(OrderedDict):
@@ -116,16 +116,18 @@
 class Cursor(psycopg2.extensions.cursor):
 
     def execute(self, query, args=None):
+        self._log_query(query, args)
         try:
             return super(Cursor, self).execute(query, args)
-        finally:
-            self._log_query()
+        except DatabaseError:
+            self._log_exception()
 
     def callproc(self, procname, args=None):
+        self._log_query('CALL %s(%s)' % (procname, ','.join(args)))
         try:
             return super(Cursor, self).callproc(procname, args)
-        finally:
-            self._log_query()
+        except DatabaseError:
+            self._log_exception()
 
     def row_dict(self, row, lstrip=None):
         adjustname = lambda a: a
@@ -162,10 +164,13 @@
         rows = super(Cursor, self).fetchall()
         return [self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip) for row in rows]
 
-    def _log_query(self):
-        if self.query:
-            name = self.connection.name if hasattr(self.connection, 'name') else '-'
-            log_sql.info('[%s] %s' % (name, self.query.decode('utf8')))
+    def _log_query(self, query, args=None):
+        name = self.connection.name if hasattr(self.connection, 'name') else '-'
+        log_sql.info('[%s] %s' % (name, self.mogrify(query, args).decode('utf8')))
+
+    def _log_exception(self):
+        name = self.connection.name if hasattr(self.connection, 'name') else '-'
+        log_sql.exception('[%s] exception:' % (name,))
 
 class Connection(psycopg2.extensions.connection):
 
@@ -202,12 +207,14 @@
         for conn in tuple(self.conn_known.keys()):
             self.destroy_conn(conn)
 
-    def create_conn(self, name='default', keep_open=1, isolation_level=None, keep_alive=True, dsn=None, **kw):
+    def create_conn(self, name='default', isolation_level=None, keep_alive=True, init_statement=None,
+                    pool_size=1, dsn=None, **kwargs):
         '''Create named connection.
         
         name -- name for connection (default is "default")
-        keep_open -- how many connections will be kept open in pool (more connections will still be created,
-                     but they will be closed by put_conn)
+        pool_size -- how many connections will be kept open in pool
+                     (more connections will still be created but they will be closed by put_conn)
+                     None - disable pool, always return same connection
         isolation_level -- "autocommit", "read_committed", "serializable" or None for driver default
         keep_alive -- set socket to keepalive mode
         dsn -- string with connection parameters (dsn means Data Source Name)
@@ -219,13 +226,36 @@
             raise PgManagerError('Connection name "%s" already registered.' % name)
 
         if dsn is None:
-            dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items() if x[1] is not None])
+            dsn = ' '.join([x[0]+'='+str(x[1]) for x in kwargs.items() if x[1] is not None])
 
         isolation_level = self._normalize_isolation_level(isolation_level)
-        ci = ConnectionInfo(dsn, isolation_level, keep_alive, keep_open=keep_open)
+        ci = ConnectionInfo(name, dsn, isolation_level, keep_alive, init_statement, pool_size)
 
         self.conn_known[name] = ci
         self.conn_pool[name] = []
+    
+    def create_conn_listen(self, name, channel, dsn=None, copy_dsn=None, **kwargs):
+        '''Create connection listening for notifies.
+        
+        Disables pool. If you want to use pool, create other connection for that.
+        This connection can be used as usual: conn.cursor() etc.
+        Don't use PgManager's cursor() and put_conn(). 
+        
+        name -- name for connection
+        channel -- listen on this channel
+        copy_dsn -- specify name of other connection and its dsn will be used
+        
+        Other parameters forwarded to create_conn().
+        
+        '''
+        if dsn is None and copy_dsn:
+            try:
+                dsn = self.conn_known[copy_dsn].dsn
+            except KeyError:
+                raise PgManagerError("Connection name '%s' not registered." % copy_dsn)
+        listen_query = "LISTEN " + channel
+        self.create_conn(name=name, pool_size=None, isolation_level='autocommit', init_statement=listen_query,
+            dsn=dsn, **kwargs)
 
     def close_conn(self, name='default'):
         '''Close all connections of given name.
@@ -256,20 +286,36 @@
         self._check_fork()
         self.lock.acquire()
         try:
-            if not name in self.conn_known:
+            try:
+                ci = self.conn_known[name]
+            except KeyError:
                 raise PgManagerError("Connection name '%s' not registered." % name)
-
-            conn = None
-            while len(self.conn_pool[name]) and conn is None:
-                conn = self.conn_pool[name].pop()
-                if conn.closed:
+            
+            # no pool, just one static connection
+            if ci.pool_size is None:
+                # check for existing connection
+                try:
+                    conn = self.conn_pool[name][0]
+                    if conn.closed:
+                        conn = None
+                except IndexError:
                     conn = None
-
-            if conn is None:
-                ci = self.conn_known[name]
-                conn = self._connect(ci)
-                # add our name to connection instance (this is then logged with SQL queries)
-                conn.name = name
+                    self.conn_pool[name].append(conn)
+                # if no existing connection is valid, connect new one and save it
+                if conn is None:
+                    conn = self._connect(ci)
+                    self.conn_pool[name][0] = conn
+            
+            # connection from pool
+            else:
+                conn = None
+                while len(self.conn_pool[name]) and conn is None:
+                    conn = self.conn_pool[name].pop()
+                    if conn.closed:
+                        conn = None
+                
+                if conn is None:
+                    conn = self._connect(ci)
         finally:
             self.lock.release()
         return conn
@@ -286,7 +332,7 @@
             if not name in self.conn_known:
                 raise PgManagerError("Connection name '%s' not registered." % name)
 
-            if len(self.conn_pool[name]) >= self.conn_known[name].keep_open:
+            if len(self.conn_pool[name]) >= self.conn_known[name].pool_size:
                 conn.close()
                 return
 
@@ -330,38 +376,35 @@
             log_notices.info(notice.rstrip())
         conn.notices[:] = []
 
-    def wait_for_notify(self, name='default', timeout=5):
+    def wait_for_notify(self, name='default', timeout=5.0):
         '''Wait for asynchronous notifies, return the last one.
         
+        name -- name of connection, must be created using create_conn_listen()
+        timeout -- in seconds, floating point (None - wait forever)
+        
         Returns None on timeout.
         
         '''
         conn = self.get_conn(name)
         
-        try:
-            # any residual notify?
-            # then return it, that should not break anything
+        # return any notifies on stack
+        if conn.notifies:
+            return conn.notifies.pop()
+
+        if select.select([conn], [], [], timeout) == ([], [], []):
+            # timeout
+            return None
+        else:
+            conn.poll()
+
+            # return just the last notify (we do not care for older ones)
             if conn.notifies:
                 return conn.notifies.pop()
-
-            if select.select([conn], [], [], timeout) == ([], [], []):
-                # timeout
-                return None
-            else:
-                conn.poll()
-
-                # return just the last notify (we do not care for older ones)
-                if conn.notifies:
-                    return conn.notifies.pop()
-                return None
-        finally:
-            # clean notifies
-            while conn.notifies:
-                conn.notifies.pop()
-            self.put_conn(conn, name)
+            return None
 
     def _connect(self, ci):
         conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
+        conn.name = ci.name
         if ci.keep_alive:
             conn.keep_alive()
         if not ci.isolation_level is None: