diff -r 24e94a3da209 -r d8ff52a0390f pgtoolkit/pgmanager.py
--- a/pgtoolkit/pgmanager.py	Mon May 26 18:18:21 2014 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,515 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# PgManager - manage database connections
-#
-# Requires: Python 3.2, psycopg2
-#
-# Part of pgtoolkit
-# http://hg.devl.cz/pgtoolkit
-#
-# Copyright (c) 2010, 2011, 2012, 2013 Radek Brich
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""Postgres database connection manager
-
-PgManager wraps psycopg2, adding the following features:
-
-  * Save and reuse database connection parameters
-
-  * Connection pooling
-
-  * Easy queries using the with statement
-
-  * Dictionary rows
-
-Example usage:
-
-    from pgtoolkit import pgmanager
-
-    pgm = pgmanager.get_instance()
-    pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres')
-
-    with pgm.cursor() as curs:
-        curs.execute('SELECT now() AS now')
-        row = curs.fetchone_dict()
-        print(row.now)
-
-First, we obtain a PgManager instance. This is like calling
-PgManager(), except that in our example the instance is global. That means
-getting the instance in another module brings us all the defined connections
-etc.
-
-On the second line we create a connection named 'default' (the name can be left out).
-The with statement obtains a connection (actually connecting to the database when needed),
-then returns a cursor for this connection. At the end of the with statement,
-the connection is returned to the pool or closed (depending on the number of connections
-in the pool and on the pool_size parameter).
-
-The row returned by fetchone_dict() is a special dict object, which can be accessed
-using item or attribute access, that is row['now'] or row.now.
-
-"""
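The docstring only shows the anonymous 'default' connection. As a hypothetical illustration that is not part of the removed file, a second, explicitly named connection with its own pool could be used the same way; the name 'reports', the host, database, user and pool settings below are invented:

    pgm = pgmanager.get_instance()
    pgm.create_conn(name='reports', pool_size=4, isolation_level='read_committed',
                    host='10.0.0.5', dbname='reports', user='report_ro')

    with pgm.cursor('reports') as curs:
        curs.execute('SELECT count(*) AS cnt FROM pg_stat_activity')
        print(curs.fetchone_dict().cnt)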
-
-from contextlib import contextmanager
-from collections import OrderedDict
-import logging
-import threading
-import multiprocessing
-import select
-import socket
-
-import psycopg2
-import psycopg2.extensions
-
-from psycopg2 import DatabaseError, IntegrityError, OperationalError
-
-
-log_sql = logging.getLogger("pgmanager_sql")
-log_notices = logging.getLogger("pgmanager_notices")
-log_sql.addHandler(logging.NullHandler())
-# NullHandler not needed for notices which are INFO level only
-
-
-class PgManagerError(Exception):
-
-    pass
-
-
-class ConnectionInfo:
-
-    def __init__(self, name, dsn, isolation_level=None, keep_alive=True,
-                 init_statement=None, pool_size=1):
-        self.name = name # connection name is logged with SQL queries
-        self.dsn = dsn # dsn or string with connection parameters
-        self.isolation_level = isolation_level
-        self.keep_alive = keep_alive
-        self.init_statement = init_statement
-        self.pool_size = pool_size
-
-
-class RowDict(OrderedDict):
-    """Special dictionary used for rows returned from queries.
-
-    Items keep the order in which columns were returned from the database.
-
-    It supports three styles of access:
-
-    Dict style:
-        row['id']
-        for key in row:
-            ...
-
-    Object style (only works if the column name does not collide with any method name):
-        row.id
-
-    Tuple style:
-        row[0]
-        id, name = row.values()
-
-    """
-
-    def __getitem__(self, key):
-        if isinstance(key, int):
-            return tuple(self.values())[key]
-        else:
-            return OrderedDict.__getitem__(self, key)
-
-    def __getattr__(self, key):
-        try:
-            return self[key]
-        except KeyError:
-            raise AttributeError(key)
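To make the three access styles concrete, here is a short hypothetical session that is not taken from the removed file; it reuses a cursor from the pool as in the module docstring, and the column names are made up:

    with pgm.cursor() as curs:
        curs.execute("SELECT 1 AS id, 'joe' AS name")
        row = curs.fetchone_dict()
        print(row['id'], row['name'])   # dict style
        print(row.id, row.name)         # object style
        print(row[0], row[1])           # tuple style (positional access)
        id_, name = row.values()        # or unpack all values at once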
-
-
-class Cursor(psycopg2.extensions.cursor):
-
-    def execute(self, query, args=None):
-        # log the query before executing it
-        self._log_query(query, args)
-        try:
-            return super(Cursor, self).execute(query, args)
-        except DatabaseError:
-            self._log_exception()
-            raise
-
-    def callproc(self, procname, args=None):
-        # log the query before executing (not the exact query that will run, but it should correspond)
-        self._log_query(self._build_callproc_query(procname, len(args)), args)
-        try:
-            return super(Cursor, self).callproc(procname, args)
-        except DatabaseError:
-            self._log_exception()
-            raise
-
-    def row_dict(self, row, lstrip=None):
-        adjustname = lambda a: a
-        if lstrip:
-            adjustname = lambda a: a.lstrip(lstrip)
-        return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
-
-    def fetchone_dict(self, lstrip=None):
-        '''Return one row as OrderedDict.'''
-        row = super(Cursor, self).fetchone()
-        if row is None:
-            return None
-        return self.row_dict(row, lstrip)
-
-    def fetchall_dict(self, lstrip=None):
-        '''Return all rows as a list of OrderedDicts.'''
-        rows = super(Cursor, self).fetchall()
-        return [self.row_dict(row, lstrip) for row in rows]
-
-    def adapt(self, row):
-        if isinstance(row, RowDict):
-            # dict
-            adapted = dict()
-            for key in row.keys():
-                adapted[key] = self.mogrify('%s', [row[key]]).decode('utf8')
-            return RowDict(adapted)
-        else:
-            # list
-            return [self.mogrify('%s', [x]).decode('utf8') for x in row]
-
-    def fetchone_adapted(self, lstrip=None):
-        '''Like fetchone_dict() but values are quoted for direct inclusion in an SQL query.
-
-        This is useful when you need to generate an SQL script from data returned
-        by the query. Use mogrify() for simple cases.
-
-        '''
-        row = super(Cursor, self).fetchone()
-        if row is None:
-            return None
-        return self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip)
-
-    def fetchall_adapted(self, lstrip=None):
-        '''Like fetchall_dict() but values are quoted for direct inclusion in an SQL query.'''
-        rows = super(Cursor, self).fetchall()
-        return [self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip) for row in rows]
-
-    def _log_query(self, query='?', args=None):
-        name = self.connection.name if hasattr(self.connection, 'name') else '-'
-        query = self.mogrify(query, args)
-        log_sql.debug('[%s] %s' % (name, query.decode('utf8')))
-
-    def _log_exception(self):
-        name = self.connection.name if hasattr(self.connection, 'name') else '-'
-        log_sql.exception('[%s] exception:' % (name,))
-
-    def _build_callproc_query(self, procname, num_args):
-        return 'SELECT * FROM %s(%s)' % (procname, ', '.join(['%s'] * num_args))
-
-
-class Connection(psycopg2.extensions.connection):
-
-    def cursor(self, name=None):
-        if name is None:
-            return super(Connection, self).cursor(cursor_factory=Cursor)
-        else:
-            return super(Connection, self).cursor(name, cursor_factory=Cursor)
-
-    def keep_alive(self):
-        '''Set the socket to keepalive mode. Must be called before any query.'''
-        sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM)
-        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-        try:
-            # Maximum keep-alive probes before assuming the connection is lost
-            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
-            # Interval (in seconds) between keep-alive probes
-            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 2)
-            # Maximum idle time (in seconds) before starting to send keep-alive probes
-            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
-        except socket.error:
-            pass
-        # close the duplicated fd; the options set on the socket stay in effect
-        sock.close()
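The query logging built into Cursor only becomes visible once the 'pgmanager_sql' logger is wired to a real handler. A minimal, assumed configuration (not part of the removed module; the expected log line is an approximation based on _log_query above) might look like this:

    import logging

    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('pgmanager_sql').setLevel(logging.DEBUG)
    logging.getLogger('pgmanager_notices').setLevel(logging.INFO)

    with pgm.cursor() as curs:
        # should be logged roughly as "[default] SELECT 42 AS answer"
        curs.execute('SELECT %s AS answer', [42])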
-
-
-class PgManager:
-
-    def __init__(self):
-        self.conn_known = {} # available connections
-        self.conn_pool = {} # active connections
-        self.lock = threading.Lock() # mutual exclusion for threads
-        self.pid = multiprocessing.current_process().pid # forking check
-
-    def __del__(self):
-        for conn in tuple(self.conn_known.keys()):
-            self.destroy_conn(conn)
-
-    def create_conn(self, name='default', isolation_level=None, keep_alive=True, init_statement=None,
-                    pool_size=1, dsn=None, **kwargs):
-        '''Create named connection.
-
-        *name* -- name for the connection
-
-        *pool_size* -- how many connections will be kept open in the pool.
-        More connections will still be created but they will be closed by put_conn.
-        `None` disables the pool; get_conn() will then always return the same connection.
-
-        *isolation_level* -- `"autocommit"`, `"read_committed"`, `"serializable"` or `None` for the driver default
-
-        *keep_alive* -- set the socket to keepalive mode
-
-        *dsn* -- connection string (parameters or data source name)
-
-        Other keyword args are used as connection parameters.
-
-        '''
-        if name in self.conn_known:
-            raise PgManagerError('Connection name "%s" already registered.' % name)
-
-        if dsn is None:
-            dsn = ' '.join([x[0]+'='+str(x[1]) for x in kwargs.items() if x[1] is not None])
-
-        isolation_level = self._normalize_isolation_level(isolation_level)
-        ci = ConnectionInfo(name, dsn, isolation_level, keep_alive, init_statement, pool_size)
-
-        self.conn_known[name] = ci
-        self.conn_pool[name] = []
-
-    def create_conn_listen(self, name, channel, dsn=None, copy_dsn=None, **kwargs):
-        '''Create a connection listening for notifies.
-
-        Disables the pool. If you want to use a pool, create another connection for that.
-        This connection can be used as usual: conn.cursor() etc.
-        Don't use PgManager's cursor() and put_conn().
-
-        *name* -- name for the connection
-
-        *channel* -- listen on this channel
-
-        *copy_dsn* -- specify the name of another connection and its dsn will be used
-
-        Other parameters are forwarded to create_conn().
-
-        '''
-        if dsn is None and copy_dsn:
-            try:
-                dsn = self.conn_known[copy_dsn].dsn
-            except KeyError:
-                raise PgManagerError("Connection name '%s' not registered." % copy_dsn)
-        listen_query = "LISTEN " + channel
-        self.create_conn(name=name, pool_size=None, isolation_level='autocommit', init_statement=listen_query,
-                         dsn=dsn, **kwargs)
-
-    def close_conn(self, name='default'):
-        '''Close all connections of the given name.
-
-        Connection credentials are still saved.
-
-        '''
-        while len(self.conn_pool[name]):
-            conn = self.conn_pool[name].pop()
-            conn.close()
-
-    def destroy_conn(self, name='default'):
-        '''Destroy connection.
-
-        Counterpart of create_conn.
-
-        '''
-        if not name in self.conn_known:
-            raise PgManagerError('Connection name "%s" not registered.' % name)
-
-        self.close_conn(name)
-
-        del self.conn_known[name]
-        del self.conn_pool[name]
-
-    def knows_conn(self, name='default'):
-        return name in self.conn_known
-
-    def get_conn(self, name='default'):
-        '''Get a connection named *name* from the pool.'''
-        self._check_fork()
-        self.lock.acquire()
-        try:
-            try:
-                ci = self.conn_known[name]
-            except KeyError:
-                raise PgManagerError("Connection name '%s' not registered." % name)
-
-            # no pool, just one static connection
-            if ci.pool_size is None:
-                # check for an existing connection
-                try:
-                    conn = self.conn_pool[name][0]
-                    if conn.closed:
-                        conn = None
-                except IndexError:
-                    conn = None
-                    self.conn_pool[name].append(conn)
-                # if no existing connection is valid, connect a new one and save it
-                if conn is None:
-                    conn = self._connect(ci)
-                    self.conn_pool[name][0] = conn
-
-            # connection from the pool
-            else:
-                conn = None
-                while len(self.conn_pool[name]) and conn is None:
-                    conn = self.conn_pool[name].pop()
-                    if conn.closed:
-                        conn = None
-
-                if conn is None:
-                    conn = self._connect(ci)
-        finally:
-            self.lock.release()
-        return conn
-
-    def put_conn(self, conn, name='default'):
-        '''Put the connection back into the pool.
-
-        *name* must be the same as used for get_conn, otherwise things break.
-
-        '''
-        self.lock.acquire()
-        try:
-            if not name in self.conn_known:
-                raise PgManagerError("Connection name '%s' not registered." % name)
-
-            if len(self.conn_pool[name]) >= self.conn_known[name].pool_size:
-                conn.close()
-                return
-
-            if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN:
-                conn.close()
-                return
-
-            # a connection returned to the pool must not be inside a transaction
-            if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
-                try:
-                    conn.rollback()
-                except OperationalError:
-                    if not conn.closed:
-                        conn.close()
-                    return
-
-            self.conn_pool[name].append(conn)
-        finally:
-            self.lock.release()
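When a single transaction has to span several cursors, get_conn()/put_conn() can also be driven by hand, as a hedged sketch only; the 'accounts' table and the parameter values are invented and not implied by the code above:

    conn = pgm.get_conn()
    try:
        curs = conn.cursor()
        curs.execute('UPDATE accounts SET balance = balance - 10 WHERE id = %s', [1])
        curs.execute('UPDATE accounts SET balance = balance + 10 WHERE id = %s', [2])
        conn.commit()    # put_conn() would roll back anything left open
    finally:
        pgm.put_conn(conn)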
-
-    @contextmanager
-    def cursor(self, name='default'):
-        '''Cursor context.
-
-        Obtains a connection registered under *name* from the pool
-        and yields a cursor for that connection.
-
-        '''
-        conn = self.get_conn(name)
-
-        try:
-            curs = conn.cursor()
-            yield curs
-        finally:
-            curs.close()
-            self.log_notices(conn)
-            self.put_conn(conn, name)
-
-    def log_notices(self, conn):
-        for notice in conn.notices:
-            log_notices.info(notice.rstrip())
-        conn.notices[:] = []
-
-    def wait_for_notify(self, name='default', timeout=None):
-        '''Wait for asynchronous notifies, return the last one.
-
-        *name* -- name of the connection, must be created using `create_conn_listen()`
-
-        *timeout* -- in seconds, floating point (`None` means wait forever)
-
-        Returns `None` on timeout.
-
-        '''
-        conn = self.get_conn(name)
-
-        # return any notifies already on the stack
-        if conn.notifies:
-            return conn.notifies.pop()
-
-        if select.select([conn], [], [], timeout) == ([], [], []):
-            # timeout
-            return None
-        else:
-            conn.poll()
-
-        # return just the last notify (we do not care about older ones)
-        if conn.notifies:
-            return conn.notifies.pop()
-        return None
-
-    def _connect(self, ci):
-        conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
-        conn.name = ci.name
-        if ci.keep_alive:
-            conn.keep_alive()
-        if not ci.isolation_level is None:
-            conn.set_isolation_level(ci.isolation_level)
-        if ci.init_statement:
-            curs = conn.cursor()
-            curs.execute(ci.init_statement)
-            curs.connection.commit()
-            curs.close()
-        return conn
-
-    def _normalize_isolation_level(self, level):
-        if type(level) == str:
-            if level.lower() == 'autocommit':
-                return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
-            if level.lower() == 'read_committed':
-                return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
-            if level.lower() == 'serializable':
-                return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
-            raise PgManagerError('Unknown isolation level name: "%s"' % level)
-        return level
-
-    def _check_fork(self):
-        '''Check if the process was forked (the PID has changed).
-
-        If it was, clean the parent's connections.
-        New connections are created for children.
-        Known connection credentials are inherited, but not shared.
-
-        '''
-        if self.pid == multiprocessing.current_process().pid:
-            # PID has not changed
-            return
-
-        # update the saved PID
-        self.pid = multiprocessing.current_process().pid
-        # reinitialize the lock
-        self.lock = threading.Lock()
-        # clean the parent's connections
-        for name in self.conn_pool:
-            self.conn_pool[name] = []
-
-    @classmethod
-    def get_instance(cls):
-        if not hasattr(cls, '_instance'):
-            cls._instance = cls()
-        return cls._instance
-
-
-def get_instance():
-    return PgManager.get_instance()
-
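The LISTEN/NOTIFY helpers removed by this changeset were meant to be combined roughly as follows. This is a hedged sketch, not code from the file: the connection name 'listener', the channel 'jobs' and the 5-second timeout are invented, and the notify attributes are those of psycopg2's Notify objects:

    pgm.create_conn_listen('listener', 'jobs', copy_dsn='default')

    while True:
        notify = pgm.wait_for_notify('listener', timeout=5.0)
        if notify is None:
            continue    # timed out, keep waiting
        print('notified:', notify.channel, notify.payload, notify.pid)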