# HG changeset patch # User Radek Brich # Date 1306426145 -7200 # Node ID eaae9539e910ac83474c3d80749defca7ff29729 Postgres tools. diff -r 000000000000 -r eaae9539e910 README --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/README Thu May 26 18:09:05 2011 +0200 @@ -0,0 +1,17 @@ +========= +pgtoolkit +========= + +General Python Modules +---------------------- + +pgmanager.py - Database connection manager (pooling etc.) +pgbrowser.py - Schema browser module +pgdiff.py - Schema diff tool + +Graphical and Command Line Tools +-------------------------------- + +pgbrowser-gtk.py - Database browser - graphical interface (GTK) +pgconsole-gtk.py - Query console - graphical interface (GTK) +pgdiff-cli.py - Command line diff tool diff -r 000000000000 -r eaae9539e910 common/__init__.py diff -r 000000000000 -r eaae9539e910 common/highlight.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/common/highlight.py Thu May 26 18:09:05 2011 +0200 @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- + +(BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, BOLD) = range(0,9) + +names = ['black','red','green','yellow','blue','magenta','cyan','white'] + +disabled = False # set True to disable all colors + +def highlight(enable, fg=None, bg=None): + ''' + + highlight(1) -- switch to bold + highlight(1,1) -- red foreground + highlight(1,3,4) -- red on blue + highlight(0) -- reset + + ''' + global disabled + if disabled: + return '' + + if enable: + code = '1' + if fg is not None: + code = '' + if fg >= 8: + fg -= 8 + code += '1;' + code += str(30 + fg) + if bg is not None: + code += ';' + if bg >= 8: + bg -= 8 + code += '1;' + code += str(40 + bg) + return "\033[" + code + "m" + else: + return "\033[0m" + +def sethighlight(enable, fg=None, bg=None): + print(highlight(enable, fg, bg), end='') + + +if __name__ == '__main__': + for c in range(0,8): + print( + highlight(1,c), names[c].ljust(20), + highlight(1,8+c), names[c].ljust(20), + highlight(0), sep='') + diff -r 000000000000 -r eaae9539e910 pgbrowser-gtk.py diff -r 000000000000 -r eaae9539e910 pgtools/__init__.py diff -r 000000000000 -r eaae9539e910 pgtools/pgbrowser.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtools/pgbrowser.py Thu May 26 18:09:05 2011 +0200 @@ -0,0 +1,258 @@ +# -*- coding: utf-8 -*- +# +# PgBrowser - browse database schema and metadata +# +# Some of the queries came from psql. +# +# Copyright (c) 2011 Radek Brich +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + + +from collections import OrderedDict + + +class Column: + def __init__(self, browser, table, + name, type, notnull, hasdefault, default, description): + self.browser = browser # Browser instance + self.table = table # Table instance + self.name = name + self.type = type + self.notnull = notnull + self.hasdefault = hasdefault + self.default = default + self.description = description + + +class Constraint: + def __init__(self, browser, table, name, type, definition): + self.browser = browser + self.table = table + self.name = name + self.type = type + self.definition = definition + + +class Index: + def __init__(self, browser, table, + name, primary, unique, clustered, valid, definition): + self.browser = browser + self.table = table + self.name = name + self.primary = primary + self.unique = unique + self.clustered = clustered + self.valid = valid + self.definition = definition + + +class Table: + def __init__(self, browser, schema, name, owner, size, description): + self._columns = None + self._constraints = None + self._indexes = None + self.browser = browser # Browser instance + self.schema = schema # Schema instance + self.name = name # table name, str + self.owner = owner + self.size = size + self.description = description + + def refresh(self): + self.refresh_columns() + self.refresh_constraints() + self.refresh_indexes() + + def refresh_columns(self): + rows = self.browser.list_columns(self.name, self.schema.name) + self._columns = OrderedDict([(x['name'], Column(self.browser, self, **x)) for x in rows]) + + def refresh_constraints(self): + rows = self.browser.list_constraints(self.name, self.schema.name) + self._constraints = OrderedDict([(x['name'], Constraint(self.browser, self, **x)) for x in rows]) + + def refresh_indexes(self): + rows = self.browser.list_indexes(self.name, self.schema.name) + self._indexes = OrderedDict([(x['name'], Index(self.browser, self, **x)) for x in rows]) + + def getcolumns(self): + if self._columns is None: + self.refresh_columns() + return self._columns + columns = property(getcolumns) + + def getconstraints(self): + if self._constraints is None: + self.refresh_constraints() + return self._constraints + constraints = property(getconstraints) + + def getindexes(self): + if self._indexes is None: + self.refresh_indexes() + return self._indexes + indexes = property(getindexes) + +class Schema: + def __init__(self, browser, name, owner, acl, description, system): + self._tables = None + self.browser = browser + self.name = name + self.owner = owner + self.acl = acl + self.description = description + self.system = system + + def refresh(self): + rows = self.browser.list_tables(self.name) + self._tables = OrderedDict([(x['name'], Table(self.browser, self, **x)) for x in rows]) + + def gettables(self): + if self._tables is None: + self.refresh() + return self._tables + tables = property(gettables) + + +class PgBrowser: + def __init__(self, conn=None): + self._schemas = None + self.conn = conn + + def setconn(self, conn=None): + self.conn = conn + + def refresh(self): + rows = self.list_schemas() + self._schemas = OrderedDict([(x['name'], Schema(self, **x)) for x in rows]) + + def getschemas(self): + if self._schemas is None: + self.refresh() + return self._schemas + schemas = property(getschemas) + + def _query(self, query, *args): + try: + curs = self.conn.cursor() + curs.execute(query, args) + curs.connection.commit() + rows = curs.fetchall() + return [dict(zip([desc[0] for desc in curs.description], row)) for row in rows] + finally: + curs.close() + + def list_databases(self): + return self._query(''' + SELECT + d.datname as "name", + pg_catalog.pg_get_userbyid(d.datdba) as "owner", + pg_catalog.pg_encoding_to_char(d.encoding) as "encoding", + d.datcollate as "collation", + d.datctype as "ctype", + d.datacl AS "acl", + CASE WHEN pg_catalog.has_database_privilege(d.datname, 'CONNECT') + THEN pg_catalog.pg_database_size(d.datname) + ELSE -1 -- No access + END as "size", + t.spcname as "tablespace", + pg_catalog.shobj_description(d.oid, 'pg_database') as "description" + FROM pg_catalog.pg_database d + JOIN pg_catalog.pg_tablespace t on d.dattablespace = t.oid + ORDER BY 1; + ''') + + def list_schemas(self): + return self._query(''' + SELECT + n.nspname AS "name", + pg_catalog.pg_get_userbyid(n.nspowner) AS "owner", + n.nspacl AS "acl", + pg_catalog.obj_description(n.oid, 'pg_namespace') AS "description", + CASE WHEN n.nspname IN ('information_schema', 'pg_catalog', 'pg_toast') + OR n.nspname ~ '^pg_temp_' OR n.nspname ~ '^pg_toast_temp_' + THEN TRUE + ELSE FALSE + END AS "system" + FROM pg_catalog.pg_namespace n + ORDER BY 1; + ''') + + def list_tables(self, schema='public'): + return self._query(''' + SELECT + c.relname as "name", + pg_catalog.pg_get_userbyid(c.relowner) as "owner", + pg_catalog.pg_relation_size(c.oid) as "size", + pg_catalog.obj_description(c.oid, 'pg_class') as "description" + FROM pg_catalog.pg_class c + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = %s AND c.relkind IN ('r','s','') + ORDER BY 1; + ''', schema) + + def list_columns(self, table, schema='public'): + return self._query(''' + SELECT + a.attname as "name", + format_type(a.atttypid, a.atttypmod) AS "type", + a.attnotnull as "notnull", + a.atthasdef as "hasdefault", + d.adsrc as "default", + pg_catalog.col_description(a.attrelid, a.attnum) AS "description" + FROM pg_catalog.pg_attribute a + LEFT JOIN pg_catalog.pg_class c ON a.attrelid = c.oid + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + LEFT JOIN pg_catalog.pg_attrdef d ON a.attrelid = d.adrelid AND a.attnum = d.adnum + WHERE n.nspname = %s AND c.relname = %s AND a.attnum > 0 AND NOT a.attisdropped + ORDER BY 1 + ''', schema, table) + + def list_constraints(self, table, schema='public'): + return self._query(''' + SELECT + conname as "name", + r.contype as "type", + pg_catalog.pg_get_constraintdef(r.oid, true) as "definition" + FROM pg_catalog.pg_constraint r + JOIN pg_catalog.pg_class c ON r.conrelid = c.oid + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = %s AND c.relname = %s + ORDER BY 1 + ''', schema, table) + + def list_indexes(self, table, schema='public'): + return self._query(''' + SELECT + c2.relname as "name", + i.indisprimary as "primary", + i.indisunique as "unique", + i.indisclustered as "clustered", + i.indisvalid as "valid", + pg_catalog.pg_get_indexdef(i.indexrelid, 0, true) as "definition" + --c2.reltablespace as "tablespace_oid" + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_index i ON c.oid = i.indrelid + JOIN pg_catalog.pg_class c2 ON i.indexrelid = c2.oid + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = %s AND c.relname = %s + ORDER BY i.indisprimary DESC, i.indisunique DESC, c2.relname + ''', schema, table) + diff -r 000000000000 -r eaae9539e910 pgtools/pgdiff.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtools/pgdiff.py Thu May 26 18:09:05 2011 +0200 @@ -0,0 +1,311 @@ +# -*- coding: utf-8 -*- +# +# PgDiff - capture differences of database metadata +# +# Depends on PgBrowser +# +# Copyright (c) 2011 Radek Brich +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + + +from common.highlight import * + + +class DiffBase: + def __init__(self): + self.changes = None + + def format(self): + out = [' ' * self.level] + + if self.change == '+': + out.append(highlight(1, BOLD | GREEN)) + elif self.change == '-': + out.append(highlight(1, BOLD | RED)) + else: + out.append(highlight(1, BOLD | YELLOW)) + out.append(self.change) + + out += [' ', self.type, ' ', self.name, highlight(0)] + + if self.changes: + out += [highlight(1, WHITE), ' (', self.formatchanges(), ')', highlight(0)] + + return ''.join(out) + + def formatnotnull(self, notnull): + if notnull: + return 'NOT NULL' + else: + return None + + def formatchanges(self): + res = [] + for x in self.changes: + type, a, b = x + if type == 'notnull': + type = '' + a = self.formatnotnull(a) + b = self.formatnotnull(b) + + if a and b: + s = ''.join(['Changed ', type, ' from ', + highlight(1,15), a, highlight(0), ' to ', + highlight(1,15), b, highlight(0), '.']) + elif a and not b: + l = ['Removed '] + if type: + l += [type, ' '] + l += [highlight(1,15), a, highlight(0), '.'] + s = ''.join(l) + elif b and not a: + l = ['Added '] + if type: + l += [type, ' '] + l += [highlight(1,15), b, highlight(0), '.'] + s = ''.join(l) + res.append(s) + return ' '.join(res) + + +class DiffSchema(DiffBase): + def __init__(self, change, schema): + DiffBase.__init__(self) + self.level = 0 + self.type = 'schema' + self.change = change + self.schema = schema + self.name = schema + + +class DiffTable(DiffBase): + def __init__(self, change, schema, table): + DiffBase.__init__(self) + self.level = 1 + self.type = 'table' + self.change = change + self.schema = schema + self.table = table + self.name = table + + +class DiffColumn(DiffBase): + def __init__(self, change, schema, table, column, changes=None): + DiffBase.__init__(self) + self.level = 2 + self.type = 'column' + self.change = change + self.schema = schema + self.table = table + self.column = column + self.name = column + self.changes = changes + + +class DiffConstraint(DiffBase): + def __init__(self, change, schema, table, constraint, changes=None): + DiffBase.__init__(self) + self.level = 2 + self.type = 'constraint' + self.change = change + self.schema = schema + self.table = table + self.constraint = constraint + self.name = constraint + self.changes = changes + + +class PgDiff: + def __init__(self, srcbrowser=None, dstbrowser=None): + self.allowcolor = False + self.src = srcbrowser + self.dst = dstbrowser + self.include_schemas = set() # if not empty, consider only these schemas for diff + self.exclude_schemas = set() # exclude these schemas from diff + self.include_tables = set() + self.exclude_tables = set() + + def _test_filter(self, schema): + if self.include_schemas and schema not in self.include_schemas: + return False + if schema in self.exclude_schemas: + return False + return True + + def _test_table(self, schema, table): + name = schema + '.' + table + if self.include_tables and name not in self.include_tables: + return False + if name in self.exclude_tables: + return False + return True + + def _diffnames(self, src, dst): + for x in src: + if x in dst: + yield ('*', x) + else: + yield ('-', x) + for x in dst: + if x not in src: + yield ('+', x) + + def _compare_columns(self, a, b): + diff = [] + if a.type != b.type: + diff.append(('type', a.type, b.type)) + if a.notnull != b.notnull: + diff.append(('notnull', a.notnull, b.notnull)) + if a.default != b.default: + diff.append(('default', a.default, b.default)) + return diff + + def _compare_constraints(self, a, b): + diff = [] + if a.type != b.type: + diff.append(('type', a.type, b.type)) + if a.definition != b.definition: + diff.append(('definition', a.definition, b.definition)) + return diff + + def _diff_columns(self, schema, table, src_columns, dst_columns): + for nd in self._diffnames(src_columns, dst_columns): + cdo = DiffColumn(change=nd[0], schema=schema, table=table, column=nd[1]) + if nd[0] == '*': + a = src_columns[nd[1]] + b = dst_columns[nd[1]] + cdo.changes = self._compare_columns(a, b) + if cdo.changes: + yield cdo + else: + yield cdo + + def _diff_constraints(self, schema, table, src_constraints, dst_constraints): + for nd in self._diffnames(src_constraints, dst_constraints): + cdo = DiffConstraint(change=nd[0], schema=schema, table=table, constraint=nd[1]) + if nd[0] == '*': + a = src_constraints[nd[1]] + b = dst_constraints[nd[1]] + cdo.changes = self._compare_constraints(a, b) + if cdo.changes: + yield cdo + else: + yield cdo + + def _difftables(self, schema, src_tables, dst_tables): + for nd in self._diffnames(src_tables, dst_tables): + if not self._test_table(schema, nd[1]): + continue + tdo = DiffTable(change=nd[0], schema=schema, table=nd[1]) + if nd[0] == '*': + # columns + src_columns = src_tables[nd[1]].columns + dst_columns = dst_tables[nd[1]].columns + for cdo in self._diff_columns(schema, nd[1], src_columns, dst_columns): + if tdo: + yield tdo + tdo = None + yield cdo + # constraints + src_constraints = src_tables[nd[1]].constraints + dst_constraints = dst_tables[nd[1]].constraints + for cdo in self._diff_constraints(schema, nd[1], src_constraints, dst_constraints): + if tdo: + yield tdo + tdo = None + yield cdo + else: + yield tdo + + def iter_diff(self): + '''Return diff between src and dst database schema. + + Yields one line at the time. Each line is in form of object + iherited from DiffBase. This object contains all information + about changes. See format() method. + + ''' + src_schemas = self.src.schemas + dst_schemas = self.dst.schemas + src = [x.name for x in src_schemas.values() if not x.system and self._test_filter(x.name)] + dst = [x.name for x in dst_schemas.values() if not x.system and self._test_filter(x.name)] + for nd in self._diffnames(src, dst): + sdo = DiffSchema(change=nd[0], schema=nd[1]) + if nd[0] == '*': + src_tables = src_schemas[nd[1]].tables + dst_tables = dst_schemas[nd[1]].tables + for tdo in self._difftables(nd[1], src_tables, dst_tables): + if sdo: + yield sdo + sdo = None + yield tdo + else: + yield sdo + + def print_diff(self): + '''Print diff between src and dst database schema. + + The output is in human readable form. + + Set allowcolor=True of PgDiff instance to get colored output. + + ''' + for ln in self.iter_diff(): + print(ln.format()) + + def print_patch(self): + '''Print patch for updating from src schema to dst schema. + + Supports table drop, add, column drop, add and following + changes of columns: + - type + - set/remove not null + - default value + + This is experimental, not tested very much. + Do not use without checking the commands. + Even if it works as intended, it can cause table lock ups + and/or loss of data. You have been warned. + + ''' + for ln in self.iter_diff(): + print(ln.format_patch()) + + def filter_schemas(self, include=[], exclude=[]): + '''Modify list of schemas which are used for computing diff. + + include (list) -- if not empty, consider only these schemas for diff + exclude (list) -- exclude these schemas from diff + + Order: include, exclude + include=[] means include everything + ''' + self.include_schemas.clear() + self.include_schemas.update(include) + self.exclude_schemas.clear() + self.exclude_schemas.update(exclude) + + + def filter_tables(self, include=[], exclude=[]): + self.include_tables.clear() + self.include_tables.update(include) + self.exclude_tables.clear() + self.exclude_tables.update(exclude) + diff -r 000000000000 -r eaae9539e910 pgtools/pgmanager.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pgtools/pgmanager.py Thu May 26 18:09:05 2011 +0200 @@ -0,0 +1,323 @@ +# -*- coding: utf-8 -*- +# +# PgManager - manage database connections +# +# Requires: Python 2.6, psycopg2 +# +# Copyright (c) 2010, 2011 Radek Brich +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +"""Postgres database connection manager + +PgManager wraps psycopg2 connect function, adding following features: + + * Manage database connection parameters - link connection parameters + to an unique identifier, retrieve connection object by this identifier + + * Connection pooling - connections with same identifier are pooled and reused + + * Easy query using the with statement - retrieve cursor directly by connection + identifier, don't worry about connections + + * Dict rows - cursor has additional methods like fetchall_dict(), which + returns dict row instead of ordinary list-like row + +Example: + +import pgmanager + +pgm = pgmanager.get_instance() +pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres') + +with pgm.cursor() as curs: + curs.execute('SELECT now() AS now') + row = curs.fetchone_dict() + print row.now + +First, we have obtained PgManager instance. This is like calling +PgManager(), although in our example the instance is global. That means +getting the instance in another module brings us all the defined connections +etc. + +On second line we created connection named 'default' (this name can be left out). +The with statement obtains connection (actually connects to database when needed), +then returns cursor for this connection. On exit, the connection is returned +to the pool or closed (depending on number of connections on pool and setting +of keep_open parameter). + +The row returned by fetchone_dict() is special dict object, which can be accessed +using item or attribute access, that is row['now'] or row.now. +""" + +from contextlib import contextmanager +import logging +import threading +import select + +import psycopg2 +import psycopg2.extensions + +from psycopg2 import DatabaseError, IntegrityError + + +class PgManagerError(Exception): + + pass + + +class ConnectionInfo: + + def __init__(self, dsn, isolation_level=None, init_statement=None, keep_open=1): + self.dsn = dsn + self.isolation_level = isolation_level + self.init_statement = init_statement + self.keep_open = keep_open + + +class RowDict(dict): + + def __getattr__(self, key): + return self[key] + + +class Cursor(psycopg2.extensions.cursor): + + def execute(self, query, args=None): + try: + return super(Cursor, self).execute(query, args) + finally: + log.debug(self.query) + + def callproc(self, procname, args=None): + try: + return super(Cursor, self).callproc(procname, args) + finally: + log.debug(self.query) + + def row_dict(self, row, lstrip=None): + adjustname = lambda a: a + if lstrip: + adjustname = lambda a: a.lstrip(lstrip) + return RowDict(zip([adjustname(desc[0]) for desc in self.description], row)) + + def fetchone_dict(self, lstrip=None): + row = super(Cursor, self).fetchone() + if not row: + return row + return self.row_dict(row, lstrip) + + def fetchall_dict(self, lstrip=None): + rows = super(Cursor, self).fetchall() + return [self.row_dict(row, lstrip) for row in rows] + + +class Connection(psycopg2.extensions.connection): + + def cursor(self, name=None): + if name is None: + return super(Connection, self).cursor(cursor_factory=Cursor) + else: + return super(Connection, self).cursor(name, cursor_factory=Cursor) + + +class PgManager: + + def __init__(self): + self.conn_known = {} # available connections + self.conn_pool = {} + self.lock = threading.Lock() + + def __del__(self): + for conn in tuple(self.conn_known.keys()): + self.destroy_conn(conn) + + def create_conn(self, name='default', isolation_level=None, dsn=None, **kw): + '''Create named connection.''' + if name in self.conn_known: + raise PgManagerError('Connection name "%s" already registered.', name) + + if dsn is None: + dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items()]) + + isolation_level = self._normalize_isolation_level(isolation_level) + ci = ConnectionInfo(dsn, isolation_level) + + self.conn_known[name] = ci + self.conn_pool[name] = [] + + def close_conn(self, name='default'): + '''Close all connections of given name. + + Connection credentials are still saved. + + ''' + while len(self.conn_pool[name]): + conn = self.conn_pool[name].pop() + conn.close() + + def destroy_conn(self, name='default'): + '''Destroy connection. + + Counterpart of create_conn. + + ''' + if not name in self.conn_known: + raise PgManagerError('Connection name "%s" not registered.', name) + + self.close_conn(name) + + del self.conn_known[name] + del self.conn_pool[name] + + def get_conn(self, name='default'): + '''Get connection of name 'name' from pool.''' + self.lock.acquire() + try: + if not name in self.conn_known: + raise PgManagerError("Connection name '%s' not registered.", name) + + conn = None + while len(self.conn_pool[name]) and conn is None: + conn = self.conn_pool[name].pop() + if conn.closed: + conn = None + + if conn is None: + ci = self.conn_known[name] + conn = psycopg2.connect(ci.dsn, connection_factory=Connection) + if not ci.isolation_level is None: + conn.set_isolation_level(ci.isolation_level) + if ci.init_statement: + curs = conn.cursor() + curs.execute(ci.init_statement) + curs.close() + finally: + self.lock.release() + return conn + + def put_conn(self, conn, name='default'): + '''Put connection back to pool. + + Name must be same as used for get_conn, + otherwise things become broken. + + ''' + self.lock.acquire() + try: + if not name in self.conn_known: + raise PgManagerError("Connection name '%s' not registered.", name) + + if len(self.conn_pool[name]) >= self.conn_known[name].keep_open: + conn.close() + return + + if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN: + conn.close() + return + + # connection returned to the pool must not be in transaction + if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE: + conn.rollback() + + self.conn_pool[name].append(conn) + finally: + self.lock.release() + + @contextmanager + def cursor(self, name='default'): + '''Cursor context. + + Uses any connection of name 'name' from pool + and returns cursor for that connection. + + ''' + conn = self.get_conn(name) + + try: + curs = conn.cursor() + yield curs + finally: + curs.close() + self.put_conn(conn, name) + + def wait_for_notify(self, name='default', timeout=5): + '''Wait for asynchronous notifies, return the last one. + + Returns None on timeout. + + ''' + conn = self.get_conn(name) + + try: + # any residual notify? + # then return it, that should not break anything + if conn.notifies: + return conn.notifies.pop() + + if select.select([conn], [], [], timeout) == ([], [], []): + # timeout + return None + else: + conn.poll() + + # return just the last notify (we do not care for older ones) + if conn.notifies: + return conn.notifies.pop() + return None + finally: + # clean notifies + while conn.notifies: + conn.notifies.pop() + self.put_conn(conn, name) + + def _normalize_isolation_level(self, level): + if type(level) == str: + if level.lower() == 'autocommit': + return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT + if level.lower() == 'read_committed': + return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED + if level.lower() == 'serializable': + return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE + raise PgManagerError('Unknown isolation level name: "%s"', level) + return level + + +try: + NullHandler = logging.NullHandler +except AttributeError: + class NullHandler(logging.Handler): + def emit(self, record): + pass + + +log = logging.getLogger("pgmanager") +log.addHandler(NullHandler()) + + +instance = None + + +def get_instance(): + global instance + if instance is None: + instance = PgManager() + return instance + +