pgtools/pgmanager.py
changeset 7 685b20d2d3ab
parent 6 4ab077c93b2d
child 8 2911935c524d
equal deleted inserted replaced
6:4ab077c93b2d 7:685b20d2d3ab
     1 # -*- coding: utf-8 -*-
       
     2 #
       
     3 # PgManager - manage database connections
       
     4 #
       
     5 # Requires: Python 2.6, psycopg2
       
     6 #
       
     7 # Copyright (c) 2010, 2011  Radek Brich <radek.brich@devl.cz>
       
     8 #
       
     9 # Permission is hereby granted, free of charge, to any person obtaining a copy
       
    10 # of this software and associated documentation files (the "Software"), to deal
       
    11 # in the Software without restriction, including without limitation the rights
       
    12 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
       
    13 # copies of the Software, and to permit persons to whom the Software is
       
    14 # furnished to do so, subject to the following conditions:
       
    15 #
       
    16 # The above copyright notice and this permission notice shall be included in
       
    17 # all copies or substantial portions of the Software.
       
    18 #
       
    19 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
       
    20 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
       
    21 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
       
    22 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
       
    23 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
       
    24 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
       
    25 # THE SOFTWARE.
       
    26 
       
    27 """Postgres database connection manager
       
    28 
       
     29 PgManager wraps the psycopg2 connect function, adding the following features:
       
    30 
       
    31  * Manage database connection parameters - link connection parameters
       
     32    to a unique identifier, retrieve connection object by this identifier
       
    33 
       
    34  * Connection pooling - connections with same identifier are pooled and reused
       
    35 
       
    36  * Easy query using the with statement - retrieve cursor directly by connection
       
    37    identifier, don't worry about connections
       
    38 
       
    39  * Dict rows - cursor has additional methods like fetchall_dict(), which
       
    40    returns dict row instead of ordinary list-like row
       
    41 
       
    42 Example:
       
    43 
       
    44 import pgmanager
       
    45 
       
    46 pgm = pgmanager.get_instance()
       
    47 pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres')
       
    48 
       
    49 with pgm.cursor() as curs:
       
    50     curs.execute('SELECT now() AS now')
       
    51     row = curs.fetchone_dict()
       
    52     print row.now
       
    53 
       
    54 First, we have obtained PgManager instance. This is like calling
       
    55 PgManager(), although in our example the instance is global. That means
       
    56 getting the instance in another module brings us all the defined connections
       
    57 etc.
       
    58 
       
    59 On second line we created connection named 'default' (this name can be left out).
       
    60 The with statement obtains connection (actually connects to database when needed),
       
    61 then returns cursor for this connection. On exit, the connection is returned
       
    62 to the pool or closed (depending on number of connections on pool and setting
       
    63 of keep_open parameter).
       
    64 
       
    65 The row returned by fetchone_dict() is special dict object, which can be accessed
       
    66 using item or attribute access, that is row['now'] or row.now.
       
    67 """
       
    68 
       
    69 from contextlib import contextmanager
       
    70 import logging
       
    71 import threading
       
    72 import select
       
    73 
       
    74 import psycopg2
       
    75 import psycopg2.extensions
       
    76 
       
    77 from psycopg2 import DatabaseError, IntegrityError
       
    78 
       
    79 
       
    80 class PgManagerError(Exception):
       
    81 
       
    82     pass
       
    83 
       
    84 
       
    85 class ConnectionInfo:
       
    86 
       
    87     def __init__(self, dsn, isolation_level=None, init_statement=None, keep_open=1):
       
    88         self.dsn = dsn
       
    89         self.isolation_level = isolation_level
       
    90         self.init_statement = init_statement
       
    91         self.keep_open = keep_open
       
    92 
       
    93 
       
    94 class RowDict(dict):
       
    95 
       
    96     def __getattr__(self, key):
       
    97         return self[key]
       
    98 
       
    99 
       
   100 class Cursor(psycopg2.extensions.cursor):
       
   101 
       
   102     def execute(self, query, args=None):
       
   103         try:
       
   104             return super(Cursor, self).execute(query, args)
       
   105         finally:
       
   106             log.debug(self.query.decode('utf8'))
       
   107 
       
   108     def callproc(self, procname, args=None):
       
   109         try:
       
   110             return super(Cursor, self).callproc(procname, args)
       
   111         finally:
       
   112             log.debug(self.query.decode('utf8'))
       
   113 
       
   114     def row_dict(self, row, lstrip=None):
       
   115         adjustname = lambda a: a
       
   116         if lstrip:
       
   117             adjustname = lambda a: a.lstrip(lstrip)
       
   118         return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
       
   119 
       
   120     def fetchone_dict(self, lstrip=None):
       
   121         row = super(Cursor, self).fetchone()
       
   122         if not row:
       
   123             return row
       
   124         return self.row_dict(row, lstrip)
       
   125 
       
   126     def fetchall_dict(self, lstrip=None):
       
   127         rows = super(Cursor, self).fetchall()
       
   128         return [self.row_dict(row, lstrip) for row in rows]
       
   129 
       
   130 
       
   131 class Connection(psycopg2.extensions.connection):
       
   132 
       
   133     def cursor(self, name=None):
       
   134         if name is None:
       
   135             return super(Connection, self).cursor(cursor_factory=Cursor)
       
   136         else:
       
   137             return super(Connection, self).cursor(name, cursor_factory=Cursor)
       
   138 
       
   139 
       
   140 class PgManager:
       
   141 
       
   142     def __init__(self):
       
   143         self.conn_known = {}  # available connections
       
   144         self.conn_pool = {}
       
   145         self.lock = threading.Lock()
       
   146 
       
   147     def __del__(self):
       
   148         for conn in tuple(self.conn_known.keys()):
       
   149             self.destroy_conn(conn)
       
   150 
       
   151     def create_conn(self, name='default', isolation_level=None, dsn=None, **kw):
       
   152         '''Create named connection.'''
       
   153         if name in self.conn_known:
       
   154             raise PgManagerError('Connection name "%s" already registered.' % name)
       
   155 
       
   156         if dsn is None:
       
   157             dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items()])
       
   158 
       
   159         isolation_level = self._normalize_isolation_level(isolation_level)
       
   160         ci = ConnectionInfo(dsn, isolation_level)
       
   161 
       
   162         self.conn_known[name] = ci
       
   163         self.conn_pool[name] = []
       
   164 
       
   165     def close_conn(self, name='default'):
       
   166         '''Close all connections of given name.
       
   167         
       
   168         Connection credentials are still saved.
       
   169         
       
   170         '''
       
   171         while len(self.conn_pool[name]):
       
   172             conn = self.conn_pool[name].pop()
       
   173             conn.close()
       
   174 
       
   175     def destroy_conn(self, name='default'):
       
   176         '''Destroy connection.
       
   177         
       
   178         Counterpart of create_conn.
       
   179         
       
   180         '''
       
   181         if not name in self.conn_known:
       
   182             raise PgManagerError('Connection name "%s" not registered.' % name)
       
   183 
       
   184         self.close_conn(name)
       
   185 
       
   186         del self.conn_known[name]
       
   187         del self.conn_pool[name]
       
   188 
       
   189     def get_conn(self, name='default'):
       
   190         '''Get connection of name 'name' from pool.'''
       
   191         self.lock.acquire()
       
   192         try:
       
   193             if not name in self.conn_known:
       
   194                 raise PgManagerError("Connection name '%s' not registered." % name)
       
   195 
       
   196             conn = None
       
   197             while len(self.conn_pool[name]) and conn is None:
       
   198                 conn = self.conn_pool[name].pop()
       
   199                 if conn.closed:
       
   200                     conn = None
       
   201 
       
   202             if conn is None:
       
   203                 ci = self.conn_known[name]
       
   204                 conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
       
   205                 if not ci.isolation_level is None:
       
   206                     conn.set_isolation_level(ci.isolation_level)
       
   207                 if ci.init_statement:
       
   208                     curs = conn.cursor()
       
   209                     curs.execute(ci.init_statement)
       
   210                     curs.close()
       
   211         finally:
       
   212             self.lock.release()
       
   213         return conn
       
   214 
       
   215     def put_conn(self, conn, name='default'):
       
   216         '''Put connection back to pool.
       
   217         
       
   218         Name must be same as used for get_conn,
       
   219         otherwise things become broken.
       
   220         
       
   221         '''
       
   222         self.lock.acquire()
       
   223         try:
       
   224             if not name in self.conn_known:
       
   225                 raise PgManagerError("Connection name '%s' not registered." % name)
       
   226 
       
   227             if len(self.conn_pool[name]) >= self.conn_known[name].keep_open:
       
   228                 conn.close()
       
   229                 return
       
   230 
       
   231             if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN:
       
   232                 conn.close()
       
   233                 return
       
   234 
       
   235             # connection returned to the pool must not be in transaction
       
   236             if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
       
   237                 conn.rollback()
       
   238 
       
   239             self.conn_pool[name].append(conn)
       
   240         finally:
       
   241             self.lock.release()
       
   242 
       
   243     @contextmanager
       
   244     def cursor(self, name='default'):
       
   245         '''Cursor context.
       
   246         
       
   247         Uses any connection of name 'name' from pool
       
   248         and returns cursor for that connection.
       
   249         
       
   250         '''
       
   251         conn = self.get_conn(name)
       
   252 
       
   253         try:
       
   254             curs = conn.cursor()
       
   255             yield curs
       
   256         finally:
       
   257             curs.close()
       
   258             self.put_conn(conn, name)
       
   259 
       
   260     def wait_for_notify(self, name='default', timeout=5):
       
   261         '''Wait for asynchronous notifies, return the last one.
       
   262         
       
   263         Returns None on timeout.
       
   264         
       
   265         '''
       
   266         conn = self.get_conn(name)
       
   267         
       
   268         try:
       
   269             # any residual notify?
       
   270             # then return it, that should not break anything
       
   271             if conn.notifies:
       
   272                 return conn.notifies.pop()
       
   273 
       
   274             if select.select([conn], [], [], timeout) == ([], [], []):
       
   275                 # timeout
       
   276                 return None
       
   277             else:
       
   278                 conn.poll()
       
   279 
       
   280                 # return just the last notify (we do not care for older ones)
       
   281                 if conn.notifies:
       
   282                     return conn.notifies.pop()
       
   283                 return None
       
   284         finally:
       
   285             # clean notifies
       
   286             while conn.notifies:
       
   287                 conn.notifies.pop()
       
   288             self.put_conn(conn, name)
       
   289 
       
   290     def _normalize_isolation_level(self, level):
       
   291         if type(level) == str:
       
   292             if level.lower() == 'autocommit':
       
   293                 return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
       
   294             if level.lower() == 'read_committed':
       
   295                 return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
       
   296             if level.lower() == 'serializable':
       
   297                 return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
       
   298             raise PgManagerError('Unknown isolation level name: "%s"', level)
       
   299         return level
       
   300 
       
   301 
       
   302 try:
       
   303     NullHandler = logging.NullHandler
       
   304 except AttributeError:
       
   305     class NullHandler(logging.Handler):
       
   306         def emit(self, record):
       
   307             pass
       
   308 
       
   309 
       
   310 log = logging.getLogger("pgmanager")
       
   311 log.addHandler(NullHandler())
       
   312 
       
   313 
       
   314 instance = None
       
   315 
       
   316 
       
   317 def get_instance():
       
   318     global instance
       
   319     if instance is None:
       
   320         instance = PgManager()
       
   321     return instance
       
   322 
       
   323