pgtools/pgmanager.py
changeset 7 685b20d2d3ab
parent 6 4ab077c93b2d
child 8 2911935c524d
--- a/pgtools/pgmanager.py	Wed Aug 10 18:34:54 2011 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,323 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# PgManager - manage database connections
-#
-# Requires: Python 2.6, psycopg2
-#
-# Copyright (c) 2010, 2011  Radek Brich <radek.brich@devl.cz>
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""Postgres database connection manager
-
-PgManager wraps psycopg2 connect function, adding following features:
-
- * Manage database connection parameters - link connection parameters
-   to an unique identifier, retrieve connection object by this identifier
-
- * Connection pooling - connections with same identifier are pooled and reused
-
- * Easy query using the with statement - retrieve cursor directly by connection
-   identifier, don't worry about connections
-
- * Dict rows - cursor has additional methods like fetchall_dict(), which
-   returns dict row instead of ordinary list-like row
-
-Example:
-
-import pgmanager
-
-pgm = pgmanager.get_instance()
-pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres')
-
-with pgm.cursor() as curs:
-    curs.execute('SELECT now() AS now')
-    row = curs.fetchone_dict()
-    print row.now
-
-First, we have obtained PgManager instance. This is like calling
-PgManager(), although in our example the instance is global. That means
-getting the instance in another module brings us all the defined connections
-etc.
-
-On second line we created connection named 'default' (this name can be left out).
-The with statement obtains connection (actually connects to database when needed),
-then returns cursor for this connection. On exit, the connection is returned
-to the pool or closed (depending on number of connections on pool and setting
-of keep_open parameter).
-
-The row returned by fetchone_dict() is special dict object, which can be accessed
-using item or attribute access, that is row['now'] or row.now.
-"""
-
-from contextlib import contextmanager
-import logging
-import threading
-import select
-
-import psycopg2
-import psycopg2.extensions
-
-from psycopg2 import DatabaseError, IntegrityError
-
-
-class PgManagerError(Exception):
-
-    pass
-
-
-class ConnectionInfo:
-
-    def __init__(self, dsn, isolation_level=None, init_statement=None, keep_open=1):
-        self.dsn = dsn
-        self.isolation_level = isolation_level
-        self.init_statement = init_statement
-        self.keep_open = keep_open
-
-
-class RowDict(dict):
-
-    def __getattr__(self, key):
-        return self[key]
-
-
-class Cursor(psycopg2.extensions.cursor):
-
-    def execute(self, query, args=None):
-        try:
-            return super(Cursor, self).execute(query, args)
-        finally:
-            log.debug(self.query.decode('utf8'))
-
-    def callproc(self, procname, args=None):
-        try:
-            return super(Cursor, self).callproc(procname, args)
-        finally:
-            log.debug(self.query.decode('utf8'))
-
-    def row_dict(self, row, lstrip=None):
-        adjustname = lambda a: a
-        if lstrip:
-            adjustname = lambda a: a.lstrip(lstrip)
-        return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
-
-    def fetchone_dict(self, lstrip=None):
-        row = super(Cursor, self).fetchone()
-        if not row:
-            return row
-        return self.row_dict(row, lstrip)
-
-    def fetchall_dict(self, lstrip=None):
-        rows = super(Cursor, self).fetchall()
-        return [self.row_dict(row, lstrip) for row in rows]
-
-
-class Connection(psycopg2.extensions.connection):
-
-    def cursor(self, name=None):
-        if name is None:
-            return super(Connection, self).cursor(cursor_factory=Cursor)
-        else:
-            return super(Connection, self).cursor(name, cursor_factory=Cursor)
-
-
-class PgManager:
-
-    def __init__(self):
-        self.conn_known = {}  # available connections
-        self.conn_pool = {}
-        self.lock = threading.Lock()
-
-    def __del__(self):
-        for conn in tuple(self.conn_known.keys()):
-            self.destroy_conn(conn)
-
-    def create_conn(self, name='default', isolation_level=None, dsn=None, **kw):
-        '''Create named connection.'''
-        if name in self.conn_known:
-            raise PgManagerError('Connection name "%s" already registered.' % name)
-
-        if dsn is None:
-            dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items()])
-
-        isolation_level = self._normalize_isolation_level(isolation_level)
-        ci = ConnectionInfo(dsn, isolation_level)
-
-        self.conn_known[name] = ci
-        self.conn_pool[name] = []
-
-    def close_conn(self, name='default'):
-        '''Close all connections of given name.
-        
-        Connection credentials are still saved.
-        
-        '''
-        while len(self.conn_pool[name]):
-            conn = self.conn_pool[name].pop()
-            conn.close()
-
-    def destroy_conn(self, name='default'):
-        '''Destroy connection.
-        
-        Counterpart of create_conn.
-        
-        '''
-        if not name in self.conn_known:
-            raise PgManagerError('Connection name "%s" not registered.' % name)
-
-        self.close_conn(name)
-
-        del self.conn_known[name]
-        del self.conn_pool[name]
-
-    def get_conn(self, name='default'):
-        '''Get connection of name 'name' from pool.'''
-        self.lock.acquire()
-        try:
-            if not name in self.conn_known:
-                raise PgManagerError("Connection name '%s' not registered." % name)
-
-            conn = None
-            while len(self.conn_pool[name]) and conn is None:
-                conn = self.conn_pool[name].pop()
-                if conn.closed:
-                    conn = None
-
-            if conn is None:
-                ci = self.conn_known[name]
-                conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
-                if not ci.isolation_level is None:
-                    conn.set_isolation_level(ci.isolation_level)
-                if ci.init_statement:
-                    curs = conn.cursor()
-                    curs.execute(ci.init_statement)
-                    curs.close()
-        finally:
-            self.lock.release()
-        return conn
-
-    def put_conn(self, conn, name='default'):
-        '''Put connection back to pool.
-        
-        Name must be same as used for get_conn,
-        otherwise things become broken.
-        
-        '''
-        self.lock.acquire()
-        try:
-            if not name in self.conn_known:
-                raise PgManagerError("Connection name '%s' not registered." % name)
-
-            if len(self.conn_pool[name]) >= self.conn_known[name].keep_open:
-                conn.close()
-                return
-
-            if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN:
-                conn.close()
-                return
-
-            # connection returned to the pool must not be in transaction
-            if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
-                conn.rollback()
-
-            self.conn_pool[name].append(conn)
-        finally:
-            self.lock.release()
-
-    @contextmanager
-    def cursor(self, name='default'):
-        '''Cursor context.
-        
-        Uses any connection of name 'name' from pool
-        and returns cursor for that connection.
-        
-        '''
-        conn = self.get_conn(name)
-
-        try:
-            curs = conn.cursor()
-            yield curs
-        finally:
-            curs.close()
-            self.put_conn(conn, name)
-
-    def wait_for_notify(self, name='default', timeout=5):
-        '''Wait for asynchronous notifies, return the last one.
-        
-        Returns None on timeout.
-        
-        '''
-        conn = self.get_conn(name)
-        
-        try:
-            # any residual notify?
-            # then return it, that should not break anything
-            if conn.notifies:
-                return conn.notifies.pop()
-
-            if select.select([conn], [], [], timeout) == ([], [], []):
-                # timeout
-                return None
-            else:
-                conn.poll()
-
-                # return just the last notify (we do not care for older ones)
-                if conn.notifies:
-                    return conn.notifies.pop()
-                return None
-        finally:
-            # clean notifies
-            while conn.notifies:
-                conn.notifies.pop()
-            self.put_conn(conn, name)
-
-    def _normalize_isolation_level(self, level):
-        if type(level) == str:
-            if level.lower() == 'autocommit':
-                return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
-            if level.lower() == 'read_committed':
-                return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
-            if level.lower() == 'serializable':
-                return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
-            raise PgManagerError('Unknown isolation level name: "%s"', level)
-        return level
-
-
-try:
-    NullHandler = logging.NullHandler
-except AttributeError:
-    class NullHandler(logging.Handler):
-        def emit(self, record):
-            pass
-
-
-log = logging.getLogger("pgmanager")
-log.addHandler(NullHandler())
-
-
-instance = None
-
-
-def get_instance():
-    global instance
-    if instance is None:
-        instance = PgManager()
-    return instance
-
-