pgconsole/database.py
author Radek Brich <brich.radek@ifortuna.cz>
Tue, 29 Apr 2014 17:50:15 +0200
changeset 98 024299702087
parent 76 3a41b351b122
permissions -rw-r--r--
Update batchcopy: When target record exists, allow to ignore / update the error (--dst-exists parameter).

import psycopg2
import psycopg2.extensions
import psycopg2.extras


class DatabaseError(Exception):
    """Database-level failure, optionally tagged with the related query.

    Args:
        msg: Human-readable error message (becomes ``str(exc)``).
        query: Query text associated with the failure, or None if unknown.
    """

    def __init__(self, msg, query=None):
        super().__init__(msg)
        # Kept as a plain attribute so handlers can show the offending query.
        self.query = query


class BadConnectionError(Exception):
    """Connection-level failure.

    Raised by ``Database.execute`` when psycopg2 reports an
    ``OperationalError``; the affected connection is closed before raising.
    """


class Row(dict):
    """Dictionary whose items can also be read as attributes (``row.name``)."""

    def __getattr__(self, key):
        """Return ``self[key]`` for attribute access.

        Only invoked when normal attribute lookup fails, so real dict
        attributes and methods are never shadowed.

        Raises:
            AttributeError: if *key* is not an item of the row.
        """
        try:
            return self[key]
        except KeyError:
            # The attribute protocol expects AttributeError; letting the
            # KeyError escape breaks hasattr() and getattr() with a default.
            raise AttributeError(key) from None


class Database:
    """Pool of asynchronous psycopg2 connections, grouped by conninfo string.

    Connections are opened in psycopg2's asynchronous mode and every
    operation is completed by blocking in ``psycopg2.extras.wait_select``.
    Idle connections are kept in ``self.pool`` for reuse.
    """

    def __init__(self):
        # pool of database connections
        # indexed by conninfo, items are lists of connections
        self.pool = {}
        # number of unused connections per conninfo to keep open
        self.pool_keep_open = 1
        # conninfo of every connection currently handed out by get_conn(),
        # so that finish() can find the pool a cursor's connection belongs to
        self._in_use = {}


    def __del__(self):
        """Close all pooled (idle) connections when the object is collected."""
        # getattr: __del__ also runs when __init__ did not complete
        for conns in getattr(self, 'pool', {}).values():
            for conn in conns:
                conn.close()


    def connect(self, conninfo):
        """Open a new asynchronous connection and wait until it is ready.

        Args:
            conninfo: connection string passed to ``psycopg2.connect``.

        Returns:
            The new connection object.

        Raises:
            DatabaseError: if psycopg2 reports a DatabaseError while
                connecting.
        """
        try:
            # `async` is a reserved word since Python 3.7; psycopg2 (>= 2.7)
            # accepts `async_` as an alias for the same option.
            conn = psycopg2.connect(conninfo, async_=1)
            psycopg2.extras.wait_select(conn)
        except psycopg2.DatabaseError as e:
            raise DatabaseError(str(e)) from e
        return conn


    def get_conn(self, conninfo):
        """Return an open connection for *conninfo*.

        Reuses an idle pooled connection when one is still open, otherwise
        opens a new one. The connection is remembered as "in use" until it
        is handed back through put_conn() or finish().

        Raises:
            DatabaseError: if a new connection is needed and cannot be opened.
        """
        idle = self.pool.setdefault(conninfo, [])
        conn = None
        while idle and conn is None:
            conn = idle.pop()
            if conn.closed:
                # stale pooled connection, drop it and try the next one
                conn = None
        if conn is None:
            conn = self.connect(conninfo)
        self._in_use[conn] = conninfo
        return conn


    def put_conn(self, conninfo, conn):
        """Hand *conn* back to the pool of *conninfo*.

        The connection is closed instead of pooled when the pool already
        holds ``pool_keep_open`` idle connections. An already closed
        connection is simply dropped.
        """
        self._in_use.pop(conn, None)
        if conn.closed:
            return
        idle = self.pool.setdefault(conninfo, [])
        if len(idle) >= self.pool_keep_open:
            conn.close()
        else:
            idle.append(conn)


    def _discard(self, conn):
        """Forget *conn* and close it; used when it must not be reused."""
        self._in_use.pop(conn, None)
        conn.close()


    def execute(self, q, args=None, conninfo=None):
        """Run query *q* on a connection for *conninfo* and wait for it.

        Args:
            q: query text.
            args: query parameters; defaults to an empty list.
            conninfo: connection string selecting the pool. It is required
                in practice; it is a trailing optional parameter only so
                that the original ``(q, args)`` positional order is kept.

        Returns:
            The cursor holding the result. Pass it to finish() once the
            rows were fetched so the connection returns to the pool.

        Raises:
            TypeError: if *conninfo* is not given.
            BadConnectionError: on psycopg2.OperationalError (the
                connection is closed).
            DatabaseError: on any other psycopg2.DatabaseError; carries the
                query as reported by the cursor (the connection is closed).
        """
        if conninfo is None:
            raise TypeError('execute() needs a conninfo to select a connection')
        if args is None:
            args = []
        conn = self.get_conn(conninfo)
        curs = None
        try:
            curs = conn.cursor()
            curs.execute(q, args)
            psycopg2.extras.wait_select(curs.connection)
            # No commit/rollback: per the psycopg2 documentation,
            # asynchronous connections always run in autocommit mode.
        except psycopg2.OperationalError as e:
            # disconnected?
            self._discard(conn)
            raise BadConnectionError(str(e)) from e
        except psycopg2.DatabaseError as e:
            # curs is still None when conn.cursor() itself failed
            query = curs.query if curs is not None else None
            # The caller gets no cursor back and so cannot call finish();
            # close the connection here instead of leaking it.
            self._discard(conn)
            raise DatabaseError(str(e), query) from e
        return curs


    def finish(self, curs):
        """Release the connection behind *curs* after its result was read.

        The connection goes back to the pool it was taken from. A connection
        that was not handed out by get_conn() of this object has no pool to
        return to and is closed instead.
        """
        conn = curs.connection
        conninfo = self._in_use.get(conn)
        if conninfo is None:
            conn.close()
            return
        self.put_conn(conninfo, conn)


    def row(self, curs, row):
        """Wrap one raw result *row* into a Row keyed by column name.

        Column names are the first item of each entry in
        ``curs.description``.
        """
        return Row(zip([x[0] for x in curs.description], row))


    def fetchone(self, curs):
        """Fetch the next row as a Row, or None when no row is left."""
        row = curs.fetchone()
        if row is None:
            return None
        return self.row(curs, row)


    def fetchall(self, curs):
        """Fetch all remaining rows as a list of Row objects."""
        return [self.row(curs, row) for row in curs.fetchall()]