pgtoolkit/pgmanager.py
changeset 9 2fcc8ef0b97d
parent 8 2911935c524d
child 19 e526ca146fa9
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pgtoolkit/pgmanager.py	Tue Aug 16 16:03:46 2011 +0200
@@ -0,0 +1,360 @@
+# -*- coding: utf-8 -*-
+#
+# PgManager - manage database connections
+#
+# Requires: Python 2.6, psycopg2
+#
+# Part of pgtoolkit
+# http://hg.devl.cz/pgtoolkit
+#
+# Copyright (c) 2010, 2011  Radek Brich <radek.brich@devl.cz>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+"""Postgres database connection manager
+
+PgManager wraps psycopg2 connect function, adding following features:
+
+ * Manage database connection parameters - link connection parameters
+   to an unique identifier, retrieve connection object by this identifier
+
+ * Connection pooling - connections with same identifier are pooled and reused
+
+ * Easy query using the with statement - retrieve cursor directly by connection
+   identifier, don't worry about connections
+
+ * Dict rows - cursor has additional methods like fetchall_dict(), which
+   returns dict row instead of ordinary list-like row
+
+Example:
+
+import pgmanager
+
+pgm = pgmanager.get_instance()
+pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres')
+
+with pgm.cursor() as curs:
+    curs.execute('SELECT now() AS now')
+    row = curs.fetchone_dict()
+    print row.now
+
+First, we have obtained PgManager instance. This is like calling
+PgManager(), although in our example the instance is global. That means
+getting the instance in another module brings us all the defined connections
+etc.
+
+On second line we have created connection named 'default' (this name can be left out).
+The with statement obtains connection (actually connects to database when needed),
+then returns cursor for this connection. At the end of with statement,
+the connection is returned to the pool or closed (depending on number of connections
+in pool and on setting of keep_open parameter).
+
+The row returned by fetchone_dict() is special dict object, which can be accessed
+using item or attribute access, that is row['now'] or row.now.
+"""
+
+from contextlib import contextmanager
+import logging
+import threading
+import select
+import socket
+
+import psycopg2
+import psycopg2.extensions
+
+from psycopg2 import DatabaseError, IntegrityError
+
+
+class PgManagerError(Exception):
+
+    pass
+
+
+class ConnectionInfo:
+
+    def __init__(self, dsn, isolation_level=None, init_statement=None, keep_open=1):
+        self.dsn = dsn
+        self.isolation_level = isolation_level
+        self.init_statement = init_statement
+        self.keep_open = keep_open
+
+
+class RowDict(dict):
+
+    def __getattr__(self, key):
+        return self[key]
+
+
+class Cursor(psycopg2.extensions.cursor):
+
+    def execute(self, query, args=None):
+        try:
+            return super(Cursor, self).execute(query, args)
+        finally:
+            log.debug(self.query.decode('utf8'))
+
+    def callproc(self, procname, args=None):
+        try:
+            return super(Cursor, self).callproc(procname, args)
+        finally:
+            log.debug(self.query.decode('utf8'))
+
+    def row_dict(self, row, lstrip=None):
+        adjustname = lambda a: a
+        if lstrip:
+            adjustname = lambda a: a.lstrip(lstrip)
+        return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
+
+    def fetchone_dict(self, lstrip=None):
+        row = super(Cursor, self).fetchone()
+        if row is None:
+            return None
+        return self.row_dict(row, lstrip)
+
+    def fetchall_dict(self, lstrip=None):
+        rows = super(Cursor, self).fetchall()
+        return [self.row_dict(row, lstrip) for row in rows]
+
+    def fetchone_adapted(self):
+        '''Like fetchone() but values are quoted for direct inclusion in SQL query.
+        
+        This is useful when you need to generate SQL script from data returned
+        by the query. Use mogrify() for simple cases.
+        
+        '''
+        row = super(Cursor, self).fetchone()
+        if row is None:
+            return None
+        return [self.mogrify('%s', [x]).decode('utf8') for x in row]
+
+    def fetchall_adapted(self):
+        '''Like fetchall() but values are quoted for direct inclusion in SQL query.'''
+        rows = super(Cursor, self).fetchall()
+        return [[self.mogrify('%s', [x]).decode('utf8') for x in row] for row in rows]
+
+
+class Connection(psycopg2.extensions.connection):
+
+    def cursor(self, name=None):
+        if name is None:
+            return super(Connection, self).cursor(cursor_factory=Cursor)
+        else:
+            return super(Connection, self).cursor(name, cursor_factory=Cursor)
+
+    def keep_alive(self):
+        '''Set socket to keepalive mode. Must be called before any query.'''
+        sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM) 
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+        # Maximum keep-alive probes before asuming the connection is lost
+        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
+        # Interval (in seconds) between keep-alive probes
+        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 2)
+        # Maximum idle time (in seconds) before start sending keep-alive probes
+        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
+
+
+class PgManager:
+
+    def __init__(self):
+        self.conn_known = {}  # available connections
+        self.conn_pool = {}
+        self.lock = threading.Lock()
+
+    def __del__(self):
+        for conn in tuple(self.conn_known.keys()):
+            self.destroy_conn(conn)
+
+    def create_conn(self, name='default', isolation_level=None, dsn=None, **kw):
+        '''Create named connection.'''
+        if name in self.conn_known:
+            raise PgManagerError('Connection name "%s" already registered.' % name)
+
+        if dsn is None:
+            dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items()])
+
+        isolation_level = self._normalize_isolation_level(isolation_level)
+        ci = ConnectionInfo(dsn, isolation_level)
+
+        self.conn_known[name] = ci
+        self.conn_pool[name] = []
+
+    def close_conn(self, name='default'):
+        '''Close all connections of given name.
+        
+        Connection credentials are still saved.
+        
+        '''
+        while len(self.conn_pool[name]):
+            conn = self.conn_pool[name].pop()
+            conn.close()
+
+    def destroy_conn(self, name='default'):
+        '''Destroy connection.
+        
+        Counterpart of create_conn.
+        
+        '''
+        if not name in self.conn_known:
+            raise PgManagerError('Connection name "%s" not registered.' % name)
+
+        self.close_conn(name)
+
+        del self.conn_known[name]
+        del self.conn_pool[name]
+
+    def get_conn(self, name='default'):
+        '''Get connection of name 'name' from pool.'''
+        self.lock.acquire()
+        try:
+            if not name in self.conn_known:
+                raise PgManagerError("Connection name '%s' not registered." % name)
+
+            conn = None
+            while len(self.conn_pool[name]) and conn is None:
+                conn = self.conn_pool[name].pop()
+                if conn.closed:
+                    conn = None
+
+            if conn is None:
+                ci = self.conn_known[name]
+                conn = self._connect(ci)
+        finally:
+            self.lock.release()
+        return conn
+
+    def put_conn(self, conn, name='default'):
+        '''Put connection back to pool.
+        
+        Name must be same as used for get_conn,
+        otherwise things become broken.
+        
+        '''
+        self.lock.acquire()
+        try:
+            if not name in self.conn_known:
+                raise PgManagerError("Connection name '%s' not registered." % name)
+
+            if len(self.conn_pool[name]) >= self.conn_known[name].keep_open:
+                conn.close()
+                return
+
+            if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN:
+                conn.close()
+                return
+
+            # connection returned to the pool must not be in transaction
+            if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
+                conn.rollback()
+
+            self.conn_pool[name].append(conn)
+        finally:
+            self.lock.release()
+
+    @contextmanager
+    def cursor(self, name='default'):
+        '''Cursor context.
+        
+        Uses any connection of name 'name' from pool
+        and returns cursor for that connection.
+        
+        '''
+        conn = self.get_conn(name)
+
+        try:
+            curs = conn.cursor()
+            yield curs
+        finally:
+            curs.close()
+            self.put_conn(conn, name)
+
+    def wait_for_notify(self, name='default', timeout=5):
+        '''Wait for asynchronous notifies, return the last one.
+        
+        Returns None on timeout.
+        
+        '''
+        conn = self.get_conn(name)
+        
+        try:
+            # any residual notify?
+            # then return it, that should not break anything
+            if conn.notifies:
+                return conn.notifies.pop()
+
+            if select.select([conn], [], [], timeout) == ([], [], []):
+                # timeout
+                return None
+            else:
+                conn.poll()
+
+                # return just the last notify (we do not care for older ones)
+                if conn.notifies:
+                    return conn.notifies.pop()
+                return None
+        finally:
+            # clean notifies
+            while conn.notifies:
+                conn.notifies.pop()
+            self.put_conn(conn, name)
+
+    def _connect(self, ci):
+        conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
+        conn.keep_alive()
+        if not ci.isolation_level is None:
+            conn.set_isolation_level(ci.isolation_level)
+        if ci.init_statement:
+            curs = conn.cursor()
+            curs.execute(ci.init_statement)
+            curs.close()
+        return conn
+
+    def _normalize_isolation_level(self, level):
+        if type(level) == str:
+            if level.lower() == 'autocommit':
+                return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
+            if level.lower() == 'read_committed':
+                return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
+            if level.lower() == 'serializable':
+                return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
+            raise PgManagerError('Unknown isolation level name: "%s"', level)
+        return level
+
+
+try:
+    NullHandler = logging.NullHandler
+except AttributeError:
+    class NullHandler(logging.Handler):
+        def emit(self, record):
+            pass
+
+
+log = logging.getLogger("pgmanager")
+log.addHandler(NullHandler())
+
+
+instance = None
+
+
+def get_instance():
+    global instance
+    if instance is None:
+        instance = PgManager()
+    return instance
+
+