tools/pgmanager.py
changeset 7 685b20d2d3ab
parent 4 80634cb1c65b
child 8 2911935c524d
equal deleted inserted replaced
6:4ab077c93b2d 7:685b20d2d3ab
       
     1 # -*- coding: utf-8 -*-
       
     2 #
       
     3 # PgManager - manage database connections
       
     4 #
       
     5 # Requires: Python 2.6, psycopg2
       
     6 #
       
     7 # Copyright (c) 2010, 2011  Radek Brich <radek.brich@devl.cz>
       
     8 #
       
     9 # Permission is hereby granted, free of charge, to any person obtaining a copy
       
    10 # of this software and associated documentation files (the "Software"), to deal
       
    11 # in the Software without restriction, including without limitation the rights
       
    12 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
       
    13 # copies of the Software, and to permit persons to whom the Software is
       
    14 # furnished to do so, subject to the following conditions:
       
    15 #
       
    16 # The above copyright notice and this permission notice shall be included in
       
    17 # all copies or substantial portions of the Software.
       
    18 #
       
    19 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
       
    20 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
       
    21 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
       
    22 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
       
    23 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
       
    24 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
       
    25 # THE SOFTWARE.
       
    26 
       
    27 """Postgres database connection manager
       
    28 
       
    29 PgManager wraps psycopg2 connect function, adding following features:
       
    30 
       
    31  * Manage database connection parameters - link connection parameters
       
    32    to an unique identifier, retrieve connection object by this identifier
       
    33 
       
    34  * Connection pooling - connections with same identifier are pooled and reused
       
    35 
       
    36  * Easy query using the with statement - retrieve cursor directly by connection
       
    37    identifier, don't worry about connections
       
    38 
       
    39  * Dict rows - cursor has additional methods like fetchall_dict(), which
       
    40    returns dict row instead of ordinary list-like row
       
    41 
       
    42 Example:
       
    43 
       
    44 import pgmanager
       
    45 
       
    46 pgm = pgmanager.get_instance()
       
    47 pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres')
       
    48 
       
    49 with pgm.cursor() as curs:
       
    50     curs.execute('SELECT now() AS now')
       
    51     row = curs.fetchone_dict()
       
    52     print row.now
       
    53 
       
    54 First, we have obtained PgManager instance. This is like calling
       
    55 PgManager(), although in our example the instance is global. That means
       
    56 getting the instance in another module brings us all the defined connections
       
    57 etc.
       
    58 
       
    59 On second line we created connection named 'default' (this name can be left out).
       
    60 The with statement obtains connection (actually connects to database when needed),
       
    61 then returns cursor for this connection. On exit, the connection is returned
       
    62 to the pool or closed (depending on number of connections on pool and setting
       
    63 of keep_open parameter).
       
    64 
       
    65 The row returned by fetchone_dict() is special dict object, which can be accessed
       
    66 using item or attribute access, that is row['now'] or row.now.
       
    67 """
       
    68 
       
    69 from contextlib import contextmanager
       
    70 import logging
       
    71 import threading
       
    72 import select
       
    73 
       
    74 import psycopg2
       
    75 import psycopg2.extensions
       
    76 
       
    77 from psycopg2 import DatabaseError, IntegrityError
       
    78 
       
    79 
       
    80 class PgManagerError(Exception):
       
    81 
       
    82     pass
       
    83 
       
    84 
       
    85 class ConnectionInfo:
       
    86 
       
    87     def __init__(self, dsn, isolation_level=None, init_statement=None, keep_open=1):
       
    88         self.dsn = dsn
       
    89         self.isolation_level = isolation_level
       
    90         self.init_statement = init_statement
       
    91         self.keep_open = keep_open
       
    92 
       
    93 
       
    94 class RowDict(dict):
       
    95 
       
    96     def __getattr__(self, key):
       
    97         return self[key]
       
    98 
       
    99 
       
   100 class Cursor(psycopg2.extensions.cursor):
       
   101 
       
   102     def execute(self, query, args=None):
       
   103         try:
       
   104             return super(Cursor, self).execute(query, args)
       
   105         finally:
       
   106             log.debug(self.query.decode('utf8'))
       
   107 
       
   108     def callproc(self, procname, args=None):
       
   109         try:
       
   110             return super(Cursor, self).callproc(procname, args)
       
   111         finally:
       
   112             log.debug(self.query.decode('utf8'))
       
   113 
       
   114     def row_dict(self, row, lstrip=None):
       
   115         adjustname = lambda a: a
       
   116         if lstrip:
       
   117             adjustname = lambda a: a.lstrip(lstrip)
       
   118         return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
       
   119 
       
   120     def fetchone_dict(self, lstrip=None):
       
   121         row = super(Cursor, self).fetchone()
       
   122         if row is None:
       
   123             return None
       
   124         return self.row_dict(row, lstrip)
       
   125 
       
   126     def fetchall_dict(self, lstrip=None):
       
   127         rows = super(Cursor, self).fetchall()
       
   128         return [self.row_dict(row, lstrip) for row in rows]
       
   129 
       
   130     def fetchone_adapted(self):
       
   131         '''Like fetchone() but values are quoted for direct inclusion in SQL query.
       
   132         
       
   133         This is useful when you need to generate SQL script from data returned
       
   134         by the query. Use mogrify() for simple cases.
       
   135         
       
   136         '''
       
   137         row = super(Cursor, self).fetchone()
       
   138         if row is None:
       
   139             return None
       
   140         return [self.mogrify('%s', [x]).decode('utf8') for x in row]
       
   141 
       
   142     def fetchall_adapted(self):
       
   143         '''Like fetchall() but values are quoted for direct inclusion in SQL query.'''
       
   144         rows = super(Cursor, self).fetchall()
       
   145         return [[self.mogrify('%s', [x]).decode('utf8') for x in row] for row in rows]
       
   146 
       
   147 
       
   148 class Connection(psycopg2.extensions.connection):
       
   149 
       
   150     def cursor(self, name=None):
       
   151         if name is None:
       
   152             return super(Connection, self).cursor(cursor_factory=Cursor)
       
   153         else:
       
   154             return super(Connection, self).cursor(name, cursor_factory=Cursor)
       
   155 
       
   156 
       
   157 class PgManager:
       
   158 
       
   159     def __init__(self):
       
   160         self.conn_known = {}  # available connections
       
   161         self.conn_pool = {}
       
   162         self.lock = threading.Lock()
       
   163 
       
   164     def __del__(self):
       
   165         for conn in tuple(self.conn_known.keys()):
       
   166             self.destroy_conn(conn)
       
   167 
       
   168     def create_conn(self, name='default', isolation_level=None, dsn=None, **kw):
       
   169         '''Create named connection.'''
       
   170         if name in self.conn_known:
       
   171             raise PgManagerError('Connection name "%s" already registered.' % name)
       
   172 
       
   173         if dsn is None:
       
   174             dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items()])
       
   175 
       
   176         isolation_level = self._normalize_isolation_level(isolation_level)
       
   177         ci = ConnectionInfo(dsn, isolation_level)
       
   178 
       
   179         self.conn_known[name] = ci
       
   180         self.conn_pool[name] = []
       
   181 
       
   182     def close_conn(self, name='default'):
       
   183         '''Close all connections of given name.
       
   184         
       
   185         Connection credentials are still saved.
       
   186         
       
   187         '''
       
   188         while len(self.conn_pool[name]):
       
   189             conn = self.conn_pool[name].pop()
       
   190             conn.close()
       
   191 
       
   192     def destroy_conn(self, name='default'):
       
   193         '''Destroy connection.
       
   194         
       
   195         Counterpart of create_conn.
       
   196         
       
   197         '''
       
   198         if not name in self.conn_known:
       
   199             raise PgManagerError('Connection name "%s" not registered.' % name)
       
   200 
       
   201         self.close_conn(name)
       
   202 
       
   203         del self.conn_known[name]
       
   204         del self.conn_pool[name]
       
   205 
       
   206     def get_conn(self, name='default'):
       
   207         '''Get connection of name 'name' from pool.'''
       
   208         self.lock.acquire()
       
   209         try:
       
   210             if not name in self.conn_known:
       
   211                 raise PgManagerError("Connection name '%s' not registered." % name)
       
   212 
       
   213             conn = None
       
   214             while len(self.conn_pool[name]) and conn is None:
       
   215                 conn = self.conn_pool[name].pop()
       
   216                 if conn.closed:
       
   217                     conn = None
       
   218 
       
   219             if conn is None:
       
   220                 ci = self.conn_known[name]
       
   221                 conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
       
   222                 if not ci.isolation_level is None:
       
   223                     conn.set_isolation_level(ci.isolation_level)
       
   224                 if ci.init_statement:
       
   225                     curs = conn.cursor()
       
   226                     curs.execute(ci.init_statement)
       
   227                     curs.close()
       
   228         finally:
       
   229             self.lock.release()
       
   230         return conn
       
   231 
       
   232     def put_conn(self, conn, name='default'):
       
   233         '''Put connection back to pool.
       
   234         
       
   235         Name must be same as used for get_conn,
       
   236         otherwise things become broken.
       
   237         
       
   238         '''
       
   239         self.lock.acquire()
       
   240         try:
       
   241             if not name in self.conn_known:
       
   242                 raise PgManagerError("Connection name '%s' not registered." % name)
       
   243 
       
   244             if len(self.conn_pool[name]) >= self.conn_known[name].keep_open:
       
   245                 conn.close()
       
   246                 return
       
   247 
       
   248             if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN:
       
   249                 conn.close()
       
   250                 return
       
   251 
       
   252             # connection returned to the pool must not be in transaction
       
   253             if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
       
   254                 conn.rollback()
       
   255 
       
   256             self.conn_pool[name].append(conn)
       
   257         finally:
       
   258             self.lock.release()
       
   259 
       
   260     @contextmanager
       
   261     def cursor(self, name='default'):
       
   262         '''Cursor context.
       
   263         
       
   264         Uses any connection of name 'name' from pool
       
   265         and returns cursor for that connection.
       
   266         
       
   267         '''
       
   268         conn = self.get_conn(name)
       
   269 
       
   270         try:
       
   271             curs = conn.cursor()
       
   272             yield curs
       
   273         finally:
       
   274             curs.close()
       
   275             self.put_conn(conn, name)
       
   276 
       
   277     def wait_for_notify(self, name='default', timeout=5):
       
   278         '''Wait for asynchronous notifies, return the last one.
       
   279         
       
   280         Returns None on timeout.
       
   281         
       
   282         '''
       
   283         conn = self.get_conn(name)
       
   284         
       
   285         try:
       
   286             # any residual notify?
       
   287             # then return it, that should not break anything
       
   288             if conn.notifies:
       
   289                 return conn.notifies.pop()
       
   290 
       
   291             if select.select([conn], [], [], timeout) == ([], [], []):
       
   292                 # timeout
       
   293                 return None
       
   294             else:
       
   295                 conn.poll()
       
   296 
       
   297                 # return just the last notify (we do not care for older ones)
       
   298                 if conn.notifies:
       
   299                     return conn.notifies.pop()
       
   300                 return None
       
   301         finally:
       
   302             # clean notifies
       
   303             while conn.notifies:
       
   304                 conn.notifies.pop()
       
   305             self.put_conn(conn, name)
       
   306 
       
   307     def _normalize_isolation_level(self, level):
       
   308         if type(level) == str:
       
   309             if level.lower() == 'autocommit':
       
   310                 return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
       
   311             if level.lower() == 'read_committed':
       
   312                 return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
       
   313             if level.lower() == 'serializable':
       
   314                 return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
       
   315             raise PgManagerError('Unknown isolation level name: "%s"', level)
       
   316         return level
       
   317 
       
   318 
       
   319 try:
       
   320     NullHandler = logging.NullHandler
       
   321 except AttributeError:
       
   322     class NullHandler(logging.Handler):
       
   323         def emit(self, record):
       
   324             pass
       
   325 
       
   326 
       
   327 log = logging.getLogger("pgmanager")
       
   328 log.addHandler(NullHandler())
       
   329 
       
   330 
       
   331 instance = None
       
   332 
       
   333 
       
   334 def get_instance():
       
   335     global instance
       
   336     if instance is None:
       
   337         instance = PgManager()
       
   338     return instance
       
   339 
       
   340