mytoolkit/mymanager.py
changeset 15 93450b43e627
child 16 cb7e13711a99
equal deleted inserted replaced
14:a900bc629ecc 15:93450b43e627
       
     1 # -*- coding: utf-8 -*-
       
     2 #
       
     3 # MyManager - manage database connections (MySQL version)
       
     4 #
       
     5 # Requires: Python 2.6, MySQLdb
       
     6 #
       
     7 # Part of pgtoolkit
       
     8 # http://hg.devl.cz/pgtoolkit
       
     9 #
       
    10 # Copyright (c) 2011  Radek Brich <radek.brich@devl.cz>
       
    11 #
       
    12 # Permission is hereby granted, free of charge, to any person obtaining a copy
       
    13 # of this software and associated documentation files (the "Software"), to deal
       
    14 # in the Software without restriction, including without limitation the rights
       
    15 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
       
    16 # copies of the Software, and to permit persons to whom the Software is
       
    17 # furnished to do so, subject to the following conditions:
       
    18 #
       
    19 # The above copyright notice and this permission notice shall be included in
       
    20 # all copies or substantial portions of the Software.
       
    21 #
       
    22 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
       
    23 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
       
    24 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
       
    25 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
       
    26 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
       
    27 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
       
    28 # THE SOFTWARE.
       
    29 
       
    30 """MySQL database connection manager
       
    31 
       
    32 MyManager wraps MySQLdb connect function, adding following features:
       
    33 
       
    34  * Manage database connection parameters - link connection parameters
       
    35    to an unique identifier, retrieve connection object by this identifier
       
    36 
       
    37  * Connection pooling - connections with same identifier are pooled and reused
       
    38 
       
    39  * Easy query using the with statement - retrieve cursor directly by connection
       
    40    identifier, don't worry about connections
       
    41 
       
    42  * Dict rows - cursor has additional methods like fetchall_dict(), which
       
    43    returns dict row instead of ordinary list-like row
       
    44 
       
    45 Example:
       
    46 
       
    47 import mymanager
       
    48 
       
    49 db = mymanager.get_instance()
       
    50 db.create_conn(host='127.0.0.1', db='default')
       
    51 
       
    52 with db.cursor() as curs:
       
    53     curs.execute('SELECT now() AS now')
       
    54     row = curs.fetchone_dict()
       
    55     print row.now
       
    56 
       
    57 First, we have obtained MyManager instance. This is like calling
       
    58 MyManager(), although in our example the instance is global. That means
       
    59 getting the instance in another module brings us all the defined connections
       
    60 etc.
       
    61 
       
    62 On next line we have created connection named 'default' (this name can be left out).
       
    63 The with statement obtains connection (actually connects to database when needed),
       
    64 then returns cursor for this connection. At the end of with statement,
       
    65 the connection is returned to the pool or closed (depending on number of connections
       
    66 in pool and on setting of keep_open parameter).
       
    67 
       
    68 The row returned by fetchone_dict() is special dict object, which can be accessed
       
    69 using item or attribute access, that is row['now'] or row.now.
       
    70 """
       
    71 
       
    72 from contextlib import contextmanager
       
    73 import logging
       
    74 import threading
       
    75 import select
       
    76 import socket
       
    77 
       
    78 import MySQLdb
       
    79 import MySQLdb.cursors
       
    80 
       
    81 from MySQLdb import DatabaseError, IntegrityError, OperationalError
       
    82 
       
    83 
       
    84 class MyManagerError(Exception):
       
    85 
       
    86     pass
       
    87 
       
    88 
       
    89 class ConnectionInfo:
       
    90 
       
    91     def __init__(self, isolation_level=None, init_statement=None, keep_open=1, **kw):
       
    92         self.isolation_level = isolation_level
       
    93         self.init_statement = init_statement
       
    94         self.keep_open = keep_open
       
    95         self.parameters = kw
       
    96 
       
    97 
       
    98 class RowDict(dict):
       
    99 
       
   100     def __getattr__(self, key):
       
   101         return self[key]
       
   102 
       
   103 
       
   104 class Cursor(MySQLdb.cursors.Cursor):
       
   105 
       
   106     def execute(self, query, args=None):
       
   107         try:
       
   108             return super(Cursor, self).execute(query, args)
       
   109         finally:
       
   110             log.debug(self._executed)
       
   111 
       
   112     def callproc(self, procname, args=None):
       
   113         try:
       
   114             return super(Cursor, self).callproc(procname, args)
       
   115         finally:
       
   116             log.debug(self._executed)
       
   117 
       
   118     def row_dict(self, row, lstrip=None):
       
   119         adjustname = lambda a: a
       
   120         if lstrip:
       
   121             adjustname = lambda a: a.lstrip(lstrip)
       
   122         return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
       
   123 
       
   124     def fetchone_dict(self, lstrip=None):
       
   125         row = super(Cursor, self).fetchone()
       
   126         if row is None:
       
   127             return None
       
   128         return self.row_dict(row, lstrip)
       
   129 
       
   130     def fetchall_dict(self, lstrip=None):
       
   131         rows = super(Cursor, self).fetchall()
       
   132         return [self.row_dict(row, lstrip) for row in rows]
       
   133 
       
   134 
       
   135 class MyManager:
       
   136 
       
   137     def __init__(self):
       
   138         self.conn_known = {}  # available connections
       
   139         self.conn_pool = {}
       
   140         self.lock = threading.Lock()
       
   141 
       
   142     def __del__(self):
       
   143         for conn in tuple(self.conn_known.keys()):
       
   144             self.destroy_conn(conn)
       
   145 
       
   146     def create_conn(self, name='default', isolation_level=None, dsn=None, **kw):
       
   147         '''Create named connection.'''
       
   148         if name in self.conn_known:
       
   149             raise MyManagerError('Connection name "%s" already registered.' % name)
       
   150 
       
   151         isolation_level = self._normalize_isolation_level(isolation_level)
       
   152         ci = ConnectionInfo(isolation_level, **kw)
       
   153 
       
   154         self.conn_known[name] = ci
       
   155         self.conn_pool[name] = []
       
   156 
       
   157     def close_conn(self, name='default'):
       
   158         '''Close all connections of given name.
       
   159         
       
   160         Connection credentials are still saved.
       
   161         
       
   162         '''
       
   163         while len(self.conn_pool[name]):
       
   164             conn = self.conn_pool[name].pop()
       
   165             conn.close()
       
   166 
       
   167     def destroy_conn(self, name='default'):
       
   168         '''Destroy connection.
       
   169         
       
   170         Counterpart of create_conn.
       
   171         
       
   172         '''
       
   173         if not name in self.conn_known:
       
   174             raise MyManagerError('Connection name "%s" not registered.' % name)
       
   175 
       
   176         self.close_conn(name)
       
   177 
       
   178         del self.conn_known[name]
       
   179         del self.conn_pool[name]
       
   180 
       
   181     def get_conn(self, name='default'):
       
   182         '''Get connection of name 'name' from pool.'''
       
   183         self.lock.acquire()
       
   184         try:
       
   185             if not name in self.conn_known:
       
   186                 raise MyManagerError("Connection name '%s' not registered." % name)
       
   187 
       
   188             conn = None
       
   189             while len(self.conn_pool[name]) and conn is None:
       
   190                 conn = self.conn_pool[name].pop()
       
   191                 if conn.closed:
       
   192                     conn = None
       
   193 
       
   194             if conn is None:
       
   195                 ci = self.conn_known[name]
       
   196                 conn = self._connect(ci)
       
   197         finally:
       
   198             self.lock.release()
       
   199         return conn
       
   200 
       
   201     def put_conn(self, conn, name='default'):
       
   202         '''Put connection back to pool.
       
   203         
       
   204         Name must be same as used for get_conn,
       
   205         otherwise things become broken.
       
   206         
       
   207         '''
       
   208         self.lock.acquire()
       
   209         try:
       
   210             if not name in self.conn_known:
       
   211                 raise MyManagerError("Connection name '%s' not registered." % name)
       
   212 
       
   213             if len(self.conn_pool[name]) >= self.conn_known[name].keep_open:
       
   214                 conn.close()
       
   215                 return
       
   216 
       
   217             # connection returned to the pool must not be in transaction
       
   218             conn.rollback()
       
   219 
       
   220             self.conn_pool[name].append(conn)
       
   221         finally:
       
   222             self.lock.release()
       
   223 
       
   224     @contextmanager
       
   225     def cursor(self, name='default'):
       
   226         '''Cursor context.
       
   227         
       
   228         Uses any connection of name 'name' from pool
       
   229         and returns cursor for that connection.
       
   230         
       
   231         '''
       
   232         conn = self.get_conn(name)
       
   233 
       
   234         try:
       
   235             curs = conn.cursor()
       
   236             yield curs
       
   237         finally:
       
   238             curs.close()
       
   239             self.put_conn(conn, name)
       
   240 
       
   241     def _connect(self, ci):
       
   242         conn = MySQLdb.connect(cursorclass=Cursor, **ci.parameters)
       
   243         if not ci.isolation_level is None:
       
   244             if ci.isolation_level == 'AUTOCOMMIT':
       
   245                 conn.autocommit()
       
   246             else:
       
   247                 curs = conn.cursor()
       
   248                 curs.execute('SET SESSION TRANSACTION ISOLATION LEVEL ' + ci.isolation_level)
       
   249                 curs.close()
       
   250         if ci.init_statement:
       
   251             curs = conn.cursor()
       
   252             curs.execute(ci.init_statement)
       
   253             curs.close()
       
   254         return conn
       
   255 
       
   256     def _normalize_isolation_level(self, level):
       
   257         if level is None:
       
   258             return level
       
   259         if type(level) == str:
       
   260             level = level.upper().replace('_', ' ')
       
   261             if level in (
       
   262                 'AUTOCOMMIT',
       
   263                 'READ UNCOMMITTED',
       
   264                 'READ COMMITTED',
       
   265                 'REPEATABLE READ',
       
   266                 'SERIALIZABLE'):
       
   267                 return level
       
   268         raise MyManagerError('Unknown isolation level name: "%s"', level)
       
   269 
       
   270 
       
   271 try:
       
   272     NullHandler = logging.NullHandler
       
   273 except AttributeError:
       
   274     class NullHandler(logging.Handler):
       
   275         def emit(self, record):
       
   276             pass
       
   277 
       
   278 
       
   279 log = logging.getLogger("mymanager")
       
   280 log.addHandler(NullHandler())
       
   281 
       
   282 
       
   283 instance = None
       
   284 
       
   285 
       
   286 def get_instance():
       
   287     global instance
       
   288     if instance is None:
       
   289         instance = MyManager()
       
   290     return instance
       
   291 
       
   292