--- a/pgtoolkit/pgmanager.py Wed Sep 26 16:20:26 2012 +0200
+++ b/pgtoolkit/pgmanager.py Wed Sep 26 16:21:59 2012 +0200
@@ -105,13 +105,40 @@
class RowDict(OrderedDict):
+ """Special read-only dictionary used for rows returned from queries.
+
+ Initialization is same as for dict:
+ row = RowDict([('id', 123), ('name', 'hello')])
+
+ Allows key and attribute access to contained items:
+ row['id']
+ row.id
+
+ Items keep order in which columns where returned from database.
+
+ Tuple style access is also supported:
+ row[0]
+ id, name = row
+
+ """
+
+ def __init__(self, data):
+ self._dict = OrderedDict(data)
+
+ def __getitem__(self, key):
+ if isinstance(key, int):
+ return tuple(self._dict.values())[key]
+ return self._dict[key]
def __getattr__(self, key):
try:
- return self[key]
+ return self._dict[key]
except KeyError:
raise AttributeError(key)
+ def __contains__(self, key):
+ return key in self._dict
+
class Cursor(psycopg2.extensions.cursor):
@@ -160,10 +187,10 @@
def fetchone_adapted(self, lstrip=None):
'''Like fetchone_dict() but values are quoted for direct inclusion in SQL query.
-
+
This is useful when you need to generate SQL script from data returned
by the query. Use mogrify() for simple cases.
-
+
'''
row = super(Cursor, self).fetchone()
if row is None:
@@ -223,7 +250,7 @@
def create_conn(self, name='default', isolation_level=None, keep_alive=True, init_statement=None,
pool_size=1, dsn=None, **kwargs):
'''Create named connection.
-
+
name -- name for connection (default is "default")
pool_size -- how many connections will be kept open in pool
(more connections will still be created but they will be closed by put_conn)
@@ -231,9 +258,9 @@
isolation_level -- "autocommit", "read_committed", "serializable" or None for driver default
keep_alive -- set socket to keepalive mode
dsn -- connection string (parameters or data source name)
-
+
Other keyword args are used as connection parameters.
-
+
'''
if name in self.conn_known:
raise PgManagerError('Connection name "%s" already registered.' % name)
@@ -246,20 +273,20 @@
self.conn_known[name] = ci
self.conn_pool[name] = []
-
+
def create_conn_listen(self, name, channel, dsn=None, copy_dsn=None, **kwargs):
'''Create connection listening for notifies.
-
+
Disables pool. If you want to use pool, create other connection for that.
This connection can be used as usual: conn.cursor() etc.
- Don't use PgManager's cursor() and put_conn().
-
+ Don't use PgManager's cursor() and put_conn().
+
name -- name for connection
channel -- listen on this channel
copy_dsn -- specify name of other connection and its dsn will be used
-
+
Other parameters forwarded to create_conn().
-
+
'''
if dsn is None and copy_dsn:
try:
@@ -272,9 +299,9 @@
def close_conn(self, name='default'):
'''Close all connections of given name.
-
+
Connection credentials are still saved.
-
+
'''
while len(self.conn_pool[name]):
conn = self.conn_pool[name].pop()
@@ -282,9 +309,9 @@
def destroy_conn(self, name='default'):
'''Destroy connection.
-
+
Counterpart of create_conn.
-
+
'''
if not name in self.conn_known:
raise PgManagerError('Connection name "%s" not registered.' % name)
@@ -303,7 +330,7 @@
ci = self.conn_known[name]
except KeyError:
raise PgManagerError("Connection name '%s' not registered." % name)
-
+
# no pool, just one static connection
if ci.pool_size is None:
# check for existing connection
@@ -318,7 +345,7 @@
if conn is None:
conn = self._connect(ci)
self.conn_pool[name][0] = conn
-
+
# connection from pool
else:
conn = None
@@ -326,7 +353,7 @@
conn = self.conn_pool[name].pop()
if conn.closed:
conn = None
-
+
if conn is None:
conn = self._connect(ci)
finally:
@@ -335,10 +362,10 @@
def put_conn(self, conn, name='default'):
'''Put connection back to pool.
-
+
Name must be same as used for get_conn,
otherwise things become broken.
-
+
'''
self.lock.acquire()
try:
@@ -369,10 +396,10 @@
@contextmanager
def cursor(self, name='default'):
'''Cursor context.
-
+
Uses any connection of name 'name' from pool
and returns cursor for that connection.
-
+
'''
conn = self.get_conn(name)
@@ -391,15 +418,15 @@
def wait_for_notify(self, name='default', timeout=5.0):
'''Wait for asynchronous notifies, return the last one.
-
+
name -- name of connection, must be created using create_conn_listen()
timeout -- in seconds, floating point (None - wait forever)
-
+
Returns None on timeout.
-
+
'''
conn = self.get_conn(name)
-
+
# return any notifies on stack
if conn.notifies:
return conn.notifies.pop()
@@ -438,19 +465,19 @@
return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
raise PgManagerError('Unknown isolation level name: "%s"', level)
return level
-
+
def _check_fork(self):
'''Check if process was forked (PID has changed).
-
+
If it was, clean parent's connections.
New connections are created for children.
Known connection credentials are inherited, but not shared.
-
+
'''
if self.pid == multiprocessing.current_process().pid:
# PID has not changed
return
-
+
# update saved PID
self.pid = multiprocessing.current_process().pid
# reinitialize lock