mytoolkit/mymanager.py
changeset 71 4251068a251a
parent 49 08e4dfe1b0cb
child 74 d4306261ddfb
equal deleted inserted replaced
70:77e65040711c 71:4251068a251a
     1 # -*- coding: utf-8 -*-
     1 # -*- coding: utf-8 -*-
     2 #
     2 #
     3 # MyManager - manage database connections (MySQL version)
     3 # MyManager - manage database connections (MySQL version)
     4 #
     4 #
     5 # Requires: Python 2.6 / 2.7, MySQLdb
     5 # Requires: Python 2.6 / 2.7 / 3.2, MySQLdb
     6 #
     6 #
     7 # Part of pgtoolkit
     7 # Part of pgtoolkit
     8 # http://hg.devl.cz/pgtoolkit
     8 # http://hg.devl.cz/pgtoolkit
     9 #
     9 #
    10 # Copyright (c) 2011  Radek Brich <radek.brich@devl.cz>
    10 # Copyright (c) 2011, 2013  Radek Brich <radek.brich@devl.cz>
    11 #
    11 #
    12 # Permission is hereby granted, free of charge, to any person obtaining a copy
    12 # Permission is hereby granted, free of charge, to any person obtaining a copy
    13 # of this software and associated documentation files (the "Software"), to deal
    13 # of this software and associated documentation files (the "Software"), to deal
    14 # in the Software without restriction, including without limitation the rights
    14 # in the Software without restriction, including without limitation the rights
    15 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    15 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    27 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
    27 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
    28 # THE SOFTWARE.
    28 # THE SOFTWARE.
    29 
    29 
    30 """MySQL database connection manager
    30 """MySQL database connection manager
    31 
    31 
    32 MyManager wraps MySQLdb connect function, adding following features:
    32 MyManager wraps MySQLdb in the same manner as PgManager wraps psycopg2.
    33 
    33 It's fully compatible, so it should work as a drop-in replacement for PgManager.
    34  * Manage database connection parameters - link connection parameters
    34 
    35    to an unique identifier, retrieve connection object by this identifier
    35 It adds the following features over MySQLdb:
    36 
    36 
    37  * Connection pooling - connections with same identifier are pooled and reused
    37  * Save and reuse database connection parameters
    38 
    38 
    39  * Easy query using the with statement - retrieve cursor directly by connection
    39  * Connection pooling
    40    identifier, don't worry about connections
    40 
    41 
    41  * Easy query using the with statement
    42  * Dict rows - cursor has additional methods like fetchall_dict(), which
    42 
    43    returns dict row instead of ordinary list-like row
    43  * Dictionary rows
    44 
    44 
    45 Example:
    45 Example:
    46 
    46 
    47 import mymanager
    47     from mytoolkit import mymanager
    48 
    48 
    49 db = mymanager.get_instance()
    49     dbm = mymanager.get_instance()
    50 db.create_conn(host='127.0.0.1', db='default')
    50     dbm.create_conn(host='127.0.0.1', dbname='default')
    51 
    51 
    52 with db.cursor() as curs:
    52     with dbm.cursor() as curs:
    53     curs.execute('SELECT now() AS now')
    53         curs.execute('SELECT now() AS now')
    54     row = curs.fetchone_dict()
    54         row = curs.fetchone_dict()
    55     print row.now
    55         print(row.now)
    56 
    56 
    57 First, we have obtained MyManager instance. This is like calling
    57 See PgManager docs for more information.
    58 MyManager(), although in our example the instance is global. That means
       
    59 getting the instance in another module brings us all the defined connections
       
    60 etc.
       
    61 
       
    62 On next line we have created connection named 'default' (this name can be left out).
       
    63 The with statement obtains connection (actually connects to database when needed),
       
    64 then returns cursor for this connection. At the end of with statement,
       
    65 the connection is returned to the pool or closed (depending on number of connections
       
    66 in pool and on setting of keep_open parameter).
       
    67 
       
    68 The row returned by fetchone_dict() is special dict object, which can be accessed
       
    69 using item or attribute access, that is row['now'] or row.now.
       
    70 
    58 
    71 """
    59 """
    72 
    60 
    73 from contextlib import contextmanager
    61 from contextlib import contextmanager
       
    62 from collections import OrderedDict
    74 import logging
    63 import logging
    75 import threading
    64 import threading
    76 
    65 
    77 import MySQLdb
    66 import MySQLdb
    78 import MySQLdb.cursors
    67 import MySQLdb.cursors
    79 
    68 
    80 from MySQLdb import DatabaseError, IntegrityError, OperationalError
    69 from MySQLdb import DatabaseError, IntegrityError, OperationalError
    81 
    70 
       
    71 from pgtoolkit.pgmanager import RowDict
       
    72 
       
    73 
       
    74 log_sql = logging.getLogger("mymanager_sql")
       
    75 
    82 
    76 
    83 class MyManagerError(Exception):
    77 class MyManagerError(Exception):
    84 
    78 
    85     pass
    79     pass
    86 
    80 
    87 
    81 
    88 class ConnectionInfo:
    82 class ConnectionInfo:
    89 
    83 
    90     def __init__(self, isolation_level=None, init_statement=None, keep_open=1, **kw):
    84     def __init__(self, name, isolation_level=None,
       
    85                  init_statement=None, pool_size=1, **kw):
       
    86         self.name = name  # connection name is logged with SQL queries
    91         self.isolation_level = isolation_level
    87         self.isolation_level = isolation_level
    92         self.init_statement = init_statement
    88         self.init_statement = init_statement
    93         self.keep_open = keep_open
    89         self.pool_size = pool_size
    94         self.parameters = kw
    90         self.parameters = kw
    95         self.adjust_parameters()
    91         self.adjust_parameters()
    96 
    92 
    97     def adjust_parameters(self):
    93     def adjust_parameters(self):
    98         '''Rename Postgres parameters to proper value for MySQL.'''
    94         '''Rename Postgres parameters to proper value for MySQL.'''
   103                 k = m[k]
    99                 k = m[k]
   104             res[k] = v
   100             res[k] = v
   105         self.parameters = res
   101         self.parameters = res
   106 
   102 
   107 
   103 
   108 class RowDict(dict):
       
   109 
       
   110     def __getattr__(self, key):
       
   111         return self[key]
       
   112 
       
   113 
       
   114 class Cursor(MySQLdb.cursors.Cursor):
   104 class Cursor(MySQLdb.cursors.Cursor):
   115 
   105 
   116     def execute(self, query, args=None):
   106     def execute(self, query, args=None):
   117         try:
   107         try:
   118             return super(Cursor, self).execute(query, args)
   108             return super(Cursor, self).execute(query, args)
   119         finally:
   109         finally:
   120             log.debug(self._executed.decode('utf8'))
   110             log_sql.debug(self._executed.decode('utf8'))
   121 
   111 
   122     def callproc(self, procname, args=None):
   112     def callproc(self, procname, args=None):
   123         try:
   113         try:
   124             return super(Cursor, self).callproc(procname, args)
   114             return super(Cursor, self).callproc(procname, args)
   125         finally:
   115         finally:
   126             log.debug(self._executed.decode('utf8'))
   116             log_sql.debug(self._executed.decode('utf8'))
   127 
   117 
   128     def row_dict(self, row, lstrip=None):
   118     def row_dict(self, row, lstrip=None):
   129         adjustname = lambda a: a
   119         adjustname = lambda a: a
   130         if lstrip:
   120         if lstrip:
   131             adjustname = lambda a: a.lstrip(lstrip)
   121             adjustname = lambda a: a.lstrip(lstrip)
   146 
   136 
   147     def __init__(self):
   137     def __init__(self):
   148         self.conn_known = {}  # available connections
   138         self.conn_known = {}  # available connections
   149         self.conn_pool = {}
   139         self.conn_pool = {}
   150         self.lock = threading.Lock()
   140         self.lock = threading.Lock()
       
   141         self.pid = multiprocessing.current_process().pid  # forking check
   151 
   142 
   152     def __del__(self):
   143     def __del__(self):
   153         for conn in tuple(self.conn_known.keys()):
   144         for conn in tuple(self.conn_known.keys()):
   154             self.destroy_conn(conn)
   145             self.destroy_conn(conn)
   155 
   146 
   157         '''Create named connection.'''
   148         '''Create named connection.'''
   158         if name in self.conn_known:
   149         if name in self.conn_known:
   159             raise MyManagerError('Connection name "%s" already registered.' % name)
   150             raise MyManagerError('Connection name "%s" already registered.' % name)
   160 
   151 
   161         isolation_level = self._normalize_isolation_level(isolation_level)
   152         isolation_level = self._normalize_isolation_level(isolation_level)
   162         ci = ConnectionInfo(isolation_level, **kw)
   153         ci = ConnectionInfo(name, isolation_level, **kw)
   163 
   154 
   164         self.conn_known[name] = ci
   155         self.conn_known[name] = ci
   165         self.conn_pool[name] = []
   156         self.conn_pool[name] = []
   166 
   157 
   167     def close_conn(self, name='default'):
   158     def close_conn(self, name='default'):
   188         del self.conn_known[name]
   179         del self.conn_known[name]
   189         del self.conn_pool[name]
   180         del self.conn_pool[name]
   190 
   181 
   191     def get_conn(self, name='default'):
   182     def get_conn(self, name='default'):
   192         '''Get connection of name 'name' from pool.'''
   183         '''Get connection of name 'name' from pool.'''
       
   184         self._check_fork()
   193         self.lock.acquire()
   185         self.lock.acquire()
   194         try:
   186         try:
   195             if not name in self.conn_known:
   187             if not name in self.conn_known:
   196                 raise MyManagerError("Connection name '%s' not registered." % name)
   188                 raise MyManagerError("Connection name '%s' not registered." % name)
   197 
   189 
       
   190             # connection from pool
   198             conn = None
   191             conn = None
   199             while len(self.conn_pool[name]) and conn is None:
   192             while len(self.conn_pool[name]) and conn is None:
   200                 conn = self.conn_pool[name].pop()
   193                 conn = self.conn_pool[name].pop()
   201                 try:
   194                 try:
   202                     conn.ping()
   195                     conn.ping()
   221         self.lock.acquire()
   214         self.lock.acquire()
   222         try:
   215         try:
   223             if not name in self.conn_known:
   216             if not name in self.conn_known:
   224                 raise MyManagerError("Connection name '%s' not registered." % name)
   217                 raise MyManagerError("Connection name '%s' not registered." % name)
   225 
   218 
   226             if len(self.conn_pool[name]) >= self.conn_known[name].keep_open:
   219             if len(self.conn_pool[name]) >= self.conn_known[name].pool_size:
   227                 conn.close()
   220                 conn.close()
   228                 return
   221                 return
   229 
   222 
   230             # connection returned to the pool must not be in transaction
   223             # connection returned to the pool must not be in transaction
   231             try:
   224             try:
   265                 curs.execute('SET SESSION TRANSACTION ISOLATION LEVEL ' + ci.isolation_level)
   258                 curs.execute('SET SESSION TRANSACTION ISOLATION LEVEL ' + ci.isolation_level)
   266                 curs.close()
   259                 curs.close()
   267         if ci.init_statement:
   260         if ci.init_statement:
   268             curs = conn.cursor()
   261             curs = conn.cursor()
   269             curs.execute(ci.init_statement)
   262             curs.execute(ci.init_statement)
       
   263             curs.connection.commit()
   270             curs.close()
   264             curs.close()
   271         return conn
   265         return conn
   272 
   266 
   273     def _normalize_isolation_level(self, level):
   267     def _normalize_isolation_level(self, level):
   274         if level is None:
   268         if level is None:
   282                 'REPEATABLE READ',
   276                 'REPEATABLE READ',
   283                 'SERIALIZABLE'):
   277                 'SERIALIZABLE'):
   284                 return level
   278                 return level
   285         raise MyManagerError('Unknown isolation level name: "%s"', level)
   279         raise MyManagerError('Unknown isolation level name: "%s"', level)
   286 
   280 
   287 
   281     def _check_fork(self):
   288 try:
   282         '''Check if process was forked (PID has changed).
   289     NullHandler = logging.NullHandler
   283 
   290 except AttributeError:
   284         If it was, clean parent's connections.
   291     class NullHandler(logging.Handler):
   285         New connections are created for children.
   292         def emit(self, record):
   286         Known connection credentials are inherited, but not shared.
   293             pass
   287 
   294 
   288         '''
   295 
   289         if self.pid == multiprocessing.current_process().pid:
   296 log = logging.getLogger("mymanager")
   290             # PID has not changed
   297 log.addHandler(NullHandler())
   291             return
   298 
   292 
   299 
   293         # update saved PID
   300 instance = None
   294         self.pid = multiprocessing.current_process().pid
       
   295         # reinitialize lock
       
   296         self.lock = threading.Lock()
       
   297         # clean parent's connections
       
   298         for name in self.conn_pool:
       
   299             self.conn_pool[name] = []
       
   300 
       
   301     @classmethod
       
   302     def get_instance(cls):
       
   303         if not hasattr(cls, '_instance'):
       
   304             cls._instance = cls()
       
   305         return cls._instance
   301 
   306 
   302 
   307 
   303 def get_instance():
   308 def get_instance():
   304     global instance
   309     return MyManager.get_instance()
   305     if instance is None:
   310 
   306         instance = MyManager()
       
   307     return instance
       
   308 
       
   309