pydbkit/mymanager_oursql.py
changeset 104 d8ff52a0390f
parent 77 2cfef775f518
child 106 db4c582a2abd
equal deleted inserted replaced
103:24e94a3da209 104:d8ff52a0390f
       
     1 # -*- coding: utf-8 -*-
       
     2 #
       
     3 # MyManager - manage database connections (MySQL version)
       
     4 #
       
     5 # Requires: Python 2.6 / 2.7 / 3.2, oursql
       
     6 #
       
     7 # Part of pydbkit
       
     8 # http://hg.devl.cz/pydbkit
       
     9 #
       
    10 # Copyright (c) 2011, 2013  Radek Brich <radek.brich@devl.cz>
       
    11 #
       
    12 # Permission is hereby granted, free of charge, to any person obtaining a copy
       
    13 # of this software and associated documentation files (the "Software"), to deal
       
    14 # in the Software without restriction, including without limitation the rights
       
    15 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
       
    16 # copies of the Software, and to permit persons to whom the Software is
       
    17 # furnished to do so, subject to the following conditions:
       
    18 #
       
    19 # The above copyright notice and this permission notice shall be included in
       
    20 # all copies or substantial portions of the Software.
       
    21 #
       
    22 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
       
    23 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
       
    24 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
       
    25 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
       
    26 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
       
    27 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
       
    28 # THE SOFTWARE.
       
    29 
       
    30 """MySQL database connection manager
       
    31 
       
    32 MyManager wraps oursql in same manner as PgManager wraps psycopg2.
       
    33 It's fully compatible so it should work as drop-in replacement for PgManager.
       
    34 
       
    35 It adds following features over oursql:
       
    36 
       
    37  * Save and reuse database connection parameters
       
    38 
       
    39  * Connection pooling
       
    40 
       
    41  * Easy query using the with statement
       
    42 
       
    43  * Dictionary rows
       
    44 
       
    45 Example:
       
    46 
       
    47     from pydbkit import mymanager_oursql
       
    48 
       
    49     dbm = mymanager.get_instance()
       
    50     dbm.create_conn(host='127.0.0.1', dbname='default')
       
    51 
       
    52     with dbm.cursor() as curs:
       
    53         curs.execute('SELECT now() AS now')
       
    54         row = curs.fetchone_dict()
       
    55         print(row.now)
       
    56 
       
    57 See PgManager docs for more information.
       
    58 
       
    59 """
       
    60 
       
    61 from contextlib import contextmanager
       
    62 from collections import OrderedDict
       
    63 import logging
       
    64 import threading
       
    65 import multiprocessing
       
    66 
       
    67 import oursql
       
    68 
       
    69 from oursql import DatabaseError, IntegrityError, OperationalError
       
    70 
       
    71 
       
    72 log_sql = logging.getLogger("mymanager_sql")
       
    73 log_sql.addHandler(logging.NullHandler())
       
    74 
       
    75 
       
    76 class MyManagerError(Exception):
       
    77 
       
    78     pass
       
    79 
       
    80 
       
    81 class RowDict(OrderedDict):
       
    82     """Special dictionary used for rows returned from queries.
       
    83 
       
    84     Items keep order in which columns where returned from database.
       
    85 
       
    86     It supports three styles of access:
       
    87 
       
    88         Dict style:
       
    89             row['id']
       
    90             for key in row:
       
    91                 ...
       
    92 
       
    93         Object style (only works if column name does not collide with any method name):
       
    94             row.id
       
    95 
       
    96         Tuple style:
       
    97             row[0]
       
    98             id, name = row.values()
       
    99 
       
   100     """
       
   101 
       
   102     def __getitem__(self, key):
       
   103         if isinstance(key, int):
       
   104             return tuple(self.values())[key]
       
   105         else:
       
   106             return OrderedDict.__getitem__(self, key)
       
   107 
       
   108     def __getattr__(self, key):
       
   109         try:
       
   110             return self[key]
       
   111         except KeyError:
       
   112             raise AttributeError(key)
       
   113 
       
   114 
       
   115 class ConnectionInfo:
       
   116 
       
   117     def __init__(self, name, isolation_level=None,
       
   118                  init_statement=None, pool_size=1, **kw):
       
   119         self.name = name  # connection name is logged with SQL queries
       
   120         self.isolation_level = isolation_level
       
   121         self.init_statement = init_statement
       
   122         self.pool_size = pool_size
       
   123         self.parameters = kw
       
   124         self.adjust_parameters()
       
   125 
       
   126     def adjust_parameters(self):
       
   127         '''Rename Postgres parameters to proper value for MySQL.'''
       
   128         m = {'dbname' : 'db', 'password' : 'passwd'}
       
   129         res = dict()
       
   130         for k, v in list(self.parameters.items()):
       
   131             if k in m:
       
   132                 k = m[k]
       
   133             res[k] = v
       
   134         self.parameters = res
       
   135 
       
   136 
       
   137 class Cursor(oursql.Cursor):
       
   138 
       
   139     def execute(self, query, args=[]):
       
   140         try:
       
   141             return super(Cursor, self).execute(query, args)
       
   142         finally:
       
   143             self._log_query(query, args)
       
   144 
       
   145     def callproc(self, procname, args=[]):
       
   146         try:
       
   147             return super(Cursor, self).callproc(procname, args)
       
   148         finally:
       
   149             self._log_query(query, args)
       
   150 
       
   151     def row_dict(self, row, lstrip=None):
       
   152         adjustname = lambda a: a
       
   153         if lstrip:
       
   154             adjustname = lambda a: a.lstrip(lstrip)
       
   155         return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
       
   156 
       
   157     def fetchone_dict(self, lstrip=None):
       
   158         row = super(Cursor, self).fetchone()
       
   159         if row is None:
       
   160             return None
       
   161         return self.row_dict(row, lstrip)
       
   162 
       
   163     def fetchall_dict(self, lstrip=None):
       
   164         rows = super(Cursor, self).fetchall()
       
   165         return [self.row_dict(row, lstrip) for row in rows]
       
   166 
       
   167     def _log_query(self, query, args):
       
   168         name = self.connection.name if hasattr(self.connection, 'name') else '-'
       
   169         log_sql.debug('[%s] %s %s' % (name, query, args))
       
   170 
       
   171 
       
   172 class MyManager:
       
   173 
       
   174     def __init__(self):
       
   175         self.conn_known = {}  # available connections
       
   176         self.conn_pool = {}
       
   177         self.lock = threading.Lock()
       
   178         self.pid = multiprocessing.current_process().pid  # forking check
       
   179 
       
   180     def __del__(self):
       
   181         for conn in tuple(self.conn_known.keys()):
       
   182             self.destroy_conn(conn)
       
   183 
       
   184     def create_conn(self, name='default', isolation_level=None, **kw):
       
   185         '''Create named connection.'''
       
   186         if name in self.conn_known:
       
   187             raise MyManagerError('Connection name "%s" already registered.' % name)
       
   188 
       
   189         isolation_level = self._normalize_isolation_level(isolation_level)
       
   190         ci = ConnectionInfo(name, isolation_level, **kw)
       
   191 
       
   192         self.conn_known[name] = ci
       
   193         self.conn_pool[name] = []
       
   194 
       
   195     def close_conn(self, name='default'):
       
   196         '''Close all connections of given name.
       
   197 
       
   198         Connection credentials are still saved.
       
   199 
       
   200         '''
       
   201         while len(self.conn_pool[name]):
       
   202             conn = self.conn_pool[name].pop()
       
   203             conn.close()
       
   204 
       
   205     def destroy_conn(self, name='default'):
       
   206         '''Destroy connection.
       
   207 
       
   208         Counterpart of create_conn.
       
   209 
       
   210         '''
       
   211         if not name in self.conn_known:
       
   212             raise MyManagerError('Connection name "%s" not registered.' % name)
       
   213 
       
   214         self.close_conn(name)
       
   215 
       
   216         del self.conn_known[name]
       
   217         del self.conn_pool[name]
       
   218 
       
   219     def get_conn(self, name='default'):
       
   220         '''Get connection of name 'name' from pool.'''
       
   221         self._check_fork()
       
   222         self.lock.acquire()
       
   223         try:
       
   224             if not name in self.conn_known:
       
   225                 raise MyManagerError("Connection name '%s' not registered." % name)
       
   226 
       
   227             # connection from pool
       
   228             conn = None
       
   229             while len(self.conn_pool[name]) and conn is None:
       
   230                 conn = self.conn_pool[name].pop()
       
   231                 try:
       
   232                     conn.ping()
       
   233                 except oursql.MySQLError:
       
   234                     conn.close()
       
   235                     conn = None
       
   236 
       
   237             if conn is None:
       
   238                 ci = self.conn_known[name]
       
   239                 conn = self._connect(ci)
       
   240         finally:
       
   241             self.lock.release()
       
   242         return conn
       
   243 
       
   244     def put_conn(self, conn, name='default'):
       
   245         '''Put connection back to pool.
       
   246 
       
   247         Name must be same as used for get_conn,
       
   248         otherwise things become broken.
       
   249 
       
   250         '''
       
   251         self.lock.acquire()
       
   252         try:
       
   253             if not name in self.conn_known:
       
   254                 raise MyManagerError("Connection name '%s' not registered." % name)
       
   255 
       
   256             if len(self.conn_pool[name]) >= self.conn_known[name].pool_size:
       
   257                 conn.close()
       
   258                 return
       
   259 
       
   260             # connection returned to the pool must not be in transaction
       
   261             try:
       
   262                 conn.rollback()
       
   263             except OperationalError:
       
   264                 conn.close()
       
   265                 return
       
   266 
       
   267             self.conn_pool[name].append(conn)
       
   268         finally:
       
   269             self.lock.release()
       
   270 
       
   271     @contextmanager
       
   272     def cursor(self, name='default'):
       
   273         '''Cursor context.
       
   274 
       
   275         Uses any connection of name 'name' from pool
       
   276         and returns cursor for that connection.
       
   277 
       
   278         '''
       
   279         conn = self.get_conn(name)
       
   280 
       
   281         try:
       
   282             curs = conn.cursor()
       
   283             yield curs
       
   284         finally:
       
   285             curs.close()
       
   286             self.put_conn(conn, name)
       
   287 
       
   288     def _connect(self, ci):
       
   289         conn = oursql.connect(default_cursor=Cursor, **ci.parameters)
       
   290         if not ci.isolation_level is None:
       
   291             if ci.isolation_level == 'AUTOCOMMIT':
       
   292                 conn.autocommit(True)
       
   293             else:
       
   294                 curs = conn.cursor()
       
   295                 curs.execute('SET SESSION TRANSACTION ISOLATION LEVEL ' + ci.isolation_level)
       
   296                 curs.close()
       
   297         if ci.init_statement:
       
   298             curs = conn.cursor()
       
   299             curs.execute(ci.init_statement)
       
   300             curs.connection.commit()
       
   301             curs.close()
       
   302         return conn
       
   303 
       
   304     def _normalize_isolation_level(self, level):
       
   305         if level is None:
       
   306             return level
       
   307         if type(level) == str:
       
   308             level = level.upper().replace('_', ' ')
       
   309             if level in (
       
   310                 'AUTOCOMMIT',
       
   311                 'READ UNCOMMITTED',
       
   312                 'READ COMMITTED',
       
   313                 'REPEATABLE READ',
       
   314                 'SERIALIZABLE'):
       
   315                 return level
       
   316         raise MyManagerError('Unknown isolation level name: "%s"', level)
       
   317 
       
   318     def _check_fork(self):
       
   319         '''Check if process was forked (PID has changed).
       
   320 
       
   321         If it was, clean parent's connections.
       
   322         New connections are created for children.
       
   323         Known connection credentials are inherited, but not shared.
       
   324 
       
   325         '''
       
   326         if self.pid == multiprocessing.current_process().pid:
       
   327             # PID has not changed
       
   328             return
       
   329 
       
   330         # update saved PID
       
   331         self.pid = multiprocessing.current_process().pid
       
   332         # reinitialize lock
       
   333         self.lock = threading.Lock()
       
   334         # clean parent's connections
       
   335         for name in self.conn_pool:
       
   336             self.conn_pool[name] = []
       
   337 
       
   338     @classmethod
       
   339     def get_instance(cls):
       
   340         if not hasattr(cls, '_instance'):
       
   341             cls._instance = cls()
       
   342         return cls._instance
       
   343 
       
   344 
       
   345 def get_instance():
       
   346     return MyManager.get_instance()
       
   347