PgManager: Update RowDict, add write support.
import psycopg2
import psycopg2.extensions
import psycopg2.extras
class DatabaseError(Exception):
    """Error reported by the database layer.

    The ``query`` attribute holds the query associated with the failure,
    or None when no query was supplied.
    """

    def __init__(self, msg, query=None):
        super(DatabaseError, self).__init__(msg)
        # kept on the instance so handlers can inspect the offending query
        self.query = query
class BadConnectionError(Exception):
    """Raised by Database.execute when psycopg2 reports an OperationalError."""
class Row(dict):
    """Dictionary whose items can also be read as attributes.

    ``row.name`` is equivalent to ``row['name']`` for keys present in the
    dictionary.
    """

    def __getattr__(self, key):
        """Return the item stored under *key*.

        Raises:
            AttributeError: if *key* is not in the dictionary.  Attribute
                protocols (hasattr, getattr with a default, copy.deepcopy)
                only recognise AttributeError, so a KeyError must not leak
                out of here.
        """
        try:
            return self[key]
        except KeyError:
            raise AttributeError(key)
class Database:
    """Pool of asynchronous psycopg2 connections with small query helpers.

    Connections are grouped by their conninfo string.  A connection is
    borrowed with get_conn() (or implicitly by execute()) and handed back
    with put_conn() (or finish()).
    """

    def __init__(self, conninfo=None):
        """Create an empty pool.

        Args:
            conninfo: optional default connection string used by execute()
                when the caller does not pass one.
        """
        # pool of database connections
        # indexed by conninfo, items are lists of connections
        self.pool = {}
        # number of unused connections per conninfo to keep open
        self.pool_keep_open = 1
        # conninfo used by execute() when none is given explicitly
        self.conninfo = conninfo
        # conninfo of each connection handed out by get_conn(), keyed by
        # id(conn), so finish() can return it to the pool it came from
        self._lent = {}

    def __del__(self):
        """Close every idle connection still held in the pool."""
        # getattr: __del__ also runs on instances whose __init__ never
        # completed, in which case there is no pool attribute
        for idle in getattr(self, 'pool', {}).values():
            for conn in idle:
                conn.close()

    def connect(self, conninfo):
        """Open a new asynchronous connection and wait until it is ready.

        Raises:
            DatabaseError: if psycopg2 reports a database error.
        """
        try:
            # 'async' is a reserved word since Python 3.7; psycopg2 accepts
            # 'async_' as an alias for it
            conn = psycopg2.connect(conninfo, async_=1)
            psycopg2.extras.wait_select(conn)
        except psycopg2.DatabaseError as e:
            raise DatabaseError(str(e))
        return conn

    def get_conn(self, conninfo):
        """Return an open connection for *conninfo*.

        An idle pooled connection is reused when one is available; pooled
        connections found closed are discarded.  Otherwise a new connection
        is opened.
        """
        idle = self.pool.setdefault(conninfo, [])
        conn = None
        while idle and conn is None:
            candidate = idle.pop()
            if not candidate.closed:
                conn = candidate
        if conn is None:
            conn = self.connect(conninfo)
        # remember the origin so finish() works without a conninfo argument
        self._lent[id(conn)] = conninfo
        return conn

    def put_conn(self, conninfo, conn):
        """Hand *conn* back to the pool for *conninfo*.

        The connection is closed instead of pooled when the pool already
        holds pool_keep_open idle connections.  A connection that is
        already closed is dropped.
        """
        self._lent.pop(id(conn), None)
        # setdefault: the connection may not have been obtained via get_conn
        idle = self.pool.setdefault(conninfo, [])
        if conn.closed:
            return
        if len(idle) >= self.pool_keep_open:
            conn.close()
        else:
            idle.append(conn)

    def execute(self, q, args=None, conninfo=None):
        """Run query *q* with parameters *args* and return the cursor.

        The caller should pass the returned cursor to finish() once the
        results have been read.

        Args:
            q: query string.
            args: query parameters; defaults to an empty list.
            conninfo: connection string; defaults to the instance default.

        Raises:
            BadConnectionError: on psycopg2.OperationalError; the
                connection is closed.
            DatabaseError: on any other psycopg2 database error, or when
                no conninfo is available.
        """
        if conninfo is None:
            conninfo = self.conninfo
        if conninfo is None:
            raise DatabaseError('no conninfo given and no default set', q)
        # a sequence is always passed on, exactly as the former [] default did
        if args is None:
            args = []
        conn = self.get_conn(conninfo)
        curs = None
        try:
            curs = conn.cursor()
            curs.execute(q, args)
            psycopg2.extras.wait_select(curs.connection)
        except psycopg2.OperationalError as e:
            # disconnected?
            self._lent.pop(id(conn), None)
            conn.close()
            raise BadConnectionError(str(e))
        except psycopg2.DatabaseError as e:
            # the caller gets no cursor to finish(), so give the connection
            # back here; asynchronous connections run in autocommit mode
            # (per psycopg2 docs), so no failed transaction is left open
            self.put_conn(conninfo, conn)
            # curs is still None when conn.cursor() itself failed
            query = curs.query if curs is not None else q
            raise DatabaseError(str(e), query)
        return curs

    def finish(self, curs, conninfo=None):
        """Return the connection behind *curs* to its pool.

        Args:
            curs: cursor previously returned by execute().
            conninfo: pool to return to; by default the conninfo the
                connection was borrowed with, then the instance default.
        """
        conn = curs.connection
        if conninfo is None:
            conninfo = self._lent.get(id(conn), self.conninfo)
        if conninfo is None:
            # origin unknown: it cannot be pooled, so do not leave it open
            conn.close()
            return
        self.put_conn(conninfo, conn)

    def row(self, curs, row):
        """Wrap a raw result tuple in a Row keyed by the cursor's column names."""
        return Row(zip([col[0] for col in curs.description], row))

    def fetchone(self, curs):
        """Fetch the next row as a Row, or None when no row is left."""
        record = curs.fetchone()
        if record is None:
            return None
        return self.row(curs, record)

    def fetchall(self, curs):
        """Fetch all remaining rows as a list of Row objects."""
        return [self.row(curs, record) for record in curs.fetchall()]