tools/pgmanager.py
changeset 9 2fcc8ef0b97d
parent 8 2911935c524d
child 10 f3a1b9792cc9
equal deleted inserted replaced
8:2911935c524d 9:2fcc8ef0b97d
     1 # -*- coding: utf-8 -*-
       
     2 #
       
     3 # PgManager - manage database connections
       
     4 #
       
     5 # Requires: Python 2.6, psycopg2
       
     6 #
       
     7 # Copyright (c) 2010, 2011  Radek Brich <radek.brich@devl.cz>
       
     8 #
       
     9 # Permission is hereby granted, free of charge, to any person obtaining a copy
       
    10 # of this software and associated documentation files (the "Software"), to deal
       
    11 # in the Software without restriction, including without limitation the rights
       
    12 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
       
    13 # copies of the Software, and to permit persons to whom the Software is
       
    14 # furnished to do so, subject to the following conditions:
       
    15 #
       
    16 # The above copyright notice and this permission notice shall be included in
       
    17 # all copies or substantial portions of the Software.
       
    18 #
       
    19 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
       
    20 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
       
    21 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
       
    22 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
       
    23 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
       
    24 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
       
    25 # THE SOFTWARE.
       
    26 
       
    27 """Postgres database connection manager
       
    28 
       
     29 PgManager wraps psycopg2 connect function, adding the following features:
       
    30 
       
    31  * Manage database connection parameters - link connection parameters
       
     32    to a unique identifier, retrieve connection object by this identifier
       
    33 
       
    34  * Connection pooling - connections with same identifier are pooled and reused
       
    35 
       
    36  * Easy query using the with statement - retrieve cursor directly by connection
       
    37    identifier, don't worry about connections
       
    38 
       
    39  * Dict rows - cursor has additional methods like fetchall_dict(), which
       
    40    returns dict row instead of ordinary list-like row
       
    41 
       
    42 Example:
       
    43 
       
    44 import pgmanager
       
    45 
       
    46 pgm = pgmanager.get_instance()
       
    47 pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres')
       
    48 
       
    49 with pgm.cursor() as curs:
       
    50     curs.execute('SELECT now() AS now')
       
    51     row = curs.fetchone_dict()
       
    52     print row.now
       
    53 
       
    54 First, we have obtained PgManager instance. This is like calling
       
    55 PgManager(), although in our example the instance is global. That means
       
    56 getting the instance in another module brings us all the defined connections
       
    57 etc.
       
    58 
       
    59 On second line we created connection named 'default' (this name can be left out).
       
    60 The with statement obtains connection (actually connects to database when needed),
       
    61 then returns cursor for this connection. On exit, the connection is returned
       
    62 to the pool or closed (depending on number of connections on pool and setting
       
    63 of keep_open parameter).
       
    64 
       
    65 The row returned by fetchone_dict() is special dict object, which can be accessed
       
    66 using item or attribute access, that is row['now'] or row.now.
       
    67 """
       
    68 
       
    69 from contextlib import contextmanager
       
    70 import logging
       
    71 import threading
       
    72 import select
       
    73 import socket
       
    74 
       
    75 import psycopg2
       
    76 import psycopg2.extensions
       
    77 
       
    78 from psycopg2 import DatabaseError, IntegrityError
       
    79 
       
    80 
       
    81 class PgManagerError(Exception):
       
    82 
       
    83     pass
       
    84 
       
    85 
       
    86 class ConnectionInfo:
       
    87 
       
    88     def __init__(self, dsn, isolation_level=None, init_statement=None, keep_open=1):
       
    89         self.dsn = dsn
       
    90         self.isolation_level = isolation_level
       
    91         self.init_statement = init_statement
       
    92         self.keep_open = keep_open
       
    93 
       
    94 
       
    95 class RowDict(dict):
       
    96 
       
    97     def __getattr__(self, key):
       
    98         return self[key]
       
    99 
       
   100 
       
   101 class Cursor(psycopg2.extensions.cursor):
       
   102 
       
   103     def execute(self, query, args=None):
       
   104         try:
       
   105             return super(Cursor, self).execute(query, args)
       
   106         finally:
       
   107             log.debug(self.query.decode('utf8'))
       
   108 
       
   109     def callproc(self, procname, args=None):
       
   110         try:
       
   111             return super(Cursor, self).callproc(procname, args)
       
   112         finally:
       
   113             log.debug(self.query.decode('utf8'))
       
   114 
       
   115     def row_dict(self, row, lstrip=None):
       
   116         adjustname = lambda a: a
       
   117         if lstrip:
       
   118             adjustname = lambda a: a.lstrip(lstrip)
       
   119         return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
       
   120 
       
   121     def fetchone_dict(self, lstrip=None):
       
   122         row = super(Cursor, self).fetchone()
       
   123         if row is None:
       
   124             return None
       
   125         return self.row_dict(row, lstrip)
       
   126 
       
   127     def fetchall_dict(self, lstrip=None):
       
   128         rows = super(Cursor, self).fetchall()
       
   129         return [self.row_dict(row, lstrip) for row in rows]
       
   130 
       
   131     def fetchone_adapted(self):
       
   132         '''Like fetchone() but values are quoted for direct inclusion in SQL query.
       
   133         
       
   134         This is useful when you need to generate SQL script from data returned
       
   135         by the query. Use mogrify() for simple cases.
       
   136         
       
   137         '''
       
   138         row = super(Cursor, self).fetchone()
       
   139         if row is None:
       
   140             return None
       
   141         return [self.mogrify('%s', [x]).decode('utf8') for x in row]
       
   142 
       
   143     def fetchall_adapted(self):
       
   144         '''Like fetchall() but values are quoted for direct inclusion in SQL query.'''
       
   145         rows = super(Cursor, self).fetchall()
       
   146         return [[self.mogrify('%s', [x]).decode('utf8') for x in row] for row in rows]
       
   147 
       
   148 
       
   149 class Connection(psycopg2.extensions.connection):
       
   150 
       
   151     def cursor(self, name=None):
       
   152         if name is None:
       
   153             return super(Connection, self).cursor(cursor_factory=Cursor)
       
   154         else:
       
   155             return super(Connection, self).cursor(name, cursor_factory=Cursor)
       
   156 
       
   157     def keep_alive(self):
       
   158         '''Set socket to keepalive mode. Must be called before any query.'''
       
   159         sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM) 
       
   160         sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
       
   161         # Maximum keep-alive probes before asuming the connection is lost
       
   162         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
       
   163         # Interval (in seconds) between keep-alive probes
       
   164         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 2)
       
   165         # Maximum idle time (in seconds) before start sending keep-alive probes
       
   166         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
       
   167 
       
   168 
       
   169 class PgManager:
       
   170 
       
   171     def __init__(self):
       
   172         self.conn_known = {}  # available connections
       
   173         self.conn_pool = {}
       
   174         self.lock = threading.Lock()
       
   175 
       
   176     def __del__(self):
       
   177         for conn in tuple(self.conn_known.keys()):
       
   178             self.destroy_conn(conn)
       
   179 
       
   180     def create_conn(self, name='default', isolation_level=None, dsn=None, **kw):
       
   181         '''Create named connection.'''
       
   182         if name in self.conn_known:
       
   183             raise PgManagerError('Connection name "%s" already registered.' % name)
       
   184 
       
   185         if dsn is None:
       
   186             dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items()])
       
   187 
       
   188         isolation_level = self._normalize_isolation_level(isolation_level)
       
   189         ci = ConnectionInfo(dsn, isolation_level)
       
   190 
       
   191         self.conn_known[name] = ci
       
   192         self.conn_pool[name] = []
       
   193 
       
   194     def close_conn(self, name='default'):
       
   195         '''Close all connections of given name.
       
   196         
       
   197         Connection credentials are still saved.
       
   198         
       
   199         '''
       
   200         while len(self.conn_pool[name]):
       
   201             conn = self.conn_pool[name].pop()
       
   202             conn.close()
       
   203 
       
   204     def destroy_conn(self, name='default'):
       
   205         '''Destroy connection.
       
   206         
       
   207         Counterpart of create_conn.
       
   208         
       
   209         '''
       
   210         if not name in self.conn_known:
       
   211             raise PgManagerError('Connection name "%s" not registered.' % name)
       
   212 
       
   213         self.close_conn(name)
       
   214 
       
   215         del self.conn_known[name]
       
   216         del self.conn_pool[name]
       
   217 
       
   218     def get_conn(self, name='default'):
       
   219         '''Get connection of name 'name' from pool.'''
       
   220         self.lock.acquire()
       
   221         try:
       
   222             if not name in self.conn_known:
       
   223                 raise PgManagerError("Connection name '%s' not registered." % name)
       
   224 
       
   225             conn = None
       
   226             while len(self.conn_pool[name]) and conn is None:
       
   227                 conn = self.conn_pool[name].pop()
       
   228                 if conn.closed:
       
   229                     conn = None
       
   230 
       
   231             if conn is None:
       
   232                 ci = self.conn_known[name]
       
   233                 conn = self._connect(ci)
       
   234         finally:
       
   235             self.lock.release()
       
   236         return conn
       
   237 
       
   238     def put_conn(self, conn, name='default'):
       
   239         '''Put connection back to pool.
       
   240         
       
   241         Name must be same as used for get_conn,
       
   242         otherwise things become broken.
       
   243         
       
   244         '''
       
   245         self.lock.acquire()
       
   246         try:
       
   247             if not name in self.conn_known:
       
   248                 raise PgManagerError("Connection name '%s' not registered." % name)
       
   249 
       
   250             if len(self.conn_pool[name]) >= self.conn_known[name].keep_open:
       
   251                 conn.close()
       
   252                 return
       
   253 
       
   254             if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN:
       
   255                 conn.close()
       
   256                 return
       
   257 
       
   258             # connection returned to the pool must not be in transaction
       
   259             if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
       
   260                 conn.rollback()
       
   261 
       
   262             self.conn_pool[name].append(conn)
       
   263         finally:
       
   264             self.lock.release()
       
   265 
       
   266     @contextmanager
       
   267     def cursor(self, name='default'):
       
   268         '''Cursor context.
       
   269         
       
   270         Uses any connection of name 'name' from pool
       
   271         and returns cursor for that connection.
       
   272         
       
   273         '''
       
   274         conn = self.get_conn(name)
       
   275 
       
   276         try:
       
   277             curs = conn.cursor()
       
   278             yield curs
       
   279         finally:
       
   280             curs.close()
       
   281             self.put_conn(conn, name)
       
   282 
       
   283     def wait_for_notify(self, name='default', timeout=5):
       
   284         '''Wait for asynchronous notifies, return the last one.
       
   285         
       
   286         Returns None on timeout.
       
   287         
       
   288         '''
       
   289         conn = self.get_conn(name)
       
   290         
       
   291         try:
       
   292             # any residual notify?
       
   293             # then return it, that should not break anything
       
   294             if conn.notifies:
       
   295                 return conn.notifies.pop()
       
   296 
       
   297             if select.select([conn], [], [], timeout) == ([], [], []):
       
   298                 # timeout
       
   299                 return None
       
   300             else:
       
   301                 conn.poll()
       
   302 
       
   303                 # return just the last notify (we do not care for older ones)
       
   304                 if conn.notifies:
       
   305                     return conn.notifies.pop()
       
   306                 return None
       
   307         finally:
       
   308             # clean notifies
       
   309             while conn.notifies:
       
   310                 conn.notifies.pop()
       
   311             self.put_conn(conn, name)
       
   312 
       
   313     def _connect(self, ci):
       
   314         conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
       
   315         conn.keep_alive()
       
   316         if not ci.isolation_level is None:
       
   317             conn.set_isolation_level(ci.isolation_level)
       
   318         if ci.init_statement:
       
   319             curs = conn.cursor()
       
   320             curs.execute(ci.init_statement)
       
   321             curs.close()
       
   322         return conn
       
   323 
       
   324     def _normalize_isolation_level(self, level):
       
   325         if type(level) == str:
       
   326             if level.lower() == 'autocommit':
       
   327                 return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
       
   328             if level.lower() == 'read_committed':
       
   329                 return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
       
   330             if level.lower() == 'serializable':
       
   331                 return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
       
   332             raise PgManagerError('Unknown isolation level name: "%s"', level)
       
   333         return level
       
   334 
       
   335 
       
   336 try:
       
   337     NullHandler = logging.NullHandler
       
   338 except AttributeError:
       
   339     class NullHandler(logging.Handler):
       
   340         def emit(self, record):
       
   341             pass
       
   342 
       
   343 
       
   344 log = logging.getLogger("pgmanager")
       
   345 log.addHandler(NullHandler())
       
   346 
       
   347 
       
   348 instance = None
       
   349 
       
   350 
       
   351 def get_instance():
       
   352     global instance
       
   353     if instance is None:
       
   354         instance = PgManager()
       
   355     return instance
       
   356 
       
   357