pgtoolkit/pgmanager.py
changeset 104 d8ff52a0390f
parent 103 24e94a3da209
child 105 10551741f61f
--- a/pgtoolkit/pgmanager.py	Mon May 26 18:18:21 2014 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,515 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# PgManager - manage database connections
-#
-# Requires: Python 3.2, psycopg2
-#
-# Part of pgtoolkit
-# http://hg.devl.cz/pgtoolkit
-#
-# Copyright (c) 2010, 2011, 2012, 2013  Radek Brich <radek.brich@devl.cz>
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""Postgres database connection manager
-
-PgManager wraps psycopg2, adding the following features:
-
- * Save and reuse database connection parameters
-
- * Connection pooling
-
- * Easy query using the with statement
-
- * Dictionary rows
-
-Example usage:
-
-    from pgtoolkit import pgmanager
-
-    pgm = pgmanager.get_instance()
-    pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres')
-
-    with pgm.cursor() as curs:
-        curs.execute('SELECT now() AS now')
-        row = curs.fetchone_dict()
-        print(row.now)
-
-First, we obtain a PgManager instance. This is like calling
-PgManager(), except that in our example the instance is global. That means
-getting the instance in another module gives us access to all the defined
-connections etc.
-
-On the second line we create a connection named 'default' (the name can be left out).
-The with statement obtains a connection (actually connecting to the database only
-when needed), then returns a cursor for that connection. At the end of the with
-statement, the connection is returned to the pool or closed (depending on the number
-of connections in the pool and on the pool_size parameter).
-
-The row returned by fetchone_dict() is a special dict object which supports both
-item and attribute access, that is row['now'] or row.now.
-
-"""
-
-from contextlib import contextmanager
-from collections import OrderedDict
-import logging
-import threading
-import multiprocessing
-import select
-import socket
-
-import psycopg2
-import psycopg2.extensions
-
-from psycopg2 import DatabaseError, IntegrityError, OperationalError
-
-
-log_sql = logging.getLogger("pgmanager_sql")
-log_notices = logging.getLogger("pgmanager_notices")
-log_sql.addHandler(logging.NullHandler())
-# NullHandler not needed for notices which are INFO level only
-
-
-class PgManagerError(Exception):
-
-    pass
-
-
-class ConnectionInfo:
-
-    def __init__(self, name, dsn, isolation_level=None, keep_alive=True,
-                 init_statement=None, pool_size=1):
-        self.name = name  # connection name is logged with SQL queries
-        self.dsn = dsn  # data source name or a string of connection parameters
-        self.isolation_level = isolation_level
-        self.keep_alive = keep_alive
-        self.init_statement = init_statement
-        self.pool_size = pool_size
-
-
-class RowDict(OrderedDict):
-    """Special dictionary used for rows returned from queries.
-
-    Items keep the order in which the columns were returned from the database.
-
-    It supports three styles of access:
-
-        Dict style:
-            row['id']
-            for key in row:
-                ...
-
-        Object style (only works if column name does not collide with any method name):
-            row.id
-
-        Tuple style:
-            row[0]
-            id, name = row.values()
-
-    """
-
-    def __getitem__(self, key):
-        if isinstance(key, int):
-            return tuple(self.values())[key]
-        else:
-            return OrderedDict.__getitem__(self, key)
-
-    def __getattr__(self, key):
-        try:
-            return self[key]
-        except KeyError:
-            raise AttributeError(key)
-
-
-class Cursor(psycopg2.extensions.cursor):
-
-    def execute(self, query, args=None):
-        # log query before executing
-        self._log_query(query, args)
-        try:
-            return super(Cursor, self).execute(query, args)
-        except DatabaseError:
-            self._log_exception()
-            raise
-
-    def callproc(self, procname, args=None):
-        # log the query before executing it (the logged text is a reconstruction of the call, but it should correspond)
-        self._log_query(self._build_callproc_query(procname, len(args)), args)
-        try:
-            return super(Cursor, self).callproc(procname, args)
-        except DatabaseError:
-            self._log_exception()
-            raise
-
-    def row_dict(self, row, lstrip=None):
-        adjustname = lambda a: a
-        if lstrip:
-            adjustname = lambda a: a.lstrip(lstrip)
-        return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
-
-    def fetchone_dict(self, lstrip=None):
-        '''Return one row as a RowDict, or None if there are no more rows.'''
-        row = super(Cursor, self).fetchone()
-        if row is None:
-            return None
-        return self.row_dict(row, lstrip)
-
-    def fetchall_dict(self, lstrip=None):
-        '''Return all rows as a list of RowDict objects.'''
-        rows = super(Cursor, self).fetchall()
-        return [self.row_dict(row, lstrip) for row in rows]
-
-    def adapt(self, row):
-        if isinstance(row, RowDict):
-            # dict -- build the RowDict directly to preserve column order
-            return RowDict(
-                (key, self.mogrify('%s', [row[key]]).decode('utf8'))
-                for key in row.keys())
-        else:
-            # list
-            return [self.mogrify('%s', [x]).decode('utf8') for x in row]
-
-    def fetchone_adapted(self, lstrip=None):
-        '''Like fetchone_dict() but values are quoted for direct inclusion in an SQL query.
-
-        This is useful when you need to generate an SQL script from data returned
-        by a query. Use mogrify() for simple cases.
-
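-        For example, a sketch of dumping rows as INSERT statements (the table and
-        column names here are illustrative, not part of this module):
-
-            curs.execute('SELECT id, name FROM item')
-            row = curs.fetchone_adapted()
-            print('INSERT INTO item (id, name) VALUES (%s, %s);' % (row.id, row.name))
-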
-        '''
-        row = super(Cursor, self).fetchone()
-        if row is None:
-            return None
-        return self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip)
-
-    def fetchall_adapted(self, lstrip=None):
-        '''Like fetchall_dict() but values are quoted for direct inclusion in an SQL query.'''
-        rows = super(Cursor, self).fetchall()
-        return [self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip) for row in rows]
-
-    def _log_query(self, query='?', args=None):
-        name = self.connection.name if hasattr(self.connection, 'name') else '-'
-        query = self.mogrify(query, args)
-        log_sql.debug('[%s] %s' % (name, query.decode('utf8')))
-
-    def _log_exception(self):
-        name = self.connection.name if hasattr(self.connection, 'name') else '-'
-        log_sql.exception('[%s] exception:' % (name,))
-
-    def _build_callproc_query(self, procname, num_args):
-        return 'SELECT * FROM %s(%s)' % (procname, ', '.join(['%s'] * num_args))
-
-
-class Connection(psycopg2.extensions.connection):
-
-    def cursor(self, name=None):
-        if name is None:
-            return super(Connection, self).cursor(cursor_factory=Cursor)
-        else:
-            return super(Connection, self).cursor(name, cursor_factory=Cursor)
-
-    def keep_alive(self):
-        '''Set socket to keepalive mode. Must be called before any query.'''
-        sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM)
-        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-        try:
-            # Maximum keep-alive probes before assuming the connection is lost
-            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
-            # Interval (in seconds) between keep-alive probes
-            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 2)
-            # Maximum idle time (in seconds) before starting to send keep-alive probes
-            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
-        except socket.error:
-            pass
-        # close the duplicated fd; the options set on the socket remain in effect
-        sock.close()
-
-
-class PgManager:
-
-    def __init__(self):
-        self.conn_known = {}  # available connections
-        self.conn_pool = {}  # active connections
-        self.lock = threading.Lock()  # mutual exclusion for threads
-        self.pid = multiprocessing.current_process().pid  # forking check
-
-    def __del__(self):
-        for conn in tuple(self.conn_known.keys()):
-            self.destroy_conn(conn)
-
-    def create_conn(self, name='default', isolation_level=None, keep_alive=True, init_statement=None,
-                    pool_size=1, dsn=None, **kwargs):
-        '''Create named connection.
-
-        *name* -- name for connection
-
-        *pool_size* -- how many connections will be kept open in the pool.
-        More connections can still be created, but they will be closed by put_conn().
-        `None` disables the pool; get_conn() will then always return the same connection.
-
-        *isolation_level* -- `"autocommit"`, `"read_committed"`, `"serializable"` or `None` for driver default
-
-        *keep_alive* -- set socket to keepalive mode
-
-        *dsn* -- connection string (parameters or data source name)
-
-        Other keyword args are used as connection parameters.
-
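-        For example, a minimal sketch (the name, pool_size, host and dbname values
-        here are illustrative):
-
-            pgm.create_conn(name='main', pool_size=2,
-                            host='127.0.0.1', dbname='mydb')
-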
-        '''
-        if name in self.conn_known:
-            raise PgManagerError('Connection name "%s" already registered.' % name)
-
-        if dsn is None:
-            dsn = ' '.join([x[0]+'='+str(x[1]) for x in kwargs.items() if x[1] is not None])
-
-        isolation_level = self._normalize_isolation_level(isolation_level)
-        ci = ConnectionInfo(name, dsn, isolation_level, keep_alive, init_statement, pool_size)
-
-        self.conn_known[name] = ci
-        self.conn_pool[name] = []
-
-    def create_conn_listen(self, name, channel, dsn=None, copy_dsn=None, **kwargs):
-        '''Create connection listening for notifies.
-
-        Disables the pool. If you want to use a pool, create another connection for that.
-        This connection can be used as usual: conn.cursor() etc.
-        Do not use PgManager's cursor() and put_conn() with it.
-
-        *name* -- name for connection
-
-        *channel* -- listen on this channel
-
-        *copy_dsn* -- name of another registered connection whose dsn will be used
-
-        Other parameters forwarded to create_conn().
-
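-        For example, a sketch (the connection and channel names are illustrative):
-
-            pgm.create_conn_listen('events', 'my_channel', copy_dsn='default')
-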
-        '''
-        if dsn is None and copy_dsn:
-            try:
-                dsn = self.conn_known[copy_dsn].dsn
-            except KeyError:
-                raise PgManagerError("Connection name '%s' not registered." % copy_dsn)
-        listen_query = "LISTEN " + channel
-        self.create_conn(name=name, pool_size=None, isolation_level='autocommit', init_statement=listen_query,
-            dsn=dsn, **kwargs)
-
-    def close_conn(self, name='default'):
-        '''Close all pooled connections of the given name.
-
-        The connection credentials remain registered.
-
-        '''
-        while len(self.conn_pool[name]):
-            conn = self.conn_pool[name].pop()
-            conn.close()
-
-    def destroy_conn(self, name='default'):
-        '''Destroy connection.
-
-        Counterpart of create_conn.
-
-        '''
-        if name not in self.conn_known:
-            raise PgManagerError('Connection name "%s" not registered.' % name)
-
-        self.close_conn(name)
-
-        del self.conn_known[name]
-        del self.conn_pool[name]
-
-    def knows_conn(self, name='default'):
-        return name in self.conn_known
-
-    def get_conn(self, name='default'):
-        '''Get a connection named *name* from the pool.'''
-        self._check_fork()
-        self.lock.acquire()
-        try:
-            try:
-                ci = self.conn_known[name]
-            except KeyError:
-                raise PgManagerError("Connection name '%s' not registered." % name)
-
-            # no pool, just one static connection
-            if ci.pool_size is None:
-                # check for existing connection
-                try:
-                    conn = self.conn_pool[name][0]
-                    if conn.closed:
-                        conn = None
-                except IndexError:
-                    conn = None
-                    # reserve slot 0 for the single static connection
-                    self.conn_pool[name].append(conn)
-                # if no existing connection is valid, connect new one and save it
-                if conn is None:
-                    conn = self._connect(ci)
-                    self.conn_pool[name][0] = conn
-
-            # connection from pool
-            else:
-                conn = None
-                while len(self.conn_pool[name]) and conn is None:
-                    conn = self.conn_pool[name].pop()
-                    if conn.closed:
-                        conn = None
-
-                if conn is None:
-                    conn = self._connect(ci)
-        finally:
-            self.lock.release()
-        return conn
-
-    def put_conn(self, conn, name='default'):
-        '''Put a connection back into the pool.
-
-        *name* must be the same as the one used for get_conn(), otherwise the pool bookkeeping will break.
-
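-        A sketch of manual connection handling (prefer the cursor() context manager,
-        which does this for you):
-
-            conn = pgm.get_conn('default')
-            try:
-                curs = conn.cursor()
-                curs.execute('SELECT 1')
-                conn.commit()
-            finally:
-                pgm.put_conn(conn, 'default')
-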
-        '''
-        self.lock.acquire()
-        try:
-            if name not in self.conn_known:
-                raise PgManagerError("Connection name '%s' not registered." % name)
-
-            if len(self.conn_pool[name]) >= self.conn_known[name].pool_size:
-                conn.close()
-                return
-
-            if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN:
-                conn.close()
-                return
-
-            # connection returned to the pool must not be in transaction
-            if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
-                try:
-                    conn.rollback()
-                except OperationalError:
-                    if not conn.closed:
-                        conn.close()
-                    return
-
-            self.conn_pool[name].append(conn)
-        finally:
-            self.lock.release()
-
-    @contextmanager
-    def cursor(self, name='default'):
-        '''Cursor context manager.
-
-        Takes a connection named *name* from the pool
-        and yields a cursor for that connection.
-
-        '''
-        conn = self.get_conn(name)
-
-        try:
-            curs = conn.cursor()
-            yield curs
-        finally:
-            curs.close()
-            self.log_notices(conn)
-            self.put_conn(conn, name)
-
-    def log_notices(self, conn):
-        for notice in conn.notices:
-            log_notices.info(notice.rstrip())
-        conn.notices[:] = []
-
-    def wait_for_notify(self, name='default', timeout=None):
-        '''Wait for asynchronous notifies, return the last one.
-
-        *name* -- name of connection, must be created using `create_conn_listen()`
-
-        *timeout* -- in seconds, floating point (`None` means wait forever)
-
-        Returns `None` on timeout.
-
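-        For example, a sketch (assumes a connection named 'events' created with
-        create_conn_listen()):
-
-            notify = pgm.wait_for_notify('events', timeout=5.0)
-            if notify is not None:
-                print(notify.channel, notify.payload)
-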
-        '''
-        conn = self.get_conn(name)
-
-        # return a pending notify from the stack, if any
-        if conn.notifies:
-            return conn.notifies.pop()
-
-        if select.select([conn], [], [], timeout) == ([], [], []):
-            # timeout
-            return None
-        else:
-            conn.poll()
-
-            # return just the last notify (we do not care about older ones)
-            if conn.notifies:
-                return conn.notifies.pop()
-            return None
-
-    def _connect(self, ci):
-        conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
-        conn.name = ci.name
-        if ci.keep_alive:
-            conn.keep_alive()
-        if ci.isolation_level is not None:
-            conn.set_isolation_level(ci.isolation_level)
-        if ci.init_statement:
-            curs = conn.cursor()
-            curs.execute(ci.init_statement)
-            curs.connection.commit()
-            curs.close()
-        return conn
-
-    def _normalize_isolation_level(self, level):
-        if type(level) == str:
-            if level.lower() == 'autocommit':
-                return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
-            if level.lower() == 'read_committed':
-                return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
-            if level.lower() == 'serializable':
-                return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
-            raise PgManagerError('Unknown isolation level name: "%s"' % level)
-        return level
-
-    def _check_fork(self):
-        '''Check whether the process was forked (i.e. the PID has changed).
-
-        If it was, drop the parent's connections; new connections will be
-        created for the child processes. Known connection credentials are
-        inherited, but not shared.
-
-        '''
-        if self.pid == multiprocessing.current_process().pid:
-            # PID has not changed
-            return
-
-        # update saved PID
-        self.pid = multiprocessing.current_process().pid
-        # reinitialize lock
-        self.lock = threading.Lock()
-        # clean parent's connections
-        for name in self.conn_pool:
-            self.conn_pool[name] = []
-
-    @classmethod
-    def get_instance(cls):
-        if not hasattr(cls, '_instance'):
-            cls._instance = cls()
-        return cls._instance
-
-
-def get_instance():
-    return PgManager.get_instance()
-