pgtoolkit/pgmanager.py
changeset 36 e67101c22e83
parent 33 bd0beda49bcb
child 37 5b0eb4b11940
equal deleted inserted replaced
35:e7f79c4a27ce 36:e67101c22e83
     5 # Requires: Python 3.2, psycopg2
     5 # Requires: Python 3.2, psycopg2
     6 #
     6 #
     7 # Part of pgtoolkit
     7 # Part of pgtoolkit
     8 # http://hg.devl.cz/pgtoolkit
     8 # http://hg.devl.cz/pgtoolkit
     9 #
     9 #
    10 # Copyright (c) 2010, 2011  Radek Brich <radek.brich@devl.cz>
    10 # Copyright (c) 2010, 2011, 2012  Radek Brich <radek.brich@devl.cz>
    11 #
    11 #
    12 # Permission is hereby granted, free of charge, to any person obtaining a copy
    12 # Permission is hereby granted, free of charge, to any person obtaining a copy
    13 # of this software and associated documentation files (the "Software"), to deal
    13 # of this software and associated documentation files (the "Software"), to deal
    14 # in the Software without restriction, including without limitation the rights
    14 # in the Software without restriction, including without limitation the rights
    15 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    15 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    61 
    61 
    62 On the second line we have created a connection named 'default' (this name can be left out).
    62 On the second line we have created a connection named 'default' (this name can be left out).
    63 The with statement obtains a connection (actually connecting to the database when needed),
    63 The with statement obtains a connection (actually connecting to the database when needed),
    64 then returns a cursor for this connection. At the end of the with statement,
    64 then returns a cursor for this connection. At the end of the with statement,
    65 the connection is returned to the pool or closed (depending on the number of connections
    65 the connection is returned to the pool or closed (depending on the number of connections
    66 in the pool and on the setting of the keep_open parameter).
    66 in the pool and on the setting of the pool_size parameter).
    67 
    67 
    68 The row returned by fetchone_dict() is a special dict object, which can be accessed
    68 The row returned by fetchone_dict() is a special dict object, which can be accessed
    69 using item or attribute access, that is row['now'] or row.now.
    69 using item or attribute access, that is row['now'] or row.now.
    70 """
    70 """
    71 
    71 
    92     pass
    92     pass
    93 
    93 
    94 
    94 
    95 class ConnectionInfo:
    95 class ConnectionInfo:
    96 
    96 
    97     def __init__(self, dsn, isolation_level=None, keep_alive=True,
    97     def __init__(self, name, dsn, isolation_level=None, keep_alive=True,
    98         init_statement=None, keep_open=1):
    98                  init_statement=None, pool_size=1):
    99         
    99         self.name = name  # connection name is logged with SQL queries
   100         self.dsn = dsn
   100         self.dsn = dsn
   101         self.isolation_level = isolation_level
   101         self.isolation_level = isolation_level
   102         self.keep_alive = keep_alive
   102         self.keep_alive = keep_alive
   103         self.init_statement = init_statement
   103         self.init_statement = init_statement
   104         self.keep_open = keep_open
   104         self.pool_size = pool_size
   105 
   105 
   106 
   106 
   107 class RowDict(OrderedDict):
   107 class RowDict(OrderedDict):
   108 
   108 
   109     def __getattr__(self, key):
   109     def __getattr__(self, key):
   114 
   114 
   115 
   115 
   116 class Cursor(psycopg2.extensions.cursor):
   116 class Cursor(psycopg2.extensions.cursor):
   117 
   117 
   118     def execute(self, query, args=None):
   118     def execute(self, query, args=None):
       
   119         self._log_query(query, args)
   119         try:
   120         try:
   120             return super(Cursor, self).execute(query, args)
   121             return super(Cursor, self).execute(query, args)
   121         finally:
   122         except DatabaseError:
   122             self._log_query()
   123             self._log_exception()
   123 
   124 
   124     def callproc(self, procname, args=None):
   125     def callproc(self, procname, args=None):
       
   126         self._log_query('CALL %s(%s)' % (procname, ','.join(args)))
   125         try:
   127         try:
   126             return super(Cursor, self).callproc(procname, args)
   128             return super(Cursor, self).callproc(procname, args)
   127         finally:
   129         except DatabaseError:
   128             self._log_query()
   130             self._log_exception()
   129 
   131 
   130     def row_dict(self, row, lstrip=None):
   132     def row_dict(self, row, lstrip=None):
   131         adjustname = lambda a: a
   133         adjustname = lambda a: a
   132         if lstrip:
   134         if lstrip:
   133             adjustname = lambda a: a.lstrip(lstrip)
   135             adjustname = lambda a: a.lstrip(lstrip)
   160     def fetchall_adapted(self, lstrip=None):
   162     def fetchall_adapted(self, lstrip=None):
   161         '''Like fetchall_dict() but values are quoted for direct inclusion in SQL query.'''
   163         '''Like fetchall_dict() but values are quoted for direct inclusion in SQL query.'''
   162         rows = super(Cursor, self).fetchall()
   164         rows = super(Cursor, self).fetchall()
   163         return [self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip) for row in rows]
   165         return [self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip) for row in rows]
   164 
   166 
   165     def _log_query(self):
   167     def _log_query(self, query, args=None):
   166         if self.query:
   168         name = self.connection.name if hasattr(self.connection, 'name') else '-'
   167             name = self.connection.name if hasattr(self.connection, 'name') else '-'
   169         log_sql.info('[%s] %s' % (name, self.mogrify(query, args).decode('utf8')))
   168             log_sql.info('[%s] %s' % (name, self.query.decode('utf8')))
   170 
       
   171     def _log_exception(self):
       
   172         name = self.connection.name if hasattr(self.connection, 'name') else '-'
       
   173         log_sql.exception('[%s] exception:' % (name,))
   169 
   174 
   170 class Connection(psycopg2.extensions.connection):
   175 class Connection(psycopg2.extensions.connection):
   171 
   176 
   172     def cursor(self, name=None):
   177     def cursor(self, name=None):
   173         if name is None:
   178         if name is None:
   200 
   205 
   201     def __del__(self):
   206     def __del__(self):
   202         for conn in tuple(self.conn_known.keys()):
   207         for conn in tuple(self.conn_known.keys()):
   203             self.destroy_conn(conn)
   208             self.destroy_conn(conn)
   204 
   209 
   205     def create_conn(self, name='default', keep_open=1, isolation_level=None, keep_alive=True, dsn=None, **kw):
   210     def create_conn(self, name='default', isolation_level=None, keep_alive=True, init_statement=None,
       
   211                     pool_size=1, dsn=None, **kwargs):
   206         '''Create named connection.
   212         '''Create named connection.
   207         
   213         
   208         name -- name for connection (default is "default")
   214         name -- name for connection (default is "default")
   209         keep_open -- how many connections will be kept open in pool (more connections will still be created,
   215         pool_size -- how many connections will be kept open in pool
   210                      but they will be closed by put_conn)
   216                      (more connections will still be created but they will be closed by put_conn)
       
    217                      None - disable the pool, always return the same connection
   211         isolation_level -- "autocommit", "read_committed", "serializable" or None for driver default
   218         isolation_level -- "autocommit", "read_committed", "serializable" or None for driver default
   212         keep_alive -- set socket to keepalive mode
   219         keep_alive -- set socket to keepalive mode
   213         dsn -- string with connection parameters (dsn means Data Source Name)
   220         dsn -- string with connection parameters (dsn means Data Source Name)
   214         
   221         
    215         An alternative to dsn is keyword args (same names as in dsn).
    222         An alternative to dsn is keyword args (same names as in dsn).
   217         '''
   224         '''
   218         if name in self.conn_known:
   225         if name in self.conn_known:
   219             raise PgManagerError('Connection name "%s" already registered.' % name)
   226             raise PgManagerError('Connection name "%s" already registered.' % name)
   220 
   227 
   221         if dsn is None:
   228         if dsn is None:
   222             dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items() if x[1] is not None])
   229             dsn = ' '.join([x[0]+'='+str(x[1]) for x in kwargs.items() if x[1] is not None])
   223 
   230 
   224         isolation_level = self._normalize_isolation_level(isolation_level)
   231         isolation_level = self._normalize_isolation_level(isolation_level)
   225         ci = ConnectionInfo(dsn, isolation_level, keep_alive, keep_open=keep_open)
   232         ci = ConnectionInfo(name, dsn, isolation_level, keep_alive, init_statement, pool_size)
   226 
   233 
   227         self.conn_known[name] = ci
   234         self.conn_known[name] = ci
   228         self.conn_pool[name] = []
   235         self.conn_pool[name] = []
       
   236     
       
   237     def create_conn_listen(self, name, channel, dsn=None, copy_dsn=None, **kwargs):
       
   238         '''Create connection listening for notifies.
       
   239         
       
    240         Disables the pool. If you want to use a pool, create another connection for that.
       
   241         This connection can be used as usual: conn.cursor() etc.
       
   242         Don't use PgManager's cursor() and put_conn(). 
       
   243         
       
   244         name -- name for connection
       
   245         channel -- listen on this channel
       
    246         copy_dsn -- name of another registered connection whose dsn will be used
       
   247         
       
    248         Other parameters are forwarded to create_conn().
       
   249         
       
   250         '''
       
   251         if dsn is None and copy_dsn:
       
   252             try:
       
   253                 dsn = self.conn_known[copy_dsn].dsn
       
   254             except KeyError:
       
   255                 raise PgManagerError("Connection name '%s' not registered." % copy_dsn)
       
   256         listen_query = "LISTEN " + channel
       
   257         self.create_conn(name=name, pool_size=None, isolation_level='autocommit', init_statement=listen_query,
       
   258             dsn=dsn, **kwargs)
   229 
   259 
   230     def close_conn(self, name='default'):
   260     def close_conn(self, name='default'):
   231         '''Close all connections of given name.
   261         '''Close all connections of given name.
   232         
   262         
   233         Connection credentials are still saved.
   263         Connection credentials are still saved.
   254     def get_conn(self, name='default'):
   284     def get_conn(self, name='default'):
   255         '''Get connection of name 'name' from pool.'''
   285         '''Get connection of name 'name' from pool.'''
   256         self._check_fork()
   286         self._check_fork()
   257         self.lock.acquire()
   287         self.lock.acquire()
   258         try:
   288         try:
   259             if not name in self.conn_known:
   289             try:
       
   290                 ci = self.conn_known[name]
       
   291             except KeyError:
   260                 raise PgManagerError("Connection name '%s' not registered." % name)
   292                 raise PgManagerError("Connection name '%s' not registered." % name)
   261 
   293             
   262             conn = None
   294             # no pool, just one static connection
   263             while len(self.conn_pool[name]) and conn is None:
   295             if ci.pool_size is None:
   264                 conn = self.conn_pool[name].pop()
   296                 # check for existing connection
   265                 if conn.closed:
   297                 try:
       
   298                     conn = self.conn_pool[name][0]
       
   299                     if conn.closed:
       
   300                         conn = None
       
   301                 except IndexError:
   266                     conn = None
   302                     conn = None
   267 
   303                     self.conn_pool[name].append(conn)
   268             if conn is None:
   304                 # if no existing connection is valid, connect new one and save it
   269                 ci = self.conn_known[name]
   305                 if conn is None:
   270                 conn = self._connect(ci)
   306                     conn = self._connect(ci)
   271                 # add our name to connection instance (this is then logged with SQL queries)
   307                     self.conn_pool[name][0] = conn
   272                 conn.name = name
   308             
       
   309             # connection from pool
       
   310             else:
       
   311                 conn = None
       
   312                 while len(self.conn_pool[name]) and conn is None:
       
   313                     conn = self.conn_pool[name].pop()
       
   314                     if conn.closed:
       
   315                         conn = None
       
   316                 
       
   317                 if conn is None:
       
   318                     conn = self._connect(ci)
   273         finally:
   319         finally:
   274             self.lock.release()
   320             self.lock.release()
   275         return conn
   321         return conn
   276 
   322 
   277     def put_conn(self, conn, name='default'):
   323     def put_conn(self, conn, name='default'):
   284         self.lock.acquire()
   330         self.lock.acquire()
   285         try:
   331         try:
   286             if not name in self.conn_known:
   332             if not name in self.conn_known:
   287                 raise PgManagerError("Connection name '%s' not registered." % name)
   333                 raise PgManagerError("Connection name '%s' not registered." % name)
   288 
   334 
   289             if len(self.conn_pool[name]) >= self.conn_known[name].keep_open:
   335             if len(self.conn_pool[name]) >= self.conn_known[name].pool_size:
   290                 conn.close()
   336                 conn.close()
   291                 return
   337                 return
   292 
   338 
   293             if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN:
   339             if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN:
   294                 conn.close()
   340                 conn.close()
   328     def log_notices(self, conn):
   374     def log_notices(self, conn):
   329         for notice in conn.notices:
   375         for notice in conn.notices:
   330             log_notices.info(notice.rstrip())
   376             log_notices.info(notice.rstrip())
   331         conn.notices[:] = []
   377         conn.notices[:] = []
   332 
   378 
   333     def wait_for_notify(self, name='default', timeout=5):
   379     def wait_for_notify(self, name='default', timeout=5.0):
   334         '''Wait for asynchronous notifies, return the last one.
   380         '''Wait for asynchronous notifies, return the last one.
   335         
   381         
       
   382         name -- name of connection, must be created using create_conn_listen()
       
   383         timeout -- in seconds, floating point (None - wait forever)
       
   384         
   336         Returns None on timeout.
   385         Returns None on timeout.
   337         
   386         
   338         '''
   387         '''
   339         conn = self.get_conn(name)
   388         conn = self.get_conn(name)
   340         
   389         
   341         try:
   390         # return any notifies on stack
   342             # any residual notify?
   391         if conn.notifies:
   343             # then return it, that should not break anything
   392             return conn.notifies.pop()
       
   393 
       
   394         if select.select([conn], [], [], timeout) == ([], [], []):
       
   395             # timeout
       
   396             return None
       
   397         else:
       
   398             conn.poll()
       
   399 
       
   400             # return just the last notify (we do not care for older ones)
   344             if conn.notifies:
   401             if conn.notifies:
   345                 return conn.notifies.pop()
   402                 return conn.notifies.pop()
   346 
   403             return None
   347             if select.select([conn], [], [], timeout) == ([], [], []):
       
   348                 # timeout
       
   349                 return None
       
   350             else:
       
   351                 conn.poll()
       
   352 
       
   353                 # return just the last notify (we do not care for older ones)
       
   354                 if conn.notifies:
       
   355                     return conn.notifies.pop()
       
   356                 return None
       
   357         finally:
       
   358             # clean notifies
       
   359             while conn.notifies:
       
   360                 conn.notifies.pop()
       
   361             self.put_conn(conn, name)
       
   362 
   404 
   363     def _connect(self, ci):
   405     def _connect(self, ci):
   364         conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
   406         conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
       
   407         conn.name = ci.name
   365         if ci.keep_alive:
   408         if ci.keep_alive:
   366             conn.keep_alive()
   409             conn.keep_alive()
   367         if not ci.isolation_level is None:
   410         if not ci.isolation_level is None:
   368             conn.set_isolation_level(ci.isolation_level)
   411             conn.set_isolation_level(ci.isolation_level)
   369         if ci.init_statement:
   412         if ci.init_statement: