pgconsole/database.py
changeset 10 f3a1b9792cc9
child 76 3a41b351b122
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pgconsole/database.py	Tue Aug 16 23:53:54 2011 +0200
@@ -0,0 +1,101 @@
+import psycopg2
+import psycopg2.extensions
+import psycopg2.extras
+
+
+class DatabaseError(Exception):
+    def __init__(self, msg, query=None):
+        self.query = query
+        Exception.__init__(self, msg)
+
+
+class BadConnectionError(Exception):
+    pass
+
+
+class Row(dict):
+    def __getattr__(self, key):
+        return self[key]
+
+
+class Database:
+    def __init__(self):
+        # pool of database connections
+        # indexed by conninfo, items are lists of connections
+        self.pool = {}
+        # number of unused connections per conninfo to keep open
+        self.pool_keep_open = 1
+
+
+    def __del__(self):
+        for conninfo in self.pool.keys():
+            for conn in self.pool[conninfo]:
+                conn.close()
+
+
+    def connect(self, conninfo):
+        try:
+            conn = psycopg2.connect(conninfo, async=1)
+            psycopg2.extras.wait_select(conn)
+        except psycopg2.DatabaseError, e:
+            raise DatabaseError(str(e))
+        return conn
+
+
+    def get_conn(self, conninfo):
+        if not conninfo in self.pool:
+            self.pool[conninfo] = []
+            return self.connect(conninfo)
+        else:
+            conn = None
+            while len(self.pool[conninfo]) and conn is None:
+                conn = self.pool[conninfo].pop()
+                if conn.closed:
+                    conn = None
+            if conn is None:
+                return self.connect(conninfo)
+        return conn
+
+
+    def put_conn(self, conninfo, conn):
+        if len(self.pool[conninfo]) >= self.pool_keep_open:
+            conn.close()
+        else:
+            self.pool[conninfo].append(conn)
+
+
+    def execute(self, q, args=[]):
+        conn = self.get_conn()
+        try:
+            curs = conn.cursor()
+            curs.execute(q, args)
+            psycopg2.extras.wait_select(curs.connection)
+#            conn.commit()
+        except psycopg2.OperationalError, e:
+            # disconnected?
+#            conn.rollback()
+            conn.close()
+            raise BadConnectionError(str(e))
+        except psycopg2.DatabaseError, e:
+#            conn.rollback()
+            raise DatabaseError(str(e), curs.query)
+        return curs
+
+
+    def finish(self, curs):
+        self.put_conn(curs.connection)
+
+
+    def row(self, curs, row):
+        return Row(zip([x[0] for x in curs.description], row))
+
+
+    def fetchone(self, curs):
+        return self.row(curs, curs.fetchone())
+
+
+    def fetchall(self, curs):
+        rows = curs.fetchall()
+        return [self.row(curs, row) for row in rows]
+
+