diff -r 000000000000 -r eaae9539e910 pgtools/pgmanager.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtools/pgmanager.py Thu May 26 18:09:05 2011 +0200 @@ -0,0 +1,323 @@ +# -*- coding: utf-8 -*- +# +# PgManager - manage database connections +# +# Requires: Python 2.6, psycopg2 +# +# Copyright (c) 2010, 2011 Radek Brich +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +"""Postgres database connection manager + +PgManager wraps psycopg2 connect function, adding following features: + + * Manage database connection parameters - link connection parameters + to an unique identifier, retrieve connection object by this identifier + + * Connection pooling - connections with same identifier are pooled and reused + + * Easy query using the with statement - retrieve cursor directly by connection + identifier, don't worry about connections + + * Dict rows - cursor has additional methods like fetchall_dict(), which + returns dict row instead of ordinary list-like row + +Example: + +import pgmanager + +pgm = pgmanager.get_instance() +pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres') + +with pgm.cursor() as curs: + curs.execute('SELECT now() AS now') + row = curs.fetchone_dict() + print row.now + +First, we have obtained PgManager instance. This is like calling +PgManager(), although in our example the instance is global. That means +getting the instance in another module brings us all the defined connections +etc. + +On second line we created connection named 'default' (this name can be left out). +The with statement obtains connection (actually connects to database when needed), +then returns cursor for this connection. On exit, the connection is returned +to the pool or closed (depending on number of connections on pool and setting +of keep_open parameter). + +The row returned by fetchone_dict() is special dict object, which can be accessed +using item or attribute access, that is row['now'] or row.now. +""" + +from contextlib import contextmanager +import logging +import threading +import select + +import psycopg2 +import psycopg2.extensions + +from psycopg2 import DatabaseError, IntegrityError + + +class PgManagerError(Exception): + + pass + + +class ConnectionInfo: + + def __init__(self, dsn, isolation_level=None, init_statement=None, keep_open=1): + self.dsn = dsn + self.isolation_level = isolation_level + self.init_statement = init_statement + self.keep_open = keep_open + + +class RowDict(dict): + + def __getattr__(self, key): + return self[key] + + +class Cursor(psycopg2.extensions.cursor): + + def execute(self, query, args=None): + try: + return super(Cursor, self).execute(query, args) + finally: + log.debug(self.query) + + def callproc(self, procname, args=None): + try: + return super(Cursor, self).callproc(procname, args) + finally: + log.debug(self.query) + + def row_dict(self, row, lstrip=None): + adjustname = lambda a: a + if lstrip: + adjustname = lambda a: a.lstrip(lstrip) + return RowDict(zip([adjustname(desc[0]) for desc in self.description], row)) + + def fetchone_dict(self, lstrip=None): + row = super(Cursor, self).fetchone() + if not row: + return row + return self.row_dict(row, lstrip) + + def fetchall_dict(self, lstrip=None): + rows = super(Cursor, self).fetchall() + return [self.row_dict(row, lstrip) for row in rows] + + +class Connection(psycopg2.extensions.connection): + + def cursor(self, name=None): + if name is None: + return super(Connection, self).cursor(cursor_factory=Cursor) + else: + return super(Connection, self).cursor(name, cursor_factory=Cursor) + + +class PgManager: + + def __init__(self): + self.conn_known = {} # available connections + self.conn_pool = {} + self.lock = threading.Lock() + + def __del__(self): + for conn in tuple(self.conn_known.keys()): + self.destroy_conn(conn) + + def create_conn(self, name='default', isolation_level=None, dsn=None, **kw): + '''Create named connection.''' + if name in self.conn_known: + raise PgManagerError('Connection name "%s" already registered.', name) + + if dsn is None: + dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items()]) + + isolation_level = self._normalize_isolation_level(isolation_level) + ci = ConnectionInfo(dsn, isolation_level) + + self.conn_known[name] = ci + self.conn_pool[name] = [] + + def close_conn(self, name='default'): + '''Close all connections of given name. + + Connection credentials are still saved. + + ''' + while len(self.conn_pool[name]): + conn = self.conn_pool[name].pop() + conn.close() + + def destroy_conn(self, name='default'): + '''Destroy connection. + + Counterpart of create_conn. + + ''' + if not name in self.conn_known: + raise PgManagerError('Connection name "%s" not registered.', name) + + self.close_conn(name) + + del self.conn_known[name] + del self.conn_pool[name] + + def get_conn(self, name='default'): + '''Get connection of name 'name' from pool.''' + self.lock.acquire() + try: + if not name in self.conn_known: + raise PgManagerError("Connection name '%s' not registered.", name) + + conn = None + while len(self.conn_pool[name]) and conn is None: + conn = self.conn_pool[name].pop() + if conn.closed: + conn = None + + if conn is None: + ci = self.conn_known[name] + conn = psycopg2.connect(ci.dsn, connection_factory=Connection) + if not ci.isolation_level is None: + conn.set_isolation_level(ci.isolation_level) + if ci.init_statement: + curs = conn.cursor() + curs.execute(ci.init_statement) + curs.close() + finally: + self.lock.release() + return conn + + def put_conn(self, conn, name='default'): + '''Put connection back to pool. + + Name must be same as used for get_conn, + otherwise things become broken. + + ''' + self.lock.acquire() + try: + if not name in self.conn_known: + raise PgManagerError("Connection name '%s' not registered.", name) + + if len(self.conn_pool[name]) >= self.conn_known[name].keep_open: + conn.close() + return + + if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN: + conn.close() + return + + # connection returned to the pool must not be in transaction + if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE: + conn.rollback() + + self.conn_pool[name].append(conn) + finally: + self.lock.release() + + @contextmanager + def cursor(self, name='default'): + '''Cursor context. + + Uses any connection of name 'name' from pool + and returns cursor for that connection. + + ''' + conn = self.get_conn(name) + + try: + curs = conn.cursor() + yield curs + finally: + curs.close() + self.put_conn(conn, name) + + def wait_for_notify(self, name='default', timeout=5): + '''Wait for asynchronous notifies, return the last one. + + Returns None on timeout. + + ''' + conn = self.get_conn(name) + + try: + # any residual notify? + # then return it, that should not break anything + if conn.notifies: + return conn.notifies.pop() + + if select.select([conn], [], [], timeout) == ([], [], []): + # timeout + return None + else: + conn.poll() + + # return just the last notify (we do not care for older ones) + if conn.notifies: + return conn.notifies.pop() + return None + finally: + # clean notifies + while conn.notifies: + conn.notifies.pop() + self.put_conn(conn, name) + + def _normalize_isolation_level(self, level): + if type(level) == str: + if level.lower() == 'autocommit': + return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT + if level.lower() == 'read_committed': + return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED + if level.lower() == 'serializable': + return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE + raise PgManagerError('Unknown isolation level name: "%s"', level) + return level + + +try: + NullHandler = logging.NullHandler +except AttributeError: + class NullHandler(logging.Handler): + def emit(self, record): + pass + + +log = logging.getLogger("pgmanager") +log.addHandler(NullHandler()) + + +instance = None + + +def get_instance(): + global instance + if instance is None: + instance = PgManager() + return instance + +