--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/pgmanager.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,515 @@
+# -*- coding: utf-8 -*-
+#
+# PgManager - manage database connections
+#
+# Requires: Python 3.2, psycopg2
+#
+# Part of pydbkit
+# http://hg.devl.cz/pydbkit
+#
+# Copyright (c) 2010, 2011, 2012, 2013 Radek Brich <radek.brich@devl.cz>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+"""Postgres database connection manager
+
+PgManager wraps psycopg2, adding the following features:
+
+ * Save and reuse database connection parameters
+
+ * Connection pooling
+
+ * Easy query using the with statement
+
+ * Dictionary rows
+
+Example usage:
+
+ from pydbkit import pgmanager
+
+ pgm = pgmanager.get_instance()
+ pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres')
+
+ with pgm.cursor() as curs:
+ curs.execute('SELECT now() AS now')
+ row = curs.fetchone_dict()
+ print(row.now)
+
+First, we have obtained PgManager instance. This is like calling
+PgManager(), although in our example the instance is global. That means
+getting the instance in another module brings us all the defined connections
+etc.
+
+On second line we have created connection named 'default' (this name can be left out).
+The with statement obtains connection (actually connects to database when needed),
+then returns cursor for this connection. At the end of with statement,
+the connection is returned to the pool or closed (depending on number of connections
+in pool and on setting of pool_size parameter).
+
+The row returned by fetchone_dict() is a special dict object, which can be accessed
+using item or attribute access, that is row['now'] or row.now.
+
+"""
+
+from contextlib import contextmanager
+from collections import OrderedDict
+import logging
+import threading
+import multiprocessing
+import select
+import socket
+
+import psycopg2
+import psycopg2.extensions
+
+from psycopg2 import DatabaseError, IntegrityError, OperationalError
+
+
+log_sql = logging.getLogger("pgmanager_sql")
+log_notices = logging.getLogger("pgmanager_notices")
+log_sql.addHandler(logging.NullHandler())
+# NullHandler not needed for notices which are INFO level only
+
+
class PgManagerError(Exception):
    """Raised by PgManager for configuration errors (duplicate or unknown
    connection name, unknown isolation level name)."""
+
+
class ConnectionInfo:
    """Saved parameters for one named connection (registered by create_conn)."""

    def __init__(self, name, dsn, isolation_level=None, keep_alive=True,
            init_statement=None, pool_size=1):
        self.name = name                        # logged together with SQL queries
        self.dsn = dsn                          # string with connection parameters
        self.isolation_level = isolation_level  # psycopg2 level constant or None
        self.keep_alive = keep_alive            # enable TCP keepalive on the socket
        self.init_statement = init_statement    # SQL executed right after connecting
        self.pool_size = pool_size              # pool limit; None = single static conn
+
+
class RowDict(OrderedDict):
    """Ordered dictionary for rows returned from queries.

    Columns keep the order in which they were returned from the database.

    Three styles of access are supported:

    Dict style:
        row['id']
        for key in row:
            ...

    Object style (only works if column name does not collide with any method name):
        row.id

    Tuple style:
        row[0]
        id, name = row.values()

    """

    def __getitem__(self, key):
        # An integer key selects by position (tuple-style access);
        # anything else is a normal ordered-dict lookup.
        if not isinstance(key, int):
            return OrderedDict.__getitem__(self, key)
        return tuple(self.values())[key]

    def __getattr__(self, key):
        # Attribute access falls back to column lookup.
        try:
            return self[key]
        except KeyError:
            raise AttributeError(key)
+
+
class Cursor(psycopg2.extensions.cursor):
    """Cursor with SQL logging, RowDict fetch methods and value adaptation."""

    def execute(self, query, args=None):
        # log query before executing
        self._log_query(query, args)
        try:
            return super(Cursor, self).execute(query, args)
        except DatabaseError:
            self._log_exception()
            raise

    def callproc(self, procname, args=None):
        # log query before executing (not query actually executed but should correspond)
        self._log_query(self._build_callproc_query(procname, len(args)), args)
        try:
            return super(Cursor, self).callproc(procname, args)
        except DatabaseError:
            self._log_exception()
            raise

    def row_dict(self, row, lstrip=None):
        '''Build RowDict from *row*, stripping *lstrip* chars from column names.'''
        adjustname = lambda a: a
        if lstrip:
            adjustname = lambda a: a.lstrip(lstrip)
        return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))

    def fetchone_dict(self, lstrip=None):
        '''Return one row as OrderedDict, or None when no row is available.'''
        row = super(Cursor, self).fetchone()
        if row is None:
            return None
        return self.row_dict(row, lstrip)

    def fetchall_dict(self, lstrip=None):
        '''Return all rows as list of OrderedDict.'''
        rows = super(Cursor, self).fetchall()
        return [self.row_dict(row, lstrip) for row in rows]

    def adapt(self, row):
        '''Return copy of *row* with every value quoted for inclusion in SQL.'''
        if isinstance(row, RowDict):
            # dict -- fill the RowDict directly so column order is preserved
            # (the old code went through a plain dict(), which loses ordering
            # on Python < 3.7)
            adapted = RowDict()
            for key in row.keys():
                adapted[key] = self.mogrify('%s', [row[key]]).decode('utf8')
            return adapted
        else:
            # list
            return [self.mogrify('%s', [x]).decode('utf8') for x in row]

    def fetchone_adapted(self, lstrip=None):
        '''Like fetchone_dict() but values are quoted for direct inclusion in SQL query.

        This is useful when you need to generate SQL script from data returned
        by the query. Use mogrify() for simple cases.

        '''
        row = super(Cursor, self).fetchone()
        if row is None:
            return None
        return self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip)

    def fetchall_adapted(self, lstrip=None):
        '''Like fetchall_dict() but values are quoted for direct inclusion in SQL query.'''
        rows = super(Cursor, self).fetchall()
        return [self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip) for row in rows]

    def _log_query(self, query='?', args=None):
        # connection may not be a pgmanager Connection, hence the hasattr check
        name = self.connection.name if hasattr(self.connection, 'name') else '-'
        query = self.mogrify(query, args)
        log_sql.debug('[%s] %s' % (name, query.decode('utf8')))

    def _log_exception(self):
        name = self.connection.name if hasattr(self.connection, 'name') else '-'
        log_sql.exception('[%s] exception:' % (name,))

    def _build_callproc_query(self, procname, num_args):
        # e.g. 'SELECT * FROM myproc(%s, %s)' for num_args == 2
        return 'SELECT * FROM %s(%s)' % (procname, ', '.join(['%s'] * num_args))
+
+
class Connection(psycopg2.extensions.connection):
    """psycopg2 connection which produces pgmanager Cursor objects and can
    enable TCP keepalive on its socket."""

    def cursor(self, name=None):
        '''Return Cursor for this connection (server-side cursor when *name* given).'''
        if name is not None:
            return super(Connection, self).cursor(name, cursor_factory=Cursor)
        return super(Connection, self).cursor(cursor_factory=Cursor)

    def keep_alive(self):
        '''Set socket to keepalive mode. Must be called before any query.'''
        sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        probe_options = (
            # Maximum keep-alive probes before assuming the connection is lost
            (socket.TCP_KEEPCNT, 5),
            # Interval (in seconds) between keep-alive probes
            (socket.TCP_KEEPINTVL, 2),
            # Maximum idle time (in seconds) before start sending keep-alive probes
            (socket.TCP_KEEPIDLE, 10),
        )
        try:
            for option, value in probe_options:
                sock.setsockopt(socket.IPPROTO_TCP, option, value)
        except socket.error:
            # fine-tuning is best-effort; SO_KEEPALIVE itself is already set
            pass
        # close duplicated fd, options set for socket stays
        sock.close()
+
+
class PgManager:
    """Manage named PostgreSQL connections with optional pooling.

    Connection parameters are registered once with create_conn(); actual
    connections are then obtained with get_conn()/put_conn() or the cursor()
    context manager. Access to the pool is serialized with a lock, and a
    fork is detected so child processes get fresh connections.

    """

    def __init__(self):
        self.conn_known = {}  # name -> ConnectionInfo (available connections)
        self.conn_pool = {}   # name -> list of active connections
        self.lock = threading.Lock()  # mutual exclusion for threads
        self.pid = multiprocessing.current_process().pid  # forking check

    def __del__(self):
        # destroy_conn mutates conn_known, so iterate over a snapshot
        for conn in tuple(self.conn_known.keys()):
            self.destroy_conn(conn)

    def create_conn(self, name='default', isolation_level=None, keep_alive=True, init_statement=None,
            pool_size=1, dsn=None, **kwargs):
        '''Create named connection.

        *name* -- name for connection

        *pool_size* -- how many connections will be kept open in pool.
        More connections will still be created but they will be closed by put_conn.
        `None` will disable pool, get_conn() will then always return same connection.

        *isolation_level* -- `"autocommit"`, `"read_committed"`, `"serializable"` or `None` for driver default

        *keep_alive* -- set socket to keepalive mode

        *dsn* -- connection string (parameters or data source name)

        Other keyword args are used as connection parameters.

        '''
        if name in self.conn_known:
            raise PgManagerError('Connection name "%s" already registered.' % name)

        if dsn is None:
            # build dsn string from keyword arguments, skipping unset ones
            dsn = ' '.join([x[0] + '=' + str(x[1]) for x in kwargs.items() if x[1] is not None])

        isolation_level = self._normalize_isolation_level(isolation_level)
        ci = ConnectionInfo(name, dsn, isolation_level, keep_alive, init_statement, pool_size)

        self.conn_known[name] = ci
        self.conn_pool[name] = []

    def create_conn_listen(self, name, channel, dsn=None, copy_dsn=None, **kwargs):
        '''Create connection listening for notifies.

        Disables pool. If you want to use pool, create other connection for that.
        This connection can be used as usual: conn.cursor() etc.
        Don't use PgManager's cursor() and put_conn().

        *name* -- name for connection

        *channel* -- listen on this channel

        *copy_dsn* -- specify name of other connection and its dsn will be used

        Other parameters forwarded to create_conn().

        '''
        if dsn is None and copy_dsn:
            try:
                dsn = self.conn_known[copy_dsn].dsn
            except KeyError:
                raise PgManagerError("Connection name '%s' not registered." % copy_dsn)
        listen_query = "LISTEN " + channel
        self.create_conn(name=name, pool_size=None, isolation_level='autocommit', init_statement=listen_query,
            dsn=dsn, **kwargs)

    def close_conn(self, name='default'):
        '''Close all connections of given name.

        Connection credentials are still saved.

        '''
        pool = self.conn_pool[name]
        while pool:
            pool.pop().close()

    def destroy_conn(self, name='default'):
        '''Destroy connection.

        Counterpart of create_conn.

        '''
        if name not in self.conn_known:
            raise PgManagerError('Connection name "%s" not registered.' % name)

        self.close_conn(name)

        del self.conn_known[name]
        del self.conn_pool[name]

    def knows_conn(self, name='default'):
        '''Return True when connection *name* was registered by create_conn().'''
        return name in self.conn_known

    def get_conn(self, name='default'):
        '''Get connection of name *name* from pool (connecting when needed).'''
        self._check_fork()
        with self.lock:
            try:
                ci = self.conn_known[name]
            except KeyError:
                raise PgManagerError("Connection name '%s' not registered." % name)

            if ci.pool_size is None:
                # no pool, just one static connection
                try:
                    conn = self.conn_pool[name][0]
                    if conn.closed:
                        conn = None
                except IndexError:
                    conn = None
                    self.conn_pool[name].append(conn)  # reserve the single slot
                # if no existing connection is valid, connect new one and save it
                if conn is None:
                    conn = self._connect(ci)
                    self.conn_pool[name][0] = conn
            else:
                # connection from pool -- skip any that were closed meanwhile
                conn = None
                while self.conn_pool[name] and conn is None:
                    conn = self.conn_pool[name].pop()
                    if conn.closed:
                        conn = None
                if conn is None:
                    conn = self._connect(ci)
        return conn

    def put_conn(self, conn, name='default'):
        '''Put connection back to pool.

        *name* must be same as used for get_conn, otherwise things become broken.

        '''
        with self.lock:
            if name not in self.conn_known:
                raise PgManagerError("Connection name '%s' not registered." % name)

            # a connection in unknown state cannot be reused safely
            if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN:
                conn.close()
                return

            # connection returned to the pool must not be in transaction
            if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
                try:
                    conn.rollback()
                except OperationalError:
                    if not conn.closed:
                        conn.close()
                    return

            pool_size = self.conn_known[name].pool_size
            if pool_size is None:
                # static connection already lives in its pool slot; do not close
                # or duplicate it (len() >= None raised TypeError in old code)
                return

            if len(self.conn_pool[name]) >= pool_size:
                conn.close()
                return

            self.conn_pool[name].append(conn)

    @contextmanager
    def cursor(self, name='default'):
        '''Cursor context.

        Uses any connection info with *name* from pool
        and returns cursor for that connection.

        '''
        conn = self.get_conn(name)

        try:
            # create the cursor inside its own try so a failure here does not
            # reference an unbound name in finally (and still returns the conn)
            curs = conn.cursor()
            try:
                yield curs
            finally:
                curs.close()
        finally:
            self.log_notices(conn)
            self.put_conn(conn, name)

    def log_notices(self, conn):
        '''Forward any server notices collected on *conn* to the notices logger.'''
        for notice in conn.notices:
            log_notices.info(notice.rstrip())
        conn.notices[:] = []

    def wait_for_notify(self, name='default', timeout=None):
        '''Wait for asynchronous notifies, return the last one.

        *name* -- name of connection, must be created using `create_conn_listen()`

        *timeout* -- in seconds, floating point (`None` means wait forever)

        Returns `None` on timeout.

        '''
        conn = self.get_conn(name)

        # return any notifies already on stack
        if conn.notifies:
            return conn.notifies.pop()

        if select.select([conn], [], [], timeout) == ([], [], []):
            # timeout
            return None
        else:
            conn.poll()

        # return just the last notify (we do not care for older ones)
        if conn.notifies:
            return conn.notifies.pop()
        return None

    def _connect(self, ci):
        '''Open new connection according to ConnectionInfo *ci*.'''
        conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
        conn.name = ci.name
        if ci.keep_alive:
            conn.keep_alive()
        if ci.isolation_level is not None:
            conn.set_isolation_level(ci.isolation_level)
        if ci.init_statement:
            curs = conn.cursor()
            curs.execute(ci.init_statement)
            curs.connection.commit()
            curs.close()
        return conn

    def _normalize_isolation_level(self, level):
        '''Translate isolation level name to psycopg2 constant; pass other values through.'''
        if isinstance(level, str):
            try:
                return {
                    'autocommit': psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT,
                    'read_committed': psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED,
                    'serializable': psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE,
                }[level.lower()]
            except KeyError:
                raise PgManagerError('Unknown isolation level name: "%s"' % level)
        return level

    def _check_fork(self):
        '''Check if process was forked (PID has changed).

        If it was, clean parent's connections.
        New connections are created for children.
        Known connection credentials are inherited, but not shared.

        '''
        if self.pid == multiprocessing.current_process().pid:
            # PID has not changed
            return

        # update saved PID
        self.pid = multiprocessing.current_process().pid
        # reinitialize lock
        self.lock = threading.Lock()
        # clean parent's connections
        for name in self.conn_pool:
            self.conn_pool[name] = []

    @classmethod
    def get_instance(cls):
        '''Return the shared (lazily created) PgManager instance.'''
        if not hasattr(cls, '_instance'):
            cls._instance = cls()
        return cls._instance
+
+
def get_instance():
    """Return the process-wide shared PgManager instance (see PgManager.get_instance)."""
    return PgManager.get_instance()
+