pgconsole/database.py
author Radek Brich <radek.brich@devl.cz>
Tue, 11 Dec 2012 11:25:06 +0100 (2012-12-11)
changeset 53 4a049a5af657
parent 10 f3a1b9792cc9
child 76 3a41b351b122
permissions -rw-r--r--
Update PgDiff: Support SQL patch for constraints. Fix changes of column default value.
import psycopg2
import psycopg2.extensions
import psycopg2.extras


class DatabaseError(Exception):
    def __init__(self, msg, query=None):
        self.query = query
        Exception.__init__(self, msg)


class BadConnectionError(Exception):
    pass


class Row(dict):
    def __getattr__(self, key):
        return self[key]


class Database:
    def __init__(self):
        # pool of database connections
        # indexed by conninfo, items are lists of connections
        self.pool = {}
        # number of unused connections per conninfo to keep open
        self.pool_keep_open = 1


    def __del__(self):
        for conninfo in self.pool.keys():
            for conn in self.pool[conninfo]:
                conn.close()


    def connect(self, conninfo):
        try:
            conn = psycopg2.connect(conninfo, async=1)
            psycopg2.extras.wait_select(conn)
        except psycopg2.DatabaseError, e:
            raise DatabaseError(str(e))
        return conn


    def get_conn(self, conninfo):
        if not conninfo in self.pool:
            self.pool[conninfo] = []
            return self.connect(conninfo)
        else:
            conn = None
            while len(self.pool[conninfo]) and conn is None:
                conn = self.pool[conninfo].pop()
                if conn.closed:
                    conn = None
            if conn is None:
                return self.connect(conninfo)
        return conn


    def put_conn(self, conninfo, conn):
        if len(self.pool[conninfo]) >= self.pool_keep_open:
            conn.close()
        else:
            self.pool[conninfo].append(conn)


    def execute(self, q, args=[]):
        conn = self.get_conn()
        try:
            curs = conn.cursor()
            curs.execute(q, args)
            psycopg2.extras.wait_select(curs.connection)
#            conn.commit()
        except psycopg2.OperationalError, e:
            # disconnected?
#            conn.rollback()
            conn.close()
            raise BadConnectionError(str(e))
        except psycopg2.DatabaseError, e:
#            conn.rollback()
            raise DatabaseError(str(e), curs.query)
        return curs


    def finish(self, curs):
        self.put_conn(curs.connection)


    def row(self, curs, row):
        return Row(zip([x[0] for x in curs.description], row))


    def fetchone(self, curs):
        return self.row(curs, curs.fetchone())


    def fetchall(self, curs):
        rows = curs.fetchall()
        return [self.row(curs, row) for row in rows]