diff -r a900bc629ecc -r 93450b43e627 mytoolkit/mymanager.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mytoolkit/mymanager.py Mon Nov 14 18:00:53 2011 +0100 @@ -0,0 +1,292 @@ +# -*- coding: utf-8 -*- +# +# MyManager - manage database connections (MySQL version) +# +# Requires: Python 2.6, MySQLdb +# +# Part of pgtoolkit +# http://hg.devl.cz/pgtoolkit +# +# Copyright (c) 2011 Radek Brich +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +"""MySQL database connection manager + +MyManager wraps MySQLdb connect function, adding following features: + + * Manage database connection parameters - link connection parameters + to an unique identifier, retrieve connection object by this identifier + + * Connection pooling - connections with same identifier are pooled and reused + + * Easy query using the with statement - retrieve cursor directly by connection + identifier, don't worry about connections + + * Dict rows - cursor has additional methods like fetchall_dict(), which + returns dict row instead of ordinary list-like row + +Example: + +import mymanager + +db = mymanager.get_instance() +db.create_conn(host='127.0.0.1', db='default') + +with db.cursor() as curs: + curs.execute('SELECT now() AS now') + row = curs.fetchone_dict() + print row.now + +First, we have obtained MyManager instance. This is like calling +MyManager(), although in our example the instance is global. That means +getting the instance in another module brings us all the defined connections +etc. + +On next line we have created connection named 'default' (this name can be left out). +The with statement obtains connection (actually connects to database when needed), +then returns cursor for this connection. At the end of with statement, +the connection is returned to the pool or closed (depending on number of connections +in pool and on setting of keep_open parameter). + +The row returned by fetchone_dict() is special dict object, which can be accessed +using item or attribute access, that is row['now'] or row.now. +""" + +from contextlib import contextmanager +import logging +import threading +import select +import socket + +import MySQLdb +import MySQLdb.cursors + +from MySQLdb import DatabaseError, IntegrityError, OperationalError + + +class MyManagerError(Exception): + + pass + + +class ConnectionInfo: + + def __init__(self, isolation_level=None, init_statement=None, keep_open=1, **kw): + self.isolation_level = isolation_level + self.init_statement = init_statement + self.keep_open = keep_open + self.parameters = kw + + +class RowDict(dict): + + def __getattr__(self, key): + return self[key] + + +class Cursor(MySQLdb.cursors.Cursor): + + def execute(self, query, args=None): + try: + return super(Cursor, self).execute(query, args) + finally: + log.debug(self._executed) + + def callproc(self, procname, args=None): + try: + return super(Cursor, self).callproc(procname, args) + finally: + log.debug(self._executed) + + def row_dict(self, row, lstrip=None): + adjustname = lambda a: a + if lstrip: + adjustname = lambda a: a.lstrip(lstrip) + return RowDict(zip([adjustname(desc[0]) for desc in self.description], row)) + + def fetchone_dict(self, lstrip=None): + row = super(Cursor, self).fetchone() + if row is None: + return None + return self.row_dict(row, lstrip) + + def fetchall_dict(self, lstrip=None): + rows = super(Cursor, self).fetchall() + return [self.row_dict(row, lstrip) for row in rows] + + +class MyManager: + + def __init__(self): + self.conn_known = {} # available connections + self.conn_pool = {} + self.lock = threading.Lock() + + def __del__(self): + for conn in tuple(self.conn_known.keys()): + self.destroy_conn(conn) + + def create_conn(self, name='default', isolation_level=None, dsn=None, **kw): + '''Create named connection.''' + if name in self.conn_known: + raise MyManagerError('Connection name "%s" already registered.' % name) + + isolation_level = self._normalize_isolation_level(isolation_level) + ci = ConnectionInfo(isolation_level, **kw) + + self.conn_known[name] = ci + self.conn_pool[name] = [] + + def close_conn(self, name='default'): + '''Close all connections of given name. + + Connection credentials are still saved. + + ''' + while len(self.conn_pool[name]): + conn = self.conn_pool[name].pop() + conn.close() + + def destroy_conn(self, name='default'): + '''Destroy connection. + + Counterpart of create_conn. + + ''' + if not name in self.conn_known: + raise MyManagerError('Connection name "%s" not registered.' % name) + + self.close_conn(name) + + del self.conn_known[name] + del self.conn_pool[name] + + def get_conn(self, name='default'): + '''Get connection of name 'name' from pool.''' + self.lock.acquire() + try: + if not name in self.conn_known: + raise MyManagerError("Connection name '%s' not registered." % name) + + conn = None + while len(self.conn_pool[name]) and conn is None: + conn = self.conn_pool[name].pop() + if conn.closed: + conn = None + + if conn is None: + ci = self.conn_known[name] + conn = self._connect(ci) + finally: + self.lock.release() + return conn + + def put_conn(self, conn, name='default'): + '''Put connection back to pool. + + Name must be same as used for get_conn, + otherwise things become broken. + + ''' + self.lock.acquire() + try: + if not name in self.conn_known: + raise MyManagerError("Connection name '%s' not registered." % name) + + if len(self.conn_pool[name]) >= self.conn_known[name].keep_open: + conn.close() + return + + # connection returned to the pool must not be in transaction + conn.rollback() + + self.conn_pool[name].append(conn) + finally: + self.lock.release() + + @contextmanager + def cursor(self, name='default'): + '''Cursor context. + + Uses any connection of name 'name' from pool + and returns cursor for that connection. + + ''' + conn = self.get_conn(name) + + try: + curs = conn.cursor() + yield curs + finally: + curs.close() + self.put_conn(conn, name) + + def _connect(self, ci): + conn = MySQLdb.connect(cursorclass=Cursor, **ci.parameters) + if not ci.isolation_level is None: + if ci.isolation_level == 'AUTOCOMMIT': + conn.autocommit() + else: + curs = conn.cursor() + curs.execute('SET SESSION TRANSACTION ISOLATION LEVEL ' + ci.isolation_level) + curs.close() + if ci.init_statement: + curs = conn.cursor() + curs.execute(ci.init_statement) + curs.close() + return conn + + def _normalize_isolation_level(self, level): + if level is None: + return level + if type(level) == str: + level = level.upper().replace('_', ' ') + if level in ( + 'AUTOCOMMIT', + 'READ UNCOMMITTED', + 'READ COMMITTED', + 'REPEATABLE READ', + 'SERIALIZABLE'): + return level + raise MyManagerError('Unknown isolation level name: "%s"', level) + + +try: + NullHandler = logging.NullHandler +except AttributeError: + class NullHandler(logging.Handler): + def emit(self, record): + pass + + +log = logging.getLogger("mymanager") +log.addHandler(NullHandler()) + + +instance = None + + +def get_instance(): + global instance + if instance is None: + instance = MyManager() + return instance + +