mytoolkit/mymanager.py
changeset 104 d8ff52a0390f
parent 103 24e94a3da209
child 105 10551741f61f
equal deleted inserted replaced
103:24e94a3da209 104:d8ff52a0390f
     1 # -*- coding: utf-8 -*-
       
     2 #
       
     3 # MyManager - manage database connections (MySQL version)
       
     4 #
       
     5 # Requires: Python 2.6 / 2.7 / 3.2, MySQLdb
       
     6 #
       
     7 # Part of pgtoolkit
       
     8 # http://hg.devl.cz/pgtoolkit
       
     9 #
       
    10 # Copyright (c) 2011, 2013  Radek Brich <radek.brich@devl.cz>
       
    11 #
       
    12 # Permission is hereby granted, free of charge, to any person obtaining a copy
       
    13 # of this software and associated documentation files (the "Software"), to deal
       
    14 # in the Software without restriction, including without limitation the rights
       
    15 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
       
    16 # copies of the Software, and to permit persons to whom the Software is
       
    17 # furnished to do so, subject to the following conditions:
       
    18 #
       
    19 # The above copyright notice and this permission notice shall be included in
       
    20 # all copies or substantial portions of the Software.
       
    21 #
       
    22 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
       
    23 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
       
    24 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
       
    25 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
       
    26 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
       
    27 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
       
    28 # THE SOFTWARE.
       
    29 
       
    30 """MySQL database connection manager
       
    31 
       
    32 MyManager wraps MySQLdb in same manner as PgManager wraps psycopg2.
       
    33 It's fully compatible so it should work as drop-in replacement for PgManager.
       
    34 
       
    35 It adds following features over MySQLdb:
       
    36 
       
    37  * Save and reuse database connection parameters
       
    38 
       
    39  * Connection pooling
       
    40 
       
    41  * Easy query using the with statement
       
    42 
       
    43  * Dictionary rows
       
    44 
       
    45 Example:
       
    46 
       
    47     from mytoolkit import mymanager
       
    48 
       
    49     dbm = mymanager.get_instance()
       
    50     dbm.create_conn(host='127.0.0.1', dbname='default')
       
    51 
       
    52     with dbm.cursor() as curs:
       
    53         curs.execute('SELECT now() AS now')
       
    54         row = curs.fetchone_dict()
       
    55         print(row.now)
       
    56 
       
    57 See PgManager docs for more information.
       
    58 
       
    59 """
       
    60 
       
    61 from contextlib import contextmanager
       
    62 from collections import OrderedDict
       
    63 import logging
       
    64 import threading
       
    65 import multiprocessing
       
    66 
       
    67 import MySQLdb
       
    68 import MySQLdb.cursors
       
    69 
       
    70 from MySQLdb import DatabaseError, IntegrityError, OperationalError
       
    71 
       
    72 from pgtoolkit.pgmanager import RowDict
       
    73 
       
    74 
       
    75 log_sql = logging.getLogger("mymanager_sql")
       
    76 log_sql.addHandler(logging.NullHandler())
       
    77 
       
    78 
       
    79 class MyManagerError(Exception):
       
    80 
       
    81     pass
       
    82 
       
    83 
       
    84 class ConnectionInfo:
       
    85 
       
    86     def __init__(self, name, isolation_level=None,
       
    87                  init_statement=None, pool_size=1, **kw):
       
    88         self.name = name  # connection name is logged with SQL queries
       
    89         self.isolation_level = isolation_level
       
    90         self.init_statement = init_statement
       
    91         self.pool_size = pool_size
       
    92         self.parameters = kw
       
    93         self.adjust_parameters()
       
    94 
       
    95     def adjust_parameters(self):
       
    96         '''Rename Postgres parameters to proper value for MySQL.'''
       
    97         m = {'dbname' : 'db', 'password' : 'passwd'}
       
    98         res = dict()
       
    99         for k, v in list(self.parameters.items()):
       
   100             if k in m:
       
   101                 k = m[k]
       
   102             res[k] = v
       
   103         self.parameters = res
       
   104 
       
   105 
       
   106 class Cursor(MySQLdb.cursors.Cursor):
       
   107 
       
   108     def execute(self, query, args=None):
       
   109         try:
       
   110             return super(Cursor, self).execute(query, args)
       
   111         finally:
       
   112             self._log_query(query, args)
       
   113 
       
   114     def callproc(self, procname, args=None):
       
   115         try:
       
   116             return super(Cursor, self).callproc(procname, args)
       
   117         finally:
       
   118             self._log_query(query, args)
       
   119 
       
   120     def row_dict(self, row, lstrip=None):
       
   121         adjustname = lambda a: a
       
   122         if lstrip:
       
   123             adjustname = lambda a: a.lstrip(lstrip)
       
   124         return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
       
   125 
       
   126     def fetchone_dict(self, lstrip=None):
       
   127         row = super(Cursor, self).fetchone()
       
   128         if row is None:
       
   129             return None
       
   130         return self.row_dict(row, lstrip)
       
   131 
       
   132     def fetchall_dict(self, lstrip=None):
       
   133         rows = super(Cursor, self).fetchall()
       
   134         return [self.row_dict(row, lstrip) for row in rows]
       
   135 
       
   136     def mogrify(self, query, args):
       
   137         """Get query with substituted args as it will be send to server."""
       
   138         if isinstance(query, bytes):
       
   139             query = query.decode()
       
   140         if args is not None:
       
   141             db = self._get_db()
       
   142             query = query % db.literal(args)
       
   143         return query
       
   144 
       
   145     def _log_query(self, query, args):
       
   146         name = self.connection.name if hasattr(self.connection, 'name') else '-'
       
   147         query = self.mogrify(query, args)
       
   148         if isinstance(query, bytes):
       
   149             db = self._get_db()
       
   150             charset = db.character_set_name()
       
   151             query = query.decode(charset)
       
   152         log_sql.debug('[%s] %s' % (name, query))
       
   153 
       
   154 
       
   155 class MyManager:
       
   156 
       
   157     def __init__(self):
       
   158         self.conn_known = {}  # available connections
       
   159         self.conn_pool = {}
       
   160         self.lock = threading.Lock()
       
   161         self.pid = multiprocessing.current_process().pid  # forking check
       
   162 
       
   163     def __del__(self):
       
   164         for conn in tuple(self.conn_known.keys()):
       
   165             self.destroy_conn(conn)
       
   166 
       
   167     def create_conn(self, name='default', isolation_level=None, **kw):
       
   168         '''Create named connection.'''
       
   169         if name in self.conn_known:
       
   170             raise MyManagerError('Connection name "%s" already registered.' % name)
       
   171 
       
   172         isolation_level = self._normalize_isolation_level(isolation_level)
       
   173         ci = ConnectionInfo(name, isolation_level, **kw)
       
   174 
       
   175         self.conn_known[name] = ci
       
   176         self.conn_pool[name] = []
       
   177 
       
   178     def close_conn(self, name='default'):
       
   179         '''Close all connections of given name.
       
   180 
       
   181         Connection credentials are still saved.
       
   182 
       
   183         '''
       
   184         while len(self.conn_pool[name]):
       
   185             conn = self.conn_pool[name].pop()
       
   186             conn.close()
       
   187 
       
   188     def destroy_conn(self, name='default'):
       
   189         '''Destroy connection.
       
   190 
       
   191         Counterpart of create_conn.
       
   192 
       
   193         '''
       
   194         if not name in self.conn_known:
       
   195             raise MyManagerError('Connection name "%s" not registered.' % name)
       
   196 
       
   197         self.close_conn(name)
       
   198 
       
   199         del self.conn_known[name]
       
   200         del self.conn_pool[name]
       
   201 
       
   202     def get_conn(self, name='default'):
       
   203         '''Get connection of name 'name' from pool.'''
       
   204         self._check_fork()
       
   205         self.lock.acquire()
       
   206         try:
       
   207             if not name in self.conn_known:
       
   208                 raise MyManagerError("Connection name '%s' not registered." % name)
       
   209 
       
   210             # connection from pool
       
   211             conn = None
       
   212             while len(self.conn_pool[name]) and conn is None:
       
   213                 conn = self.conn_pool[name].pop()
       
   214                 try:
       
   215                     conn.ping()
       
   216                 except MySQLdb.MySQLError:
       
   217                     conn.close()
       
   218                     conn = None
       
   219 
       
   220             if conn is None:
       
   221                 ci = self.conn_known[name]
       
   222                 conn = self._connect(ci)
       
   223         finally:
       
   224             self.lock.release()
       
   225         return conn
       
   226 
       
   227     def put_conn(self, conn, name='default'):
       
   228         '''Put connection back to pool.
       
   229 
       
   230         Name must be same as used for get_conn,
       
   231         otherwise things become broken.
       
   232 
       
   233         '''
       
   234         self.lock.acquire()
       
   235         try:
       
   236             if not name in self.conn_known:
       
   237                 raise MyManagerError("Connection name '%s' not registered." % name)
       
   238 
       
   239             if len(self.conn_pool[name]) >= self.conn_known[name].pool_size:
       
   240                 conn.close()
       
   241                 return
       
   242 
       
   243             # connection returned to the pool must not be in transaction
       
   244             try:
       
   245                 conn.rollback()
       
   246             except OperationalError:
       
   247                 conn.close()
       
   248                 return
       
   249 
       
   250             self.conn_pool[name].append(conn)
       
   251         finally:
       
   252             self.lock.release()
       
   253 
       
   254     @contextmanager
       
   255     def cursor(self, name='default'):
       
   256         '''Cursor context.
       
   257 
       
   258         Uses any connection of name 'name' from pool
       
   259         and returns cursor for that connection.
       
   260 
       
   261         '''
       
   262         conn = self.get_conn(name)
       
   263 
       
   264         try:
       
   265             curs = conn.cursor()
       
   266             yield curs
       
   267         finally:
       
   268             curs.close()
       
   269             self.put_conn(conn, name)
       
   270 
       
   271     def _connect(self, ci):
       
   272         conn = MySQLdb.connect(cursorclass=Cursor, **ci.parameters)
       
   273         if not ci.isolation_level is None:
       
   274             if ci.isolation_level == 'AUTOCOMMIT':
       
   275                 conn.autocommit(True)
       
   276             else:
       
   277                 curs = conn.cursor()
       
   278                 curs.execute('SET SESSION TRANSACTION ISOLATION LEVEL ' + ci.isolation_level)
       
   279                 curs.close()
       
   280         if ci.init_statement:
       
   281             curs = conn.cursor()
       
   282             curs.execute(ci.init_statement)
       
   283             curs.connection.commit()
       
   284             curs.close()
       
   285         return conn
       
   286 
       
   287     def _normalize_isolation_level(self, level):
       
   288         if level is None:
       
   289             return level
       
   290         if type(level) == str:
       
   291             level = level.upper().replace('_', ' ')
       
   292             if level in (
       
   293                 'AUTOCOMMIT',
       
   294                 'READ UNCOMMITTED',
       
   295                 'READ COMMITTED',
       
   296                 'REPEATABLE READ',
       
   297                 'SERIALIZABLE'):
       
   298                 return level
       
   299         raise MyManagerError('Unknown isolation level name: "%s"', level)
       
   300 
       
   301     def _check_fork(self):
       
   302         '''Check if process was forked (PID has changed).
       
   303 
       
   304         If it was, clean parent's connections.
       
   305         New connections are created for children.
       
   306         Known connection credentials are inherited, but not shared.
       
   307 
       
   308         '''
       
   309         if self.pid == multiprocessing.current_process().pid:
       
   310             # PID has not changed
       
   311             return
       
   312 
       
   313         # update saved PID
       
   314         self.pid = multiprocessing.current_process().pid
       
   315         # reinitialize lock
       
   316         self.lock = threading.Lock()
       
   317         # clean parent's connections
       
   318         for name in self.conn_pool:
       
   319             self.conn_pool[name] = []
       
   320 
       
   321     @classmethod
       
   322     def get_instance(cls):
       
   323         if not hasattr(cls, '_instance'):
       
   324             cls._instance = cls()
       
   325         return cls._instance
       
   326 
       
   327 
       
   328 def get_instance():
       
   329     return MyManager.get_instance()
       
   330