diff -r 2911935c524d -r 2fcc8ef0b97d tools/pgmanager.py --- a/tools/pgmanager.py Tue Aug 16 15:12:53 2011 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,357 +0,0 @@ -# -*- coding: utf-8 -*- -# -# PgManager - manage database connections -# -# Requires: Python 2.6, psycopg2 -# -# Copyright (c) 2010, 2011 Radek Brich -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -"""Postgres database connection manager - -PgManager wraps psycopg2 connect function, adding following features: - - * Manage database connection parameters - link connection parameters - to an unique identifier, retrieve connection object by this identifier - - * Connection pooling - connections with same identifier are pooled and reused - - * Easy query using the with statement - retrieve cursor directly by connection - identifier, don't worry about connections - - * Dict rows - cursor has additional methods like fetchall_dict(), which - returns dict row instead of ordinary list-like row - -Example: - -import pgmanager - -pgm = pgmanager.get_instance() -pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres') - -with pgm.cursor() as curs: - curs.execute('SELECT now() AS now') - row = curs.fetchone_dict() - print row.now - -First, we have obtained PgManager instance. This is like calling -PgManager(), although in our example the instance is global. That means -getting the instance in another module brings us all the defined connections -etc. - -On second line we created connection named 'default' (this name can be left out). -The with statement obtains connection (actually connects to database when needed), -then returns cursor for this connection. On exit, the connection is returned -to the pool or closed (depending on number of connections on pool and setting -of keep_open parameter). - -The row returned by fetchone_dict() is special dict object, which can be accessed -using item or attribute access, that is row['now'] or row.now. -""" - -from contextlib import contextmanager -import logging -import threading -import select -import socket - -import psycopg2 -import psycopg2.extensions - -from psycopg2 import DatabaseError, IntegrityError - - -class PgManagerError(Exception): - - pass - - -class ConnectionInfo: - - def __init__(self, dsn, isolation_level=None, init_statement=None, keep_open=1): - self.dsn = dsn - self.isolation_level = isolation_level - self.init_statement = init_statement - self.keep_open = keep_open - - -class RowDict(dict): - - def __getattr__(self, key): - return self[key] - - -class Cursor(psycopg2.extensions.cursor): - - def execute(self, query, args=None): - try: - return super(Cursor, self).execute(query, args) - finally: - log.debug(self.query.decode('utf8')) - - def callproc(self, procname, args=None): - try: - return super(Cursor, self).callproc(procname, args) - finally: - log.debug(self.query.decode('utf8')) - - def row_dict(self, row, lstrip=None): - adjustname = lambda a: a - if lstrip: - adjustname = lambda a: a.lstrip(lstrip) - return RowDict(zip([adjustname(desc[0]) for desc in self.description], row)) - - def fetchone_dict(self, lstrip=None): - row = super(Cursor, self).fetchone() - if row is None: - return None - return self.row_dict(row, lstrip) - - def fetchall_dict(self, lstrip=None): - rows = super(Cursor, self).fetchall() - return [self.row_dict(row, lstrip) for row in rows] - - def fetchone_adapted(self): - '''Like fetchone() but values are quoted for direct inclusion in SQL query. - - This is useful when you need to generate SQL script from data returned - by the query. Use mogrify() for simple cases. - - ''' - row = super(Cursor, self).fetchone() - if row is None: - return None - return [self.mogrify('%s', [x]).decode('utf8') for x in row] - - def fetchall_adapted(self): - '''Like fetchall() but values are quoted for direct inclusion in SQL query.''' - rows = super(Cursor, self).fetchall() - return [[self.mogrify('%s', [x]).decode('utf8') for x in row] for row in rows] - - -class Connection(psycopg2.extensions.connection): - - def cursor(self, name=None): - if name is None: - return super(Connection, self).cursor(cursor_factory=Cursor) - else: - return super(Connection, self).cursor(name, cursor_factory=Cursor) - - def keep_alive(self): - '''Set socket to keepalive mode. Must be called before any query.''' - sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - # Maximum keep-alive probes before asuming the connection is lost - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5) - # Interval (in seconds) between keep-alive probes - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 2) - # Maximum idle time (in seconds) before start sending keep-alive probes - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10) - - -class PgManager: - - def __init__(self): - self.conn_known = {} # available connections - self.conn_pool = {} - self.lock = threading.Lock() - - def __del__(self): - for conn in tuple(self.conn_known.keys()): - self.destroy_conn(conn) - - def create_conn(self, name='default', isolation_level=None, dsn=None, **kw): - '''Create named connection.''' - if name in self.conn_known: - raise PgManagerError('Connection name "%s" already registered.' % name) - - if dsn is None: - dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items()]) - - isolation_level = self._normalize_isolation_level(isolation_level) - ci = ConnectionInfo(dsn, isolation_level) - - self.conn_known[name] = ci - self.conn_pool[name] = [] - - def close_conn(self, name='default'): - '''Close all connections of given name. - - Connection credentials are still saved. - - ''' - while len(self.conn_pool[name]): - conn = self.conn_pool[name].pop() - conn.close() - - def destroy_conn(self, name='default'): - '''Destroy connection. - - Counterpart of create_conn. - - ''' - if not name in self.conn_known: - raise PgManagerError('Connection name "%s" not registered.' % name) - - self.close_conn(name) - - del self.conn_known[name] - del self.conn_pool[name] - - def get_conn(self, name='default'): - '''Get connection of name 'name' from pool.''' - self.lock.acquire() - try: - if not name in self.conn_known: - raise PgManagerError("Connection name '%s' not registered." % name) - - conn = None - while len(self.conn_pool[name]) and conn is None: - conn = self.conn_pool[name].pop() - if conn.closed: - conn = None - - if conn is None: - ci = self.conn_known[name] - conn = self._connect(ci) - finally: - self.lock.release() - return conn - - def put_conn(self, conn, name='default'): - '''Put connection back to pool. - - Name must be same as used for get_conn, - otherwise things become broken. - - ''' - self.lock.acquire() - try: - if not name in self.conn_known: - raise PgManagerError("Connection name '%s' not registered." % name) - - if len(self.conn_pool[name]) >= self.conn_known[name].keep_open: - conn.close() - return - - if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN: - conn.close() - return - - # connection returned to the pool must not be in transaction - if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE: - conn.rollback() - - self.conn_pool[name].append(conn) - finally: - self.lock.release() - - @contextmanager - def cursor(self, name='default'): - '''Cursor context. - - Uses any connection of name 'name' from pool - and returns cursor for that connection. - - ''' - conn = self.get_conn(name) - - try: - curs = conn.cursor() - yield curs - finally: - curs.close() - self.put_conn(conn, name) - - def wait_for_notify(self, name='default', timeout=5): - '''Wait for asynchronous notifies, return the last one. - - Returns None on timeout. - - ''' - conn = self.get_conn(name) - - try: - # any residual notify? - # then return it, that should not break anything - if conn.notifies: - return conn.notifies.pop() - - if select.select([conn], [], [], timeout) == ([], [], []): - # timeout - return None - else: - conn.poll() - - # return just the last notify (we do not care for older ones) - if conn.notifies: - return conn.notifies.pop() - return None - finally: - # clean notifies - while conn.notifies: - conn.notifies.pop() - self.put_conn(conn, name) - - def _connect(self, ci): - conn = psycopg2.connect(ci.dsn, connection_factory=Connection) - conn.keep_alive() - if not ci.isolation_level is None: - conn.set_isolation_level(ci.isolation_level) - if ci.init_statement: - curs = conn.cursor() - curs.execute(ci.init_statement) - curs.close() - return conn - - def _normalize_isolation_level(self, level): - if type(level) == str: - if level.lower() == 'autocommit': - return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT - if level.lower() == 'read_committed': - return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED - if level.lower() == 'serializable': - return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE - raise PgManagerError('Unknown isolation level name: "%s"', level) - return level - - -try: - NullHandler = logging.NullHandler -except AttributeError: - class NullHandler(logging.Handler): - def emit(self, record): - pass - - -log = logging.getLogger("pgmanager") -log.addHandler(NullHandler()) - - -instance = None - - -def get_instance(): - global instance - if instance is None: - instance = PgManager() - return instance - -