pgtoolkit/pgmanager.py
changeset 9 2fcc8ef0b97d
parent 8 2911935c524d
child 19 e526ca146fa9
equal deleted inserted replaced
8:2911935c524d 9:2fcc8ef0b97d
       
     1 # -*- coding: utf-8 -*-
       
     2 #
       
     3 # PgManager - manage database connections
       
     4 #
       
     5 # Requires: Python 2.6, psycopg2
       
     6 #
       
     7 # Part of pgtoolkit
       
     8 # http://hg.devl.cz/pgtoolkit
       
     9 #
       
    10 # Copyright (c) 2010, 2011  Radek Brich <radek.brich@devl.cz>
       
    11 #
       
    12 # Permission is hereby granted, free of charge, to any person obtaining a copy
       
    13 # of this software and associated documentation files (the "Software"), to deal
       
    14 # in the Software without restriction, including without limitation the rights
       
    15 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
       
    16 # copies of the Software, and to permit persons to whom the Software is
       
    17 # furnished to do so, subject to the following conditions:
       
    18 #
       
    19 # The above copyright notice and this permission notice shall be included in
       
    20 # all copies or substantial portions of the Software.
       
    21 #
       
    22 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
       
    23 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
       
    24 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
       
    25 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
       
    26 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
       
    27 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
       
    28 # THE SOFTWARE.
       
    29 
       
    30 """Postgres database connection manager
       
    31 
       
    32 PgManager wraps psycopg2 connect function, adding following features:
       
    33 
       
    34  * Manage database connection parameters - link connection parameters
       
    35    to an unique identifier, retrieve connection object by this identifier
       
    36 
       
    37  * Connection pooling - connections with same identifier are pooled and reused
       
    38 
       
    39  * Easy query using the with statement - retrieve cursor directly by connection
       
    40    identifier, don't worry about connections
       
    41 
       
    42  * Dict rows - cursor has additional methods like fetchall_dict(), which
       
    43    returns dict row instead of ordinary list-like row
       
    44 
       
    45 Example:
       
    46 
       
    47 import pgmanager
       
    48 
       
    49 pgm = pgmanager.get_instance()
       
    50 pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres')
       
    51 
       
    52 with pgm.cursor() as curs:
       
    53     curs.execute('SELECT now() AS now')
       
    54     row = curs.fetchone_dict()
       
    55     print row.now
       
    56 
       
    57 First, we have obtained PgManager instance. This is like calling
       
    58 PgManager(), although in our example the instance is global. That means
       
    59 getting the instance in another module brings us all the defined connections
       
    60 etc.
       
    61 
       
    62 On second line we have created connection named 'default' (this name can be left out).
       
    63 The with statement obtains connection (actually connects to database when needed),
       
    64 then returns cursor for this connection. At the end of with statement,
       
    65 the connection is returned to the pool or closed (depending on number of connections
       
    66 in pool and on setting of keep_open parameter).
       
    67 
       
    68 The row returned by fetchone_dict() is special dict object, which can be accessed
       
    69 using item or attribute access, that is row['now'] or row.now.
       
    70 """
       
    71 
       
    72 from contextlib import contextmanager
       
    73 import logging
       
    74 import threading
       
    75 import select
       
    76 import socket
       
    77 
       
    78 import psycopg2
       
    79 import psycopg2.extensions
       
    80 
       
    81 from psycopg2 import DatabaseError, IntegrityError
       
    82 
       
    83 
       
    84 class PgManagerError(Exception):
       
    85 
       
    86     pass
       
    87 
       
    88 
       
    89 class ConnectionInfo:
       
    90 
       
    91     def __init__(self, dsn, isolation_level=None, init_statement=None, keep_open=1):
       
    92         self.dsn = dsn
       
    93         self.isolation_level = isolation_level
       
    94         self.init_statement = init_statement
       
    95         self.keep_open = keep_open
       
    96 
       
    97 
       
    98 class RowDict(dict):
       
    99 
       
   100     def __getattr__(self, key):
       
   101         return self[key]
       
   102 
       
   103 
       
   104 class Cursor(psycopg2.extensions.cursor):
       
   105 
       
   106     def execute(self, query, args=None):
       
   107         try:
       
   108             return super(Cursor, self).execute(query, args)
       
   109         finally:
       
   110             log.debug(self.query.decode('utf8'))
       
   111 
       
   112     def callproc(self, procname, args=None):
       
   113         try:
       
   114             return super(Cursor, self).callproc(procname, args)
       
   115         finally:
       
   116             log.debug(self.query.decode('utf8'))
       
   117 
       
   118     def row_dict(self, row, lstrip=None):
       
   119         adjustname = lambda a: a
       
   120         if lstrip:
       
   121             adjustname = lambda a: a.lstrip(lstrip)
       
   122         return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
       
   123 
       
   124     def fetchone_dict(self, lstrip=None):
       
   125         row = super(Cursor, self).fetchone()
       
   126         if row is None:
       
   127             return None
       
   128         return self.row_dict(row, lstrip)
       
   129 
       
   130     def fetchall_dict(self, lstrip=None):
       
   131         rows = super(Cursor, self).fetchall()
       
   132         return [self.row_dict(row, lstrip) for row in rows]
       
   133 
       
   134     def fetchone_adapted(self):
       
   135         '''Like fetchone() but values are quoted for direct inclusion in SQL query.
       
   136         
       
   137         This is useful when you need to generate SQL script from data returned
       
   138         by the query. Use mogrify() for simple cases.
       
   139         
       
   140         '''
       
   141         row = super(Cursor, self).fetchone()
       
   142         if row is None:
       
   143             return None
       
   144         return [self.mogrify('%s', [x]).decode('utf8') for x in row]
       
   145 
       
   146     def fetchall_adapted(self):
       
   147         '''Like fetchall() but values are quoted for direct inclusion in SQL query.'''
       
   148         rows = super(Cursor, self).fetchall()
       
   149         return [[self.mogrify('%s', [x]).decode('utf8') for x in row] for row in rows]
       
   150 
       
   151 
       
   152 class Connection(psycopg2.extensions.connection):
       
   153 
       
   154     def cursor(self, name=None):
       
   155         if name is None:
       
   156             return super(Connection, self).cursor(cursor_factory=Cursor)
       
   157         else:
       
   158             return super(Connection, self).cursor(name, cursor_factory=Cursor)
       
   159 
       
   160     def keep_alive(self):
       
   161         '''Set socket to keepalive mode. Must be called before any query.'''
       
   162         sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM) 
       
   163         sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
       
   164         # Maximum keep-alive probes before asuming the connection is lost
       
   165         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
       
   166         # Interval (in seconds) between keep-alive probes
       
   167         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 2)
       
   168         # Maximum idle time (in seconds) before start sending keep-alive probes
       
   169         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
       
   170 
       
   171 
       
   172 class PgManager:
       
   173 
       
   174     def __init__(self):
       
   175         self.conn_known = {}  # available connections
       
   176         self.conn_pool = {}
       
   177         self.lock = threading.Lock()
       
   178 
       
   179     def __del__(self):
       
   180         for conn in tuple(self.conn_known.keys()):
       
   181             self.destroy_conn(conn)
       
   182 
       
   183     def create_conn(self, name='default', isolation_level=None, dsn=None, **kw):
       
   184         '''Create named connection.'''
       
   185         if name in self.conn_known:
       
   186             raise PgManagerError('Connection name "%s" already registered.' % name)
       
   187 
       
   188         if dsn is None:
       
   189             dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items()])
       
   190 
       
   191         isolation_level = self._normalize_isolation_level(isolation_level)
       
   192         ci = ConnectionInfo(dsn, isolation_level)
       
   193 
       
   194         self.conn_known[name] = ci
       
   195         self.conn_pool[name] = []
       
   196 
       
   197     def close_conn(self, name='default'):
       
   198         '''Close all connections of given name.
       
   199         
       
   200         Connection credentials are still saved.
       
   201         
       
   202         '''
       
   203         while len(self.conn_pool[name]):
       
   204             conn = self.conn_pool[name].pop()
       
   205             conn.close()
       
   206 
       
   207     def destroy_conn(self, name='default'):
       
   208         '''Destroy connection.
       
   209         
       
   210         Counterpart of create_conn.
       
   211         
       
   212         '''
       
   213         if not name in self.conn_known:
       
   214             raise PgManagerError('Connection name "%s" not registered.' % name)
       
   215 
       
   216         self.close_conn(name)
       
   217 
       
   218         del self.conn_known[name]
       
   219         del self.conn_pool[name]
       
   220 
       
   221     def get_conn(self, name='default'):
       
   222         '''Get connection of name 'name' from pool.'''
       
   223         self.lock.acquire()
       
   224         try:
       
   225             if not name in self.conn_known:
       
   226                 raise PgManagerError("Connection name '%s' not registered." % name)
       
   227 
       
   228             conn = None
       
   229             while len(self.conn_pool[name]) and conn is None:
       
   230                 conn = self.conn_pool[name].pop()
       
   231                 if conn.closed:
       
   232                     conn = None
       
   233 
       
   234             if conn is None:
       
   235                 ci = self.conn_known[name]
       
   236                 conn = self._connect(ci)
       
   237         finally:
       
   238             self.lock.release()
       
   239         return conn
       
   240 
       
   241     def put_conn(self, conn, name='default'):
       
   242         '''Put connection back to pool.
       
   243         
       
   244         Name must be same as used for get_conn,
       
   245         otherwise things become broken.
       
   246         
       
   247         '''
       
   248         self.lock.acquire()
       
   249         try:
       
   250             if not name in self.conn_known:
       
   251                 raise PgManagerError("Connection name '%s' not registered." % name)
       
   252 
       
   253             if len(self.conn_pool[name]) >= self.conn_known[name].keep_open:
       
   254                 conn.close()
       
   255                 return
       
   256 
       
   257             if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN:
       
   258                 conn.close()
       
   259                 return
       
   260 
       
   261             # connection returned to the pool must not be in transaction
       
   262             if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
       
   263                 conn.rollback()
       
   264 
       
   265             self.conn_pool[name].append(conn)
       
   266         finally:
       
   267             self.lock.release()
       
   268 
       
   269     @contextmanager
       
   270     def cursor(self, name='default'):
       
   271         '''Cursor context.
       
   272         
       
   273         Uses any connection of name 'name' from pool
       
   274         and returns cursor for that connection.
       
   275         
       
   276         '''
       
   277         conn = self.get_conn(name)
       
   278 
       
   279         try:
       
   280             curs = conn.cursor()
       
   281             yield curs
       
   282         finally:
       
   283             curs.close()
       
   284             self.put_conn(conn, name)
       
   285 
       
   286     def wait_for_notify(self, name='default', timeout=5):
       
   287         '''Wait for asynchronous notifies, return the last one.
       
   288         
       
   289         Returns None on timeout.
       
   290         
       
   291         '''
       
   292         conn = self.get_conn(name)
       
   293         
       
   294         try:
       
   295             # any residual notify?
       
   296             # then return it, that should not break anything
       
   297             if conn.notifies:
       
   298                 return conn.notifies.pop()
       
   299 
       
   300             if select.select([conn], [], [], timeout) == ([], [], []):
       
   301                 # timeout
       
   302                 return None
       
   303             else:
       
   304                 conn.poll()
       
   305 
       
   306                 # return just the last notify (we do not care for older ones)
       
   307                 if conn.notifies:
       
   308                     return conn.notifies.pop()
       
   309                 return None
       
   310         finally:
       
   311             # clean notifies
       
   312             while conn.notifies:
       
   313                 conn.notifies.pop()
       
   314             self.put_conn(conn, name)
       
   315 
       
   316     def _connect(self, ci):
       
   317         conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
       
   318         conn.keep_alive()
       
   319         if not ci.isolation_level is None:
       
   320             conn.set_isolation_level(ci.isolation_level)
       
   321         if ci.init_statement:
       
   322             curs = conn.cursor()
       
   323             curs.execute(ci.init_statement)
       
   324             curs.close()
       
   325         return conn
       
   326 
       
   327     def _normalize_isolation_level(self, level):
       
   328         if type(level) == str:
       
   329             if level.lower() == 'autocommit':
       
   330                 return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
       
   331             if level.lower() == 'read_committed':
       
   332                 return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
       
   333             if level.lower() == 'serializable':
       
   334                 return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
       
   335             raise PgManagerError('Unknown isolation level name: "%s"', level)
       
   336         return level
       
   337 
       
   338 
       
   339 try:
       
   340     NullHandler = logging.NullHandler
       
   341 except AttributeError:
       
   342     class NullHandler(logging.Handler):
       
   343         def emit(self, record):
       
   344             pass
       
   345 
       
   346 
       
   347 log = logging.getLogger("pgmanager")
       
   348 log.addHandler(NullHandler())
       
   349 
       
   350 
       
   351 instance = None
       
   352 
       
   353 
       
   354 def get_instance():
       
   355     global instance
       
   356     if instance is None:
       
   357         instance = PgManager()
       
   358     return instance
       
   359 
       
   360