Rename to pydbkit.
--- a/.hgignore Mon May 26 18:18:21 2014 +0200
+++ b/.hgignore Wed Jul 09 18:03:54 2014 +0200
@@ -3,7 +3,7 @@
^.pydevproject
^.settings/org.eclipse.core.resources.prefs
^.idea/
-^pgtoolkit\.conf$
+^pydbkit\.conf$
^doc/_build/
^build/
^dist/
--- a/README Mon May 26 18:18:21 2014 +0200
+++ b/README Wed Jul 09 18:03:54 2014 +0200
@@ -1,5 +1,5 @@
=========
-pgtoolkit
+ PyDbKit
=========
Requirements
--- a/TESTING Mon May 26 18:18:21 2014 +0200
+++ b/TESTING Wed Jul 09 18:03:54 2014 +0200
@@ -7,7 +7,7 @@
How to test
-----------
-1. copy pgtoolkit.conf.example to pgtoolkit.conf, modify it so it points to your test database
+1. copy pydbkit.conf.example to pydbkit.conf, modify it so it points to your test database
2. create test tables using sql/tests.sql
--- a/batchquery.py Mon May 26 18:18:21 2014 +0200
+++ b/batchquery.py Wed Jul 09 18:03:54 2014 +0200
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
-from pgtoolkit import toolbase
+from pydbkit import toolbase
class BatchQueryTool(toolbase.SimpleTool):
--- a/batchupdate.py Mon May 26 18:18:21 2014 +0200
+++ b/batchupdate.py Wed Jul 09 18:03:54 2014 +0200
@@ -2,8 +2,8 @@
import time
-from pgtoolkit import toolbase
-from pgtoolkit.pgmanager import OperationalError
+from pydbkit import toolbase
+from pydbkit.pgmanager import OperationalError
class BatchUpdateTool(toolbase.SimpleTool):
--- a/browser.py Mon May 26 18:18:21 2014 +0200
+++ b/browser.py Wed Jul 09 18:03:54 2014 +0200
@@ -5,8 +5,8 @@
locale.setlocale(locale.LC_ALL, '')
from tuikit import *
-from pgtoolkit import pgbrowser
-from pgtoolkit.toolbase import SimpleTool
+from pydbkit import pgbrowser
+from pydbkit.toolbase import SimpleTool
class MyApplication(Application, SimpleTool):
--- a/doc/Makefile Mon May 26 18:18:21 2014 +0200
+++ b/doc/Makefile Wed Jul 09 18:03:54 2014 +0200
@@ -77,17 +77,17 @@
@echo
@echo "Build finished; now you can run "qcollectiongenerator" with the" \
".qhcp project file in $(BUILDDIR)/qthelp, like this:"
- @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/pgtoolkit.qhcp"
+ @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/pydbkit.qhcp"
@echo "To view the help file:"
- @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/pgtoolkit.qhc"
+ @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/pydbkit.qhc"
devhelp:
$(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
@echo
@echo "Build finished."
@echo "To view the help file:"
- @echo "# mkdir -p $$HOME/.local/share/devhelp/pgtoolkit"
- @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/pgtoolkit"
+ @echo "# mkdir -p $$HOME/.local/share/devhelp/pydbkit"
+ @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/pydbkit"
@echo "# devhelp"
epub:
--- a/doc/conf.py Mon May 26 18:18:21 2014 +0200
+++ b/doc/conf.py Wed Jul 09 18:03:54 2014 +0200
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
-# pgtoolkit documentation build configuration file, created by
+# pydbkit documentation build configuration file, created by
# sphinx-quickstart on Wed Feb 27 21:22:17 2013.
#
# This file is execfile()d with the current directory set to its containing dir.
@@ -41,7 +41,7 @@
master_doc = 'index'
# General information about the project.
-project = 'pgtoolkit'
+project = 'pydbkit'
copyright = '2013, Radek Brich'
# The version info for the project you're documenting, acts as replacement for
@@ -165,7 +165,7 @@
#html_file_suffix = None
# Output file base name for HTML help builder.
-htmlhelp_basename = 'pgtoolkitdoc'
+htmlhelp_basename = 'pydbkitdoc'
# -- Options for LaTeX output --------------------------------------------------
@@ -184,7 +184,7 @@
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
- ('index', 'pgtoolkit.tex', 'pgtoolkit Documentation',
+ ('index', 'pydbkit.tex', 'PyDbKit Documentation',
'Radek Brich', 'manual'),
]
@@ -214,7 +214,7 @@
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
- ('index', 'pgtoolkit', 'pgtoolkit Documentation',
+ ('index', 'pydbkit', 'PyDbKit Documentation',
['Radek Brich'], 1)
]
@@ -228,8 +228,8 @@
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
- ('index', 'pgtoolkit', 'pgtoolkit Documentation',
- 'Radek Brich', 'pgtoolkit', 'One line description of project.',
+ ('index', 'pydbkit', 'PyDbKit Documentation',
+ 'Radek Brich', 'pydbkit', 'One line description of project.',
'Miscellaneous'),
]
--- a/doc/index.rst Mon May 26 18:18:21 2014 +0200
+++ b/doc/index.rst Wed Jul 09 18:03:54 2014 +0200
@@ -1,10 +1,10 @@
-.. pgtoolkit documentation master file, created by
+.. pydbkit documentation master file, created by
sphinx-quickstart on Wed Feb 27 21:22:17 2013.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
-Welcome to pgtoolkit's documentation!
-=====================================
+Welcome to pydbkit's documentation!
+===================================
Contents:
--- a/doc/pgmanager.rst Mon May 26 18:18:21 2014 +0200
+++ b/doc/pgmanager.rst Wed Jul 09 18:03:54 2014 +0200
@@ -1,7 +1,7 @@
PgManager
=========
-PgManager is higher level database adapter for Python and Postgres. There is also MyManager for MySQL. Both are part of `pgtoolkit <http://hg.devl.cz/pgtoolkit>`_.
+PgManager is a higher-level database adapter for Python and Postgres. There is also MyManager for MySQL. Both are part of `pydbkit <http://hg.devl.cz/pydbkit>`_.
PgManager offers the following convenience functionality over psycopg:
@@ -33,7 +33,7 @@
::
- from pgtoolkit import pgmanager
+ from pydbkit import pgmanager
pgm = pgmanager.get_instance()
pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres', isolation_level='autocommit')
@@ -127,7 +127,7 @@
Methods
-------
-.. autoclass:: pgtoolkit.pgmanager.PgManager
+.. autoclass:: pydbkit.pgmanager.PgManager
:members:
:undoc-members:
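
The renamed module keeps the PgManager workflow documented above. A minimal sketch of a full round trip, assuming a reachable Postgres instance and the pydbkit import path introduced by this change (the cursor/fetchone_dict pattern is the one documented for MyManager below, which is stated to be fully compatible)::

    from pydbkit import pgmanager

    pgm = pgmanager.get_instance()
    pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres', isolation_level='autocommit')

    # cursor() is a context manager; the connection returns to the pool on exit
    with pgm.cursor() as curs:
        curs.execute('SELECT now() AS now')
        row = curs.fetchone_dict()  # RowDict allows attribute access
        print(row.now)
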
--- a/ibrowser.py Mon May 26 18:18:21 2014 +0200
+++ b/ibrowser.py Wed Jul 09 18:03:54 2014 +0200
@@ -13,7 +13,7 @@
"""
-from pgtoolkit import pgbrowser, toolbase
+from pydbkit import pgbrowser, toolbase
import code
--- a/listtables.py Mon May 26 18:18:21 2014 +0200
+++ b/listtables.py Wed Jul 09 18:03:54 2014 +0200
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
-from pgtoolkit import pgbrowser, toolbase
+from pydbkit import pgbrowser, toolbase
class ListTablesTool(toolbase.SimpleTool):
--- a/mytoolkit/mymanager.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,330 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# MyManager - manage database connections (MySQL version)
-#
-# Requires: Python 2.6 / 2.7 / 3.2, MySQLdb
-#
-# Part of pgtoolkit
-# http://hg.devl.cz/pgtoolkit
-#
-# Copyright (c) 2011, 2013 Radek Brich <radek.brich@devl.cz>
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""MySQL database connection manager
-
-MyManager wraps MySQLdb in the same manner as PgManager wraps psycopg2.
-It's fully compatible, so it should work as a drop-in replacement for PgManager.
-
-It adds the following features over MySQLdb:
-
- * Save and reuse database connection parameters
-
- * Connection pooling
-
- * Easy query using the with statement
-
- * Dictionary rows
-
-Example:
-
- from mytoolkit import mymanager
-
- dbm = mymanager.get_instance()
- dbm.create_conn(host='127.0.0.1', dbname='default')
-
- with dbm.cursor() as curs:
- curs.execute('SELECT now() AS now')
- row = curs.fetchone_dict()
- print(row.now)
-
-See PgManager docs for more information.
-
-"""
-
-from contextlib import contextmanager
-from collections import OrderedDict
-import logging
-import threading
-import multiprocessing
-
-import MySQLdb
-import MySQLdb.cursors
-
-from MySQLdb import DatabaseError, IntegrityError, OperationalError
-
-from pgtoolkit.pgmanager import RowDict
-
-
-log_sql = logging.getLogger("mymanager_sql")
-log_sql.addHandler(logging.NullHandler())
-
-
-class MyManagerError(Exception):
-
- pass
-
-
-class ConnectionInfo:
-
- def __init__(self, name, isolation_level=None,
- init_statement=None, pool_size=1, **kw):
- self.name = name # connection name is logged with SQL queries
- self.isolation_level = isolation_level
- self.init_statement = init_statement
- self.pool_size = pool_size
- self.parameters = kw
- self.adjust_parameters()
-
- def adjust_parameters(self):
-        '''Rename Postgres-style parameters to their MySQL equivalents.'''
- m = {'dbname' : 'db', 'password' : 'passwd'}
- res = dict()
- for k, v in list(self.parameters.items()):
- if k in m:
- k = m[k]
- res[k] = v
- self.parameters = res
-
-
-class Cursor(MySQLdb.cursors.Cursor):
-
- def execute(self, query, args=None):
- try:
- return super(Cursor, self).execute(query, args)
- finally:
- self._log_query(query, args)
-
- def callproc(self, procname, args=None):
- try:
- return super(Cursor, self).callproc(procname, args)
- finally:
-            self._log_query(procname, args)
-
- def row_dict(self, row, lstrip=None):
- adjustname = lambda a: a
- if lstrip:
- adjustname = lambda a: a.lstrip(lstrip)
- return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
-
- def fetchone_dict(self, lstrip=None):
- row = super(Cursor, self).fetchone()
- if row is None:
- return None
- return self.row_dict(row, lstrip)
-
- def fetchall_dict(self, lstrip=None):
- rows = super(Cursor, self).fetchall()
- return [self.row_dict(row, lstrip) for row in rows]
-
- def mogrify(self, query, args):
- """Get query with substituted args as it will be send to server."""
- if isinstance(query, bytes):
- query = query.decode()
- if args is not None:
- db = self._get_db()
- query = query % db.literal(args)
- return query
-
- def _log_query(self, query, args):
- name = self.connection.name if hasattr(self.connection, 'name') else '-'
- query = self.mogrify(query, args)
- if isinstance(query, bytes):
- db = self._get_db()
- charset = db.character_set_name()
- query = query.decode(charset)
- log_sql.debug('[%s] %s' % (name, query))
-
-
-class MyManager:
-
- def __init__(self):
- self.conn_known = {} # available connections
- self.conn_pool = {}
- self.lock = threading.Lock()
- self.pid = multiprocessing.current_process().pid # forking check
-
- def __del__(self):
- for conn in tuple(self.conn_known.keys()):
- self.destroy_conn(conn)
-
- def create_conn(self, name='default', isolation_level=None, **kw):
- '''Create named connection.'''
- if name in self.conn_known:
- raise MyManagerError('Connection name "%s" already registered.' % name)
-
- isolation_level = self._normalize_isolation_level(isolation_level)
- ci = ConnectionInfo(name, isolation_level, **kw)
-
- self.conn_known[name] = ci
- self.conn_pool[name] = []
-
- def close_conn(self, name='default'):
- '''Close all connections of given name.
-
- Connection credentials are still saved.
-
- '''
- while len(self.conn_pool[name]):
- conn = self.conn_pool[name].pop()
- conn.close()
-
- def destroy_conn(self, name='default'):
- '''Destroy connection.
-
- Counterpart of create_conn.
-
- '''
- if not name in self.conn_known:
- raise MyManagerError('Connection name "%s" not registered.' % name)
-
- self.close_conn(name)
-
- del self.conn_known[name]
- del self.conn_pool[name]
-
- def get_conn(self, name='default'):
- '''Get connection of name 'name' from pool.'''
- self._check_fork()
- self.lock.acquire()
- try:
- if not name in self.conn_known:
- raise MyManagerError("Connection name '%s' not registered." % name)
-
- # connection from pool
- conn = None
- while len(self.conn_pool[name]) and conn is None:
- conn = self.conn_pool[name].pop()
- try:
- conn.ping()
- except MySQLdb.MySQLError:
- conn.close()
- conn = None
-
- if conn is None:
- ci = self.conn_known[name]
- conn = self._connect(ci)
- finally:
- self.lock.release()
- return conn
-
- def put_conn(self, conn, name='default'):
- '''Put connection back to pool.
-
-        Name must be the same as used for get_conn,
-        otherwise connections end up in the wrong pool.
-
- '''
- self.lock.acquire()
- try:
- if not name in self.conn_known:
- raise MyManagerError("Connection name '%s' not registered." % name)
-
- if len(self.conn_pool[name]) >= self.conn_known[name].pool_size:
- conn.close()
- return
-
- # connection returned to the pool must not be in transaction
- try:
- conn.rollback()
- except OperationalError:
- conn.close()
- return
-
- self.conn_pool[name].append(conn)
- finally:
- self.lock.release()
-
- @contextmanager
- def cursor(self, name='default'):
- '''Cursor context.
-
- Uses any connection of name 'name' from pool
- and returns cursor for that connection.
-
- '''
- conn = self.get_conn(name)
-
- try:
- curs = conn.cursor()
- yield curs
- finally:
- curs.close()
- self.put_conn(conn, name)
-
- def _connect(self, ci):
- conn = MySQLdb.connect(cursorclass=Cursor, **ci.parameters)
- if not ci.isolation_level is None:
- if ci.isolation_level == 'AUTOCOMMIT':
- conn.autocommit(True)
- else:
- curs = conn.cursor()
- curs.execute('SET SESSION TRANSACTION ISOLATION LEVEL ' + ci.isolation_level)
- curs.close()
- if ci.init_statement:
- curs = conn.cursor()
- curs.execute(ci.init_statement)
- curs.connection.commit()
- curs.close()
- return conn
-
- def _normalize_isolation_level(self, level):
- if level is None:
- return level
- if type(level) == str:
- level = level.upper().replace('_', ' ')
- if level in (
- 'AUTOCOMMIT',
- 'READ UNCOMMITTED',
- 'READ COMMITTED',
- 'REPEATABLE READ',
- 'SERIALIZABLE'):
- return level
-        raise MyManagerError('Unknown isolation level name: "%s"' % level)
-
- def _check_fork(self):
- '''Check if process was forked (PID has changed).
-
- If it was, clean parent's connections.
- New connections are created for children.
- Known connection credentials are inherited, but not shared.
-
- '''
- if self.pid == multiprocessing.current_process().pid:
- # PID has not changed
- return
-
- # update saved PID
- self.pid = multiprocessing.current_process().pid
- # reinitialize lock
- self.lock = threading.Lock()
- # clean parent's connections
- for name in self.conn_pool:
- self.conn_pool[name] = []
-
- @classmethod
- def get_instance(cls):
- if not hasattr(cls, '_instance'):
- cls._instance = cls()
- return cls._instance
-
-
-def get_instance():
- return MyManager.get_instance()
-
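
The MyManager removed above is fork-aware: _check_fork() compares the saved PID on every get_conn() and, after a fork, resets the lock and empties the inherited pools so each child process reconnects using the inherited credentials. A minimal sketch of that behaviour, assuming a reachable MySQL test database; the import uses the pre-rename path from the docstring above::

    import multiprocessing

    from mytoolkit import mymanager

    dbm = mymanager.get_instance()
    dbm.create_conn(host='127.0.0.1', db='test', user='test', passwd='test')

    def worker():
        # The first get_conn() in the child sees a changed PID and discards
        # the parent's pooled connections before opening its own.
        with dbm.cursor() as curs:
            curs.execute('SELECT 1')

    multiprocessing.Process(target=worker).start()
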
--- a/notifyexample.py Mon May 26 18:18:21 2014 +0200
+++ b/notifyexample.py Wed Jul 09 18:03:54 2014 +0200
@@ -3,7 +3,7 @@
# Call "NOTIFY notifyexample" on target DB to wake up this program.
#
-from pgtoolkit import toolbase, pgmanager
+from pydbkit import toolbase, pgmanager
class NotifyExample(toolbase.SimpleTool):
--- a/pgtool Mon May 26 18:18:21 2014 +0200
+++ b/pgtool Wed Jul 09 18:03:54 2014 +0200
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""
-Wrapper script for pgtoolkit tools.
+Wrapper script for pydbkit tools.
Usage
-----
@@ -21,11 +21,11 @@
Configuration
-------------
-Global pgtoolkit configuration:
- /etc/pgtoolkit.conf
+Global pydbkit configuration:
+ /etc/pydbkit.conf
Local configuration:
- ./pgtoolkit.conf
+ ./pydbkit.conf
Additional config file can be loaded using -c parameter (after tool name).
@@ -38,13 +38,13 @@
"""
-import pgtoolkit.tools
+import pydbkit.tools
import sys
from importlib import import_module
def print_tool_with_short_desc(name):
- module = import_module('pgtoolkit.tools.' + tool)
+    module = import_module('pydbkit.tools.' + name)
short_desc = module.cls.__doc__.lstrip().splitlines()[0]
print(name.ljust(15), '-', short_desc)
@@ -56,7 +56,7 @@
if sys.argv[1].startswith('--'):
if sys.argv[1] == '--list':
- for tool in pgtoolkit.tools.__all__:
+ for tool in pydbkit.tools.__all__:
print_tool_with_short_desc(tool)
else:
print(__doc__, end='')
@@ -65,11 +65,11 @@
tool = sys.argv[1]
tool_args = sys.argv[2:]
- if tool not in pgtoolkit.tools.__all__:
+ if tool not in pydbkit.tools.__all__:
print('Unknown tool "%s".\n\nCall "pgtool --list" to get list of all available tools.' % tool)
sys.exit()
- module = import_module('pgtoolkit.tools.' + tool)
+ module = import_module('pydbkit.tools.' + tool)
tool = module.cls()
tool.setup(tool_args)
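
pgtool resolves each tool name to a module under pydbkit.tools and uses that module's cls attribute; the first line of the class docstring feeds --list. A minimal sketch of a conforming tool module, based only on the attributes the wrapper is shown to use (cls, the docstring, setup()); the main() entry point and the module path are assumptions::

    # pydbkit/tools/exampletool.py (hypothetical)
    from pydbkit import toolbase

    class ExampleTool(toolbase.SimpleTool):
        """Print a greeting."""  # first docstring line is shown by: pgtool --list

        def main(self):  # assumed entry point, invoked after setup()
            print('hello from pgtool')

    cls = ExampleTool  # pgtool instantiates module.cls() and calls setup(args)
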
--- a/pgtoolkit.conf.example Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,11 +0,0 @@
-### named connections
-databases = {
- # database for tests.py (postgres, mysql - remove one of the lines to skip particular tests)
- 'test' : 'host=127.0.0.1 dbname=test user=test password=test',
- 'test_mysql' : 'host=127.0.0.1 db=test user=test password=test',
-}
-
-### meta database (contains connection parameters for other databases)
-meta_db = 'host=10.8.0.1 dbname=central'
-# query for connection parameters, input is database name (will be placed instead of %s)
-meta_query = '''SELECT host, port, dbname, user, password FROM config.databases WHERE name = %s LIMIT 1'''
--- a/pgtoolkit/configparser.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,45 +0,0 @@
-import logging
-
-
-class ConfigParser:
- def __init__(self):
- self.args = {} # config file arguments
- self.registered_args = {}
- self.log = logging.getLogger('config')
-
- def add_argument(self, name, type=str, default=None):
- self.registered_args[name] = {'type':type, 'default':default}
- self.args[name] = default
-
- def load(self, fname):
- # parse config file
- with open(fname) as f:
- exec(f.read(), self.args)
- # check contents
- return self.check()
-
- def check(self):
- ok = True
- for key in self.args.keys():
- if key == '__builtins__':
- continue
- if key in self.registered_args:
- # arg registered, check type
-                argtype = self.registered_args[key]['type']
-                if not isinstance(self.args[key], argtype) and self.args[key] is not None:
-                    ok = False
-                    self.log.error("Bad type of config parameter '%s': is %s but should be %s",
-                        key, type(self.args[key]), argtype)
- else:
- # arg not registered
- ok = False
- self.log.error("Unknown config parameter '%s'.", key)
- return ok
-
- def __getattr__(self, name):
- if name in self.args:
- return self.args[name]
-
- # raise error if not found
- raise AttributeError()
-
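
The ConfigParser above treats the config file as executable Python: load() exec()s it into self.args, check() validates every assignment against the registered arguments, and __getattr__ exposes the values as attributes. A minimal sketch using the parameter names from the example config below, assuming the class keeps its name under the new pydbkit package::

    from pydbkit.configparser import ConfigParser

    config = ConfigParser()
    config.add_argument('databases', type=dict, default={})
    config.add_argument('meta_db', type=str)

    # pydbkit.conf contains plain Python assignments, e.g.:
    #   databases = {'test': 'host=127.0.0.1 dbname=test user=test password=test'}
    if config.load('pydbkit.conf'):
        print(config.databases)  # registered values are attribute-accessible
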
--- a/pgtoolkit/delayedquery.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,46 +0,0 @@
-import threading
-import time
-
-
-class DelayedQueryThread(threading.Thread):
- def __init__(self, targetdbm, targetname, delay, query, args):
- threading.Thread.__init__(self)
- self.targetdbm = targetdbm
- self.targetname = targetname
- self.delay = delay
- self.query = query
- self.args = args
-
- def run(self):
- time.sleep(self.delay)
- with self.targetdbm.cursor(self.targetname) as curs:
- curs.execute(self.query, self.args)
-
-
-class DelayedQuery:
- def __init__(self, targetdbm):
- '''Initialize DelayedQuery.
-
- targetdbm -- PgManager-like object
-
- '''
- self.targetdbm = targetdbm
- self.queryids = set()
-
- def add(self, delay, query, args, targetname='default', queryid=None):
- '''Add query to schedule.
-
- delay -- how long to wait, in seconds
- query, args -- query to be run after delay
- targetname -- name of connection in PgManager
- queryid -- discard if query with same id is already scheduled
-
- '''
- if queryid is not None:
- if queryid in self.queryids:
- return
- self.queryids.add(queryid)
-
- t = DelayedQueryThread(self.targetdbm, targetname, delay, query, args)
- t.start()
-
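
DelayedQuery above runs each scheduled query on its own thread: DelayedQueryThread sleeps for the delay, then executes the query through the target manager's cursor() context. A minimal sketch, assuming a PgManager with a registered 'default' connection and that the module keeps its name under pydbkit; the table and queryid are illustrative::

    from pydbkit import pgmanager
    from pydbkit.delayedquery import DelayedQuery

    pgm = pgmanager.get_instance()
    pgm.create_conn(hostaddr='127.0.0.1', dbname='test')

    dq = DelayedQuery(pgm)
    # run after 5 seconds; a second add() with the same queryid is silently dropped
    dq.add(5, 'UPDATE items SET processed = TRUE WHERE id = %s', (42,),
           queryid='process-42')

Note that the queryids set is never pruned, so a given queryid can be scheduled at most once per DelayedQuery instance.
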
--- a/pgtoolkit/pgbrowser.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,496 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# PgBrowser - browse database schema and metadata
-#
-# Some of the queries came from psql.
-#
-# Copyright (c) 2011 Radek Brich <radek.brich@devl.cz>
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-
-from collections import OrderedDict
-
-
-class Column:
- def __init__(self, browser, table,
- name, type, notnull, hasdefault, default, description):
- self.browser = browser # Browser instance
- self.table = table # Table instance
- self.name = name
- self.type = type
- self.notnull = notnull
- self.hasdefault = hasdefault
- self.default = default
- self.description = description
-
-
-class Constraint:
- def __init__(self, browser, table, name, type, fname, fschema, definition):
- self.browser = browser
- self.table = table
- self.name = name
- self.type = type
- self.fname = fname # foreign table name
- self.fschema = fschema # foreign table schema
- self.definition = definition
-
-
-class Index:
- def __init__(self, browser, table,
- name, primary, unique, clustered, valid, definition,
- columns, size):
- self.browser = browser
- self.table = table
- self.name = name
- self.primary = primary
- self.unique = unique
- self.clustered = clustered
- self.valid = valid
- self.definition = definition
- self.columns = columns
- self.size = size
-
-
-class Table:
- def __init__(self, browser, schema, name, owner, size, description, options):
- self._columns = None
- self._constraints = None
- self._indexes = None
- self.browser = browser # Browser instance
- self.schema = schema # Schema instance
- self.name = name # table name, str
- self.owner = owner
- self.size = size
- self.description = description
- self.options = options or []
-
- def refresh(self):
- self.refresh_columns()
- self.refresh_constraints()
- self.refresh_indexes()
-
- def refresh_columns(self):
- rows = self.browser.list_columns(self.name, self.schema.name)
- self._columns = OrderedDict([(x['name'], Column(self.browser, self, **x)) for x in rows])
-
- def refresh_constraints(self):
- rows = self.browser.list_constraints(self.name, self.schema.name)
- self._constraints = OrderedDict([(x['name'], Constraint(self.browser, self, **x)) for x in rows])
-
- def refresh_indexes(self):
- rows = self.browser.list_indexes(self.name, self.schema.name)
- self._indexes = OrderedDict([(x['name'], Index(self.browser, self, **x)) for x in rows])
-
- def getcolumns(self):
- if self._columns is None:
- self.refresh_columns()
- return self._columns
- columns = property(getcolumns)
-
- def getconstraints(self):
- if self._constraints is None:
- self.refresh_constraints()
- return self._constraints
- constraints = property(getconstraints)
-
- def getindexes(self):
- if self._indexes is None:
- self.refresh_indexes()
- return self._indexes
- indexes = property(getindexes)
-
-
-class Argument:
- def __init__(self, browser, function, name, type, mode, default):
- # PgBrowser instance
- self.browser = browser
- # Function instance
- self.function = function
- self.name = name
- self.type = type
- self.mode = mode
- self.default = default
-
-
-class Function:
- def __init__(self, browser, schema, oid, name, function_name, type, result, source):
- self.browser = browser
- self.schema = schema
- self.oid = oid
- #: unique name - function name + arg types
- self.name = name
- #: pure function name without args
- self.function_name = function_name
- self.type = type
- self.result = result
- self.source = source
- self._arguments = None
- self._definition = None
-
- def refresh(self):
- self.refresh_args()
-
- def refresh_args(self):
- rows = self.browser.list_function_args(self.oid)
- self._arguments = OrderedDict([(x['name'], Argument(self.browser, self, **x)) for x in rows])
-
- @property
- def arguments(self):
- if self._arguments is None:
- self.refresh_args()
- return self._arguments
-
- @property
- def definition(self):
- """Get full function definition including CREATE command."""
- if not self._definition:
- self._definition = self.browser.get_function_definition(self.oid)
- return self._definition
-
-
-class Type:
- def __init__(self, browser, schema, name, type, elements, description):
- self.browser = browser
- self.schema = schema
- self.name = name
- self.type = type
- self.elements = elements
- self.description = description
-
-
-class Schema:
- def __init__(self, browser, name, owner, acl, description, system):
- self._tables = None
- self._functions = None
- self._types = None
- self.browser = browser
- self.name = name
- self.owner = owner
- self.acl = acl
- self.description = description
- self.system = system
-
- def refresh(self):
- self.refresh_tables()
- self.refresh_functions()
-
- def refresh_tables(self):
- rows = self.browser.list_tables(self.name)
- self._tables = OrderedDict([(x['name'], Table(self.browser, self, **x)) for x in rows])
-
- def refresh_functions(self):
- rows = self.browser.list_functions(self.name)
- self._functions = OrderedDict([(x['name'], Function(self.browser, self, **x)) for x in rows])
-
- def refresh_types(self):
- rows = self.browser.list_types(self.name)
- self._types = OrderedDict([(x['name'], Type(self.browser, self, **x)) for x in rows])
-
- @property
- def tables(self):
- if self._tables is None:
- self.refresh_tables()
- return self._tables
-
- @property
- def functions(self):
- if self._functions is None:
- self.refresh_functions()
- return self._functions
-
- @property
- def types(self):
- if self._types is None:
- self.refresh_types()
- return self._types
-
-
-class PgBrowser:
- def __init__(self, conn=None):
- self._schemas = None
- self.conn = conn
-
- def setconn(self, conn=None):
- self.conn = conn
-
- def refresh(self):
- self.refresh_schemas()
-
- def refresh_schemas(self):
- rows = self.list_schemas()
- self._schemas = OrderedDict([(x['name'], Schema(self, **x)) for x in rows])
-
- @property
- def schemas(self):
- if self._schemas is None:
- self.refresh_schemas()
- return self._schemas
-
- def _query(self, query, args):
- try:
- curs = self.conn.cursor()
- curs.execute(query, args)
- curs.connection.commit()
- rows = curs.fetchall()
- return [dict(zip([desc[0] for desc in curs.description], row)) for row in rows]
- finally:
- curs.close()
-
- def list_databases(self):
- return self._query('''
- SELECT
- d.datname as "name",
- pg_catalog.pg_get_userbyid(d.datdba) as "owner",
- pg_catalog.pg_encoding_to_char(d.encoding) as "encoding",
- d.datcollate as "collation",
- d.datctype as "ctype",
- d.datacl AS "acl",
- CASE WHEN pg_catalog.has_database_privilege(d.datname, 'CONNECT')
- THEN pg_catalog.pg_database_size(d.datname)
- ELSE -1 -- No access
- END as "size",
- t.spcname as "tablespace",
- pg_catalog.shobj_description(d.oid, 'pg_database') as "description"
- FROM pg_catalog.pg_database d
- JOIN pg_catalog.pg_tablespace t on d.dattablespace = t.oid
- ORDER BY 1;
- ''', [])
-
- def list_schemas(self):
- return self._query('''
- SELECT
- n.nspname AS "name",
- pg_catalog.pg_get_userbyid(n.nspowner) AS "owner",
- n.nspacl AS "acl",
- pg_catalog.obj_description(n.oid, 'pg_namespace') AS "description",
- CASE WHEN n.nspname IN ('information_schema', 'pg_catalog', 'pg_toast')
- OR n.nspname ~ '^pg_temp_' OR n.nspname ~ '^pg_toast_temp_'
- THEN TRUE
- ELSE FALSE
- END AS "system"
- FROM pg_catalog.pg_namespace n
- ORDER BY 1;
- ''', [])
-
- def list_tables(self, schema='public'):
- return self._query('''
- SELECT
- c.relname as "name",
- pg_catalog.pg_get_userbyid(c.relowner) as "owner",
- pg_catalog.pg_relation_size(c.oid) as "size",
- pg_catalog.obj_description(c.oid, 'pg_class') as "description",
- c.reloptions as "options"
- FROM pg_catalog.pg_class c
- LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
- WHERE n.nspname = %s AND c.relkind IN ('r','s','')
- ORDER BY 1;
- ''', [schema])
-
- def list_columns(self, table, schema='public', order=2):
- return self._query('''
- SELECT
- --a.attrelid,
- a.attname as "name",
- format_type(a.atttypid, a.atttypmod) AS "type",
- a.attnotnull as "notnull",
- a.atthasdef as "hasdefault",
- pg_catalog.pg_get_expr(d.adbin, d.adrelid) as "default",
- pg_catalog.col_description(a.attrelid, a.attnum) AS "description"
- FROM pg_catalog.pg_attribute a
- LEFT JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
- LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
- LEFT JOIN pg_catalog.pg_attrdef d ON a.attrelid = d.adrelid AND a.attnum = d.adnum
- WHERE n.nspname = %s AND c.relname = %s AND a.attnum > 0 AND NOT a.attisdropped
- ORDER BY ''' + str(order), [schema, table])
-
- def list_constraints(self, table, schema='public'):
- return self._query('''
- SELECT
- r.conname AS "name",
- r.contype AS "type",
- cf.relname AS "fname",
- nf.nspname AS "fschema",
- pg_catalog.pg_get_constraintdef(r.oid, true) as "definition"
- FROM pg_catalog.pg_constraint r
- JOIN pg_catalog.pg_class c ON r.conrelid = c.oid
- JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
- LEFT JOIN pg_catalog.pg_class cf ON r.confrelid = cf.oid
- LEFT JOIN pg_catalog.pg_namespace nf ON nf.oid = cf.relnamespace
- WHERE n.nspname = %s AND c.relname = %s
- ORDER BY 1
- ''', [schema, table])
-
- def list_indexes(self, table, schema='public'):
- return self._query('''
- SELECT
- c2.relname as "name",
- i.indisprimary as "primary",
- i.indisunique as "unique",
- i.indisclustered as "clustered",
- i.indisvalid as "valid",
- pg_catalog.pg_get_indexdef(i.indexrelid, 0, true) as "definition",
- ARRAY(SELECT a.attname FROM pg_catalog.pg_attribute a WHERE a.attrelid = c2.oid ORDER BY attnum) AS "columns",
- pg_catalog.pg_relation_size(c2.oid) as "size"
- --c2.reltablespace as "tablespace_oid"
- FROM pg_catalog.pg_class c
- JOIN pg_catalog.pg_index i ON c.oid = i.indrelid
- JOIN pg_catalog.pg_class c2 ON i.indexrelid = c2.oid
- JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
- WHERE n.nspname = %(schema)s AND c.relname = %(table)s
- ORDER BY i.indisprimary DESC, i.indisunique DESC, c2.relname
- ''', {'schema': schema, 'table': table})
-
- def list_functions(self, schema='public'):
- '''List functions in schema.'''
- return self._query('''
- SELECT
- p.oid as "oid",
- p.proname || '(' || array_to_string(
- array(SELECT pg_catalog.format_type(unnest(p.proargtypes), NULL)),
- ', '
- ) || ')' as "name",
- p.proname as "function_name",
- pg_catalog.pg_get_function_result(p.oid) as "result",
- p.prosrc as "source",
- CASE
- WHEN p.proisagg THEN 'agg'
- WHEN p.proiswindow THEN 'window'
- WHEN p.prorettype = 'pg_catalog.trigger'::pg_catalog.regtype THEN 'trigger'
- ELSE 'normal'
- END as "type"
- FROM pg_catalog.pg_proc p
- LEFT JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace
- WHERE n.nspname = %s
- ORDER BY 1, 2, 4;
- ''', [schema])
-
- def list_function_args(self, oid):
- """List function arguments.
-
- Notes about query:
- type: Use allargtypes if present, argtypes otherwise.
-            The trick with [0:999] moves the lower bound from 0 to the default 1
-            by slicing all elements (slices always have lower bound 1).
-          mode: This trick makes an array of NULLs of the same length as argnames,
-            in case argmodes is NULL.
- default: Use pg_get_expr, split output by ', '
- FIXME: will fail if ', ' is present in default value string.
- """
- return self._query('''
- SELECT
- unnest(p.proargnames) AS "name",
- pg_catalog.format_type(unnest(
- COALESCE(p.proallargtypes, (p.proargtypes::oid[])[0:999])
- ), NULL) AS "type",
- unnest(
- COALESCE(
- p.proargmodes::text[],
- array(SELECT NULL::text FROM generate_series(1, array_upper(p.proargnames, 1)))
- )
- ) AS "mode",
- unnest(array_cat(
- array_fill(NULL::text, array[COALESCE(array_upper(p.proargnames,1),0) - p.pronargdefaults]),
- string_to_array(pg_get_expr(p.proargdefaults, 'pg_proc'::regclass, true), ', ')
- )) AS "default"
- FROM pg_proc p
- WHERE p.oid = %s''', [oid])
-
- def get_function_definition(self, oid):
- """Get full function definition, including CREATE command etc.
-
- Args:
- oid: function oid from pg_catalog.pg_proc (returned by list_functions)
-
- """
- return self._query('''SELECT pg_get_functiondef(%s) AS definition;''', [oid])[0]['definition']
-
- def list_types(self, schema='public'):
- """List types in schema.
-
- http://www.postgresql.org/docs/8.4/static/catalog-pg-type.html
-
- """
- return self._query('''
- SELECT
- t.typname AS "name",
- CASE
- WHEN t.typtype = 'b' THEN 'base'::text
- WHEN t.typtype = 'c' THEN 'composite'::text
- WHEN t.typtype = 'd' THEN 'domain'::text
- WHEN t.typtype = 'e' THEN 'enum'::text
- WHEN t.typtype = 'p' THEN 'pseudo'::text
- END AS "type",
- ARRAY(
- SELECT e.enumlabel
- FROM pg_catalog.pg_enum e
- WHERE e.enumtypid = t.oid
- ORDER BY e.oid
- ) AS "elements",
- pg_catalog.obj_description(t.oid, 'pg_type') AS "description"
- FROM pg_catalog.pg_type t
- LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
- WHERE (t.typrelid = 0 OR (SELECT c.relkind = 'c' FROM pg_catalog.pg_class c WHERE c.oid = t.typrelid))
- AND NOT EXISTS(SELECT 1 FROM pg_catalog.pg_type el WHERE el.oid = t.typelem AND el.typarray = t.oid)
- AND n.nspname <> 'pg_catalog'
- AND n.nspname <> 'information_schema'
- AND n.nspname = %(schema)s
- ORDER BY 1, 2;
- ''', {'schema': schema})
-
- def list_sequences(self, schema=None):
- '''List sequences in schema.'''
- return self._query('''
- SELECT
- nc.nspname AS "sequence_schema",
- c.relname AS "sequence_name",
- t.relname AS "related_table",
- a.attname AS "related_column",
- format_type(a.atttypid, a.atttypmod) AS "related_column_type"
- FROM pg_class c
- JOIN pg_namespace nc ON nc.oid = c.relnamespace
- JOIN pg_depend d ON d.objid = c.oid
- JOIN pg_class t ON d.refobjid = t.oid
- JOIN pg_attribute a ON (d.refobjid, d.refobjsubid) = (a.attrelid, a.attnum)
- WHERE c.relkind = 'S' AND NOT pg_is_other_temp_schema(nc.oid)
- ''' + (schema and ' AND nc.nspname = %(schema)s' or '') + '''
- ''', {'schema': schema})
-
- def list_column_usage(self, table, column, schema='public'):
- '''List objects using the column.
-
- Currently shows views and constraints which use the column.
-
- This is useful to find which views block alteration of column type etc.
-
- '''
- return self._query('''
- SELECT
- 'view' AS type, view_schema AS schema, view_name AS name
- FROM information_schema.view_column_usage
- WHERE table_schema=%(schema)s AND table_name=%(table)s AND column_name=%(column)s
-
- UNION
-
- SELECT
- 'constraint' AS type, constraint_schema AS schema, constraint_name AS name
- FROM information_schema.constraint_column_usage
- WHERE table_schema=%(schema)s AND table_name=%(table)s AND column_name=%(column)s
- ''', {'schema':schema, 'table':table, 'column':column})
-
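
PgBrowser above materialises the catalog lazily: schemas, tables, columns, constraints and indexes are fetched on first property access and cached until the matching refresh_*() call. A minimal sketch, assuming a psycopg2-style connection (the _query() helper only needs cursor(), execute(), fetchall(), description and commit())::

    import psycopg2

    from pydbkit import pgbrowser

    conn = psycopg2.connect(host='127.0.0.1', dbname='test')
    browser = pgbrowser.PgBrowser(conn)

    for table in browser.schemas['public'].tables.values():
        print(table.name, table.size)
        for column in table.columns.values():  # triggers list_columns() on first access
            print('   ', column.name, column.type)
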
--- a/pgtoolkit/pgdatacopy.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,116 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# PgDataCopy - copy data between tables
-#
-# Copyright (c) 2012 Radek Brich <radek.brich@devl.cz>
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-
-import io
-
-
-class TargetNotEmptyError(Exception):
-
- def __init__(self, msg, table):
- Exception.__init__(self, msg)
- self.table = table
-
-
-class PgDataCopy:
-
- def __init__(self, conn1, conn2):
- self.conn1 = conn1
- self.conn2 = conn2
- self.fulltable1 = None
- self.fulltable2 = None
-
- def set_source(self, table, schema='public'):
- self.schema1 = schema
- self.table1 = table
- self.fulltable1 = '"' + schema + '"."'+ table + '"'
-
- def set_destination(self, table, schema='public'):
- self.schema2 = schema
- self.table2 = table
- self.fulltable2 = '"' + schema + '"."'+ table + '"'
-
- def copy(self):
- self.check()
-
- buf = io.StringIO()
- try:
- self.read(buf)
- data = buf.getvalue()
- finally:
- buf.close()
-
- buf = io.StringIO(data)
- try:
- self.write(buf)
- finally:
- buf.close()
-
- self.analyze()
-
- def check(self):
-        '''Check that the target table does not contain any data (otherwise the copy cannot proceed).'''
- q = self._compose_check(self.fulltable2)
- curs = self.conn2.cursor()
- curs.execute(q)
- curs.connection.commit()
- if curs.rowcount > 0:
- raise TargetNotEmptyError('Target table contains data.', self.fulltable2)
- self.cols = [desc[0] for desc in curs.description]
-
- def read(self, tmpfile):
- '''Read contents from source table.'''
- q = self._compose_read(self.fulltable1, self.cols)
- curs = self.conn1.cursor()
- curs.copy_expert(q, tmpfile)
- curs.connection.commit()
-
- def write(self, tmpfile):
- '''Write source table contents to target table.'''
- q = self._compose_write(self.fulltable2, self.cols)
- curs = self.conn2.cursor()
- curs.copy_expert(q, tmpfile)
- curs.connection.commit()
-
- def analyze(self):
- '''Analyze target table.'''
- q = self._compose_analyze(self.fulltable2)
- curs = self.conn2.cursor()
- curs.execute(q)
- curs.connection.commit()
-
- def _compose_check(self, table):
- return 'SELECT * FROM %s LIMIT 1' % table
-
- def _compose_read(self, table, cols):
- collist = ', '.join(['"%s"' % col for col in cols])
- return 'COPY %s (%s) TO STDOUT' % (table, collist)
-
- def _compose_write(self, table, cols):
- collist = ', '.join(['"%s"' % col for col in cols])
- return 'COPY %s (%s) FROM STDIN' % (table, collist)
-
- def _compose_analyze(self, table):
- return 'ANALYZE %s' % table
-
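
PgDataCopy above streams rows through an in-memory buffer: check() verifies the target is empty and records the column list, read() issues COPY ... TO STDOUT, write() replays the buffer with COPY ... FROM STDIN, and analyze() runs ANALYZE on the target. A minimal sketch, assuming two psycopg2 connections (copy_expert() is psycopg2 API), that the module keeps its name under pydbkit, and an illustrative table name::

    import psycopg2

    from pydbkit.pgdatacopy import PgDataCopy, TargetNotEmptyError

    src = psycopg2.connect(host='127.0.0.1', dbname='test')
    dst = psycopg2.connect(host='127.0.0.1', dbname='test_copy')

    dc = PgDataCopy(src, dst)
    dc.set_source('items')       # quoted to "public"."items"
    dc.set_destination('items')  # must be empty
    try:
        dc.copy()
    except TargetNotEmptyError as e:
        print('refusing to overwrite data in', e.table)
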
--- a/pgtoolkit/pgdatadiff.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,273 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# PgDataDiff - compare tables, print data differences
-#
-# Copyright (c) 2011 Radek Brich <radek.brich@devl.cz>
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-
-from collections import OrderedDict
-
-from pgtoolkit import pgbrowser
-from pycolib.ansicolor import *
-
-import sys
-
-
-class DiffData:
- COLORS = {
- '+' : BOLD | GREEN,
- '-' : BOLD | RED,
- '*' : BOLD | YELLOW,
- 'V' : BOLD | WHITE,
- 'K' : BOLD | BLUE}
-
- def __init__(self, change, cols1, cols2, key=None):
- """
-
- change - one of '+', '-', '*' (add, remove, update)
- cols1 - original column values (OrderedDict)
- cols2 - new column values (OrderedDict)
- key - primary key columns (OrderedDict)
-
- """
- self.change = change
- self.cols1 = cols1
- self.cols2 = cols2
- self.key = key
-
- def format(self):
- out = []
-
- out.append(highlight(1, self.COLORS[self.change]))
- out.extend([self.change, ' '])
-
- out.extend(self._format_changes())
-
- out.append(highlight(0))
-
- return ''.join(out)
-
- def format_patch(self, table):
- method = {
- '+' : self._format_insert,
- '-' : self._format_delete,
- '*' : self._format_update}
-
- return method[self.change](table)
-
- def _format_changes(self):
- if self.cols1 and not self.cols2:
- return [', '.join([self._format_value_del(*x) for x in self.cols1.items()])]
- if not self.cols1 and self.cols2:
- return [', '.join([self._format_value_add(*x) for x in self.cols2.items()])]
-
- out = []
- if self.key:
- for colname in self.key:
- out.extend([highlight(1, self.COLORS['*']), colname, ': ', highlight(0), self.key[colname], ', '])
-
- items = []
- for i in range(len(self.cols1)):
- items.append((
- list(self.cols1.keys())[i],
- list(self.cols1.values())[i],
- list(self.cols2.values())[i]))
- out.extend([', '.join([self._format_value_change(*x) for x in items])])
-
- return out
-
- def _format_value_del(self, k, v):
- fs = (highlight(1, self.COLORS['-']) + '{}: ' + highlight(0) + '{}')
- return fs.format(k, v)
-
- def _format_value_add(self, k, v):
- fs = (highlight(1, self.COLORS['+']) + '{}: ' + highlight(0) +
- highlight(1, self.COLORS['V']) + '{}' + highlight(0))
- return fs.format(k, v)
-
- def _format_value_change(self, k, v1, v2):
- fs = (highlight(1, self.COLORS['*']) + '{}: ' + highlight(0) +
-              '{} ▶ ' +
- highlight(1, self.COLORS['V']) + '{}' + highlight(0))
- return fs.format(k, v1, v2)
-
- def _format_insert(self, table):
- out = ['INSERT INTO ', table, ' (']
- out.append(', '.join(self.cols2.keys()))
- out.append(') VALUES (')
- out.append(', '.join(self.cols2.values()))
- out.append(');')
- return ''.join(out)
-
- def _format_delete(self, table):
- out = ['DELETE FROM ', table]
- out.extend(self._format_where())
- return ''.join(out)
-
- def _format_update(self, table):
- out = ['UPDATE ', table, ' SET ']
- out.append(', '.join([self._format_set(*x) for x in self.cols2.items()]))
- out.extend(self._format_where())
- return ''.join(out)
-
- def _format_set(self, k, v):
- return '{} = {}'.format(k, v)
-
- def _format_where(self):
- out = [' WHERE ']
- for colname in self.key:
- out.extend([colname, ' = ', self.key[colname], ' AND '])
- out[-1] = ';'
- return out
-
-class PgDataDiff:
- def __init__(self, conn1, conn2):
- self.allowcolor = False
- self.conn1 = conn1
- self.conn2 = conn2
- self.fulltable1 = None
- self.fulltable2 = None
-
- def settable1(self, table, schema='public'):
- self.schema1 = schema
- self.table1 = table
- self.fulltable1 = '"' + schema + '"."'+ table + '"'
-
- def settable2(self, table, schema='public'):
- self.schema2 = schema
- self.table2 = table
- self.fulltable2 = '"' + schema + '"."'+ table + '"'
-
- def iter_diff(self):
- """Return differencies between data of two tables.
-
- Yields one line at the time.
-
- """
- curs1, curs2 = self._select()
-
- row1 = curs1.fetchone_dict()
- row2 = curs2.fetchone_dict()
-
- while True:
- if row1 is None and row2 is None:
- break
- diff = self._compare_row(row1, row2, curs1.adapt, curs2.adapt)
-
- if diff:
- yield diff
-
- if diff.change == '-':
- row1 = curs1.fetchone_dict()
- continue
- if diff.change == '+':
- row2 = curs2.fetchone_dict()
- continue
- # change == '*' or not diff
- row1 = curs1.fetchone_dict()
- row2 = curs2.fetchone_dict()
-
- curs1.close()
- curs2.close()
-
- def print_diff(self, file=sys.stdout):
- """Print differencies between data of two tables.
-
- The output is in human readable form.
-
- Set allowcolor=True of PgDataDiff instance to get colored output.
-
- """
- for ln in self.iter_diff():
- print(ln.format(), file=file)
-
- def print_patch(self, file=sys.stdout):
- """Print SQL script usable as patch for destination table.
-
- Supports INSERT, DELETE and UPDATE operations.
-
- """
- for ln in self.iter_diff():
- print(ln.format_patch(self.fulltable1), file=file)
-
- def _select(self):
- browser = pgbrowser.PgBrowser(self.conn1)
-
- columns = browser.list_columns(schema=self.schema1, table=self.table1, order=1)
- if not columns:
- raise Exception('Table %s.%s not found.' % (self.schema1, self.table1))
- columns_sel = ', '.join(['"' + x['name'] + '"' for x in columns])
- self.colnames = [x['name'] for x in columns]
-
- pkey = [ind for ind in browser.list_indexes(schema=self.schema1, table=self.table1) if ind['primary']]
- if not pkey:
- raise Exception('Table %s.%s has no primary key.' % (self.schema1, self.table1))
- pkey = pkey[0]
- pkey_sel = ', '.join(['"' + x + '"' for x in pkey['columns']])
- self.pkeycolnames = pkey['columns']
-
- query1 = 'SELECT ' + columns_sel + ' FROM ' + self.fulltable1 + ' ORDER BY ' + pkey_sel
- query2 = 'SELECT ' + columns_sel + ' FROM ' + self.fulltable2 + ' ORDER BY ' + pkey_sel
-
- curs1 = self.conn1.cursor('curs1')
- curs2 = self.conn2.cursor('curs2')
-
- curs1.execute(query1)
- curs2.execute(query2)
-
- return curs1, curs2
-
- def _compare_data(self, row1, row2):
- cols1 = OrderedDict()
- cols2 = OrderedDict()
- for name in row1.keys():
- if row1[name] != row2[name]:
- cols1[name] = row1[name]
- cols2[name] = row2[name]
- if cols1:
- key = OrderedDict(zip(self.pkeycolnames, [row1[colname] for colname in self.pkeycolnames]))
- return DiffData('*', cols1, cols2, key=key)
-
- return None
-
- def _compare_row(self, row1, row2, adapt1, adapt2):
- if row2 is None:
- row1 = adapt1(row1)
- key = OrderedDict(zip(self.pkeycolnames, [row1[colname] for colname in self.pkeycolnames]))
- return DiffData('-', row1, None, key=key)
- if row1 is None:
- row2 = adapt2(row2)
- return DiffData('+', None, row2)
-
- for keyname in self.pkeycolnames:
- if row1[keyname] < row2[keyname]:
- row1 = adapt1(row1)
- key = OrderedDict(zip(self.pkeycolnames, [row1[colname] for colname in self.pkeycolnames]))
- return DiffData('-', row1, None, key=key)
- for keyname in self.pkeycolnames:
- if row1[keyname] > row2[keyname]:
- row2 = adapt2(row2)
- return DiffData('+', None, row2)
-
- row1 = adapt1(row1)
- row2 = adapt2(row2)
- return self._compare_data(row1, row2)
-
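
PgDataDiff above walks both tables in primary-key order through named (server-side) cursors and merge-joins the two row streams, classifying each row as added ('+'), removed ('-') or changed ('*'). A minimal sketch, assuming both connections come from PgManager (fetchone_dict() and the cursor adapt hook are its extensions), that the module keeps its name under pydbkit, and an illustrative table name::

    from pydbkit import pgmanager
    from pydbkit.pgdatadiff import PgDataDiff

    pgm = pgmanager.get_instance()
    pgm.create_conn(name='src', hostaddr='127.0.0.1', dbname='test')
    pgm.create_conn(name='dst', hostaddr='127.0.0.1', dbname='test_copy')

    dd = PgDataDiff(pgm.get_conn('src'), pgm.get_conn('dst'))
    dd.settable1('items')
    dd.settable2('items')
    dd.print_diff()   # human-readable row differences
    dd.print_patch()  # INSERT/DELETE/UPDATE statements for the target
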
--- a/pgtoolkit/pgdiff.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,635 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# PgDiff - capture differences of database metadata
-#
-# Depends on PgBrowser
-#
-# Copyright (c) 2011 Radek Brich <radek.brich@devl.cz>
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-
-from pycolib.ansicolor import *
-
-import re
-import difflib
-
-
-class PgDiffError(Exception):
- pass
-
-
-class DiffBase:
- COLORS = {
- '+' : BOLD | GREEN,
- '-' : BOLD | RED,
- '*' : BOLD | YELLOW,
- }
-
- COMMANDS = {
- '+' : 'CREATE',
- '-' : 'DROP',
- '*' : 'ALTER',
- }
-
- def __init__(self):
- self.changes = None
-
- def format(self):
- out = [' ' * self.level]
-
- out.append(highlight(1, self.COLORS[self.change]))
- out.append(self.change)
-
- out += [' ', self.type, ' ', self.name, highlight(0)]
-
- if self.changes:
- out += [highlight(1, WHITE), ' (', self._formatchanges(), ')', highlight(0)]
-
- return ''.join(out)
-
- def _formatnotnull(self, notnull):
- if notnull:
- return 'NOT NULL'
- else:
- return None
-
- def _formatchanges(self):
- res = []
- for type, a, b in self.changes:
- if type == 'notnull':
- type = ''
- a = self._formatnotnull(a)
- b = self._formatnotnull(b)
-
- if a and b:
- s = ''.join(['Changed ', type, ' from ',
- highlight(1,15), a, highlight(0), ' to ',
- highlight(1,15), b, highlight(0), '.'])
- elif a and not b:
- l = ['Removed ']
- if type:
- l += [type, ' ']
- l += [highlight(1,15), a, highlight(0), '.']
- s = ''.join(l)
- elif b and not a:
- l = ['Added ']
- if type:
- l += [type, ' ']
- l += [highlight(1,15), b, highlight(0), '.']
- s = ''.join(l)
- res.append(s)
- return ' '.join(res)
-
- def format_patch(self):
- if self.change == '*' and self.type in ('schema', 'table'):
- return None
- return ['%s %s %s;' % (self.COMMANDS[self.change], self.type.upper(), self.name)]
-
-
-class DiffSchema(DiffBase):
- def __init__(self, change, schema):
- DiffBase.__init__(self)
- self.level = 0
- self.type = 'schema'
- self.change = change
- self.schema = schema
- self.name = schema
-
-
-class DiffTable(DiffBase):
- def __init__(self, change, schema, table):
- DiffBase.__init__(self)
- self.level = 1
- self.type = 'table'
- self.change = change
- self.schema = schema
- self.table = table
- self.name = table
-
-
-class DiffArgument(DiffBase):
- def __init__(self, change, schema, function, argument):
- DiffBase.__init__(self)
- self.level = 2
- self.type = 'argument'
- self.change = change
- self.schema = schema
- self.function = function
- self.argument = argument
- self.name = argument
-
-
-class DiffFunction(DiffBase):
- def __init__(self, change, schema, function, definition, show_body_diff=False):
- DiffBase.__init__(self)
- self.level = 1
- self.type = 'function'
- self.change = change
- self.schema = schema
- self.function = function
- #: New function definition
- self.definition = definition
- self.name = function
- self.show_body_diff = show_body_diff
-
- def _formatchanges(self):
- res = []
- for x in self.changes:
- type, a, b = x
- if type == 'source':
- if self.show_body_diff:
- lines = ['Source differs:\n']
- for line in difflib.unified_diff(a, b, lineterm=''):
- if line[:3] in ('---', '+++'):
- continue
- color = {' ': WHITE, '-': YELLOW, '+': GREEN, '@': WHITE|BOLD}[line[0]]
- lines.append(highlight(1, color) + line + highlight(0) + '\n')
- res.append(''.join(lines))
- else:
- res.append('Source differs.')
- else:
- res.append(''.join(['Changed ', type, ' from ',
- highlight(1,15), a, highlight(0), ' to ',
- highlight(1,15), b, highlight(0), '.']))
- return ' '.join(res)
-
- def format_patch(self):
- return [self.definition]
-
-
-class DiffColumn(DiffBase):
- ALTER_COMMANDS = {
- '+' : 'ADD',
- '-' : 'DROP',
- '*' : 'ALTER',
- }
-
- def __init__(self, change, schema, table, column, columntype, columndefault, columnnotnull, changes=None):
- DiffBase.__init__(self)
- self.level = 2
- self.type = 'column'
- self.change = change
- self.schema = schema
- self.table = table
- self.column = column
- self.columntype = columntype
- self.columndefault = columndefault
- self.columnnotnull = columnnotnull
- self.name = column
- self.changes = changes
-
- def format_patch(self):
- alter_table = 'ALTER TABLE %s.%s %s COLUMN %s' % (
- self.schema,
- self.table,
- self.ALTER_COMMANDS[self.change],
- self.name,
- )
- out = []
- if self.change == '-':
- out.append('%s;' % alter_table);
- if self.change == '+':
- notnull = ''
- if self.columnnotnull:
- notnull = ' NOT NULL'
- default = ''
- if self.columndefault:
- default = ' DEFAULT %s' % self.columndefault
- out.append('%s %s%s%s;'
- % (alter_table, self.columntype, notnull, default));
- if self.change == '*':
- for type, a, b in self.changes:
- if type == 'type':
- out.append('%s TYPE %s;' % (alter_table, b))
- if type == 'notnull':
- if a and not b:
- out.append('%s DROP NOT NULL;' % alter_table)
- if not a and b:
- out.append('%s SET NOT NULL;' % alter_table)
- if type == 'default':
- if b:
- out.append('%s SET DEFAULT %s;' % (alter_table, b))
- else:
- out.append('%s DROP DEFAULT;' % alter_table)
- return out
-
-
-class DiffConstraint(DiffBase):
- def __init__(self, change, schema, table, constraint, definition, changes=None):
- DiffBase.__init__(self)
- self.level = 2
- self.type = 'constraint'
- self.change = change
- self.schema = schema
- self.table = table
- self.constraint = constraint
- self.name = constraint
- self.definition = definition
- self.changes = changes
-
- def format_patch(self):
- q_alter = 'ALTER TABLE %s.%s' % (self.schema, self.table)
- q_drop = '%s DROP CONSTRAINT %s;' % (q_alter, self.constraint)
- q_add = '%s ADD CONSTRAINT %s %s;' % (q_alter, self.constraint, self.definition)
- if self.change == '*':
- out = [q_drop, q_add]
- if self.change == '+':
- out = [q_add]
- if self.change == '-':
- out = [q_drop]
- return out
-
-
-class DiffIndex(DiffBase):
- def __init__(self, change, schema, table, index, definition, changes=None):
- DiffBase.__init__(self)
- self.level = 2
- self.type = 'index'
- self.change = change
- self.schema = schema
- self.table = table
- self.index = index
- self.name = index
- self.definition = definition
- self.changes = changes
-
- def format_patch(self):
- q_drop = 'DROP INDEX %s;' % (self.index,)
- q_add = '%s;' % (self.definition,)
- if self.change == '*':
- out = [q_drop, q_add]
- if self.change == '+':
- out = [q_add]
- if self.change == '-':
- out = [q_drop]
- return out
-
-
-class DiffType(DiffBase):
- def __init__(self, change, schema, name):
- DiffBase.__init__(self)
- self.level = 1
- self.type = 'type'
- self.change = change
- self.schema = schema
- self.name = name
-
-
-class PgDiff:
- def __init__(self, srcbrowser=None, dstbrowser=None):
- self.allowcolor = False
- self.src = srcbrowser
- self.dst = dstbrowser
- self.include_schemas = set() # if not empty, consider only these schemas for diff
- self.exclude_schemas = set() # exclude these schemas from diff
- self.include_tables = set()
- self.exclude_tables = set()
- self.function_regex = re.compile(r"")
- self.function_body_diff = False
-
- def _test_schema(self, schema):
- if self.include_schemas and schema not in self.include_schemas:
- return False
- if schema in self.exclude_schemas:
- return False
- return True
-
- def _test_table(self, table):
- if self.include_tables and table not in self.include_tables:
- return False
- if table in self.exclude_tables:
- return False
- return True
-
- def _test_function(self, function):
- return bool(self.function_regex.match(function))
-
- def _diff_names(self, src, dst):
- for x in src:
- if x in dst:
- yield ('*', x)
- else:
- yield ('-', x)
- for x in dst:
- if x not in src:
- yield ('+', x)
-
- def _compare_columns(self, a, b):
- diff = []
- if a.type != b.type:
- diff.append(('type', a.type, b.type))
- if a.notnull != b.notnull:
- diff.append(('notnull', a.notnull, b.notnull))
- if a.default != b.default:
- diff.append(('default', a.default, b.default))
- return diff
-
- def _compare_constraints(self, a, b):
- diff = []
- if a.type != b.type:
- diff.append(('type', a.type, b.type))
- if a.definition != b.definition:
- diff.append(('definition', a.definition, b.definition))
- return diff
-
- def _compare_indexes(self, a, b):
- diff = []
- if a.definition != b.definition:
- diff.append(('definition', a.definition, b.definition))
- return diff
-
- def _compare_functions(self, a, b):
- diff = []
- if a.result != b.result:
- diff.append(('result', a.result, b.result))
- # function source may differ in newlines (\n vs \r\n)
-        # split lines before comparison, so that these differences are ignored
- a_source = a.source.splitlines()
- b_source = b.source.splitlines()
- if a_source != b_source:
- diff.append(('source', a_source, b_source))
- return diff
-
- def _compare_arguments(self, a, b):
- diff = []
- if a.type != b.type:
- diff.append(('type', a.type, b.type))
- if a.mode != b.mode:
- diff.append(('mode', a.mode, b.mode))
- if a.default != b.default:
- diff.append(('default', a.default, b.default))
- return diff
-
- def _compare_types(self, a, b):
- diff = []
- if a.type != b.type:
- diff.append(('type', a.type, b.type))
- if a.elements != b.elements:
- diff.append(('elements', repr(a.elements), repr(b.elements)))
- return diff
-
- def _diff_columns(self, schema, table, src_columns, dst_columns):
- for nd in self._diff_names(src_columns, dst_columns):
- if nd[1] in dst_columns:
- dst_type = dst_columns[nd[1]].type
- dst_default = dst_columns[nd[1]].default
- dst_notnull = dst_columns[nd[1]].notnull
- else:
- dst_type = None
- dst_default = None
- dst_notnull = None
- cdo = DiffColumn(change=nd[0], schema=schema, table=table, column=nd[1],
- columntype=dst_type, columndefault=dst_default, columnnotnull=dst_notnull)
- if nd[0] == '*':
- a = src_columns[nd[1]]
- b = dst_columns[nd[1]]
- cdo.changes = self._compare_columns(a, b)
- if cdo.changes:
- yield cdo
- else:
- yield cdo
-
- def _diff_constraints(self, schema, table, src_constraints, dst_constraints):
- for nd in self._diff_names(src_constraints, dst_constraints):
- if nd[1] in dst_constraints:
- dst_definition = dst_constraints[nd[1]].definition
- else:
- dst_definition = None
- cdo = DiffConstraint(change=nd[0], schema=schema, table=table, constraint=nd[1],
- definition=dst_definition)
- if nd[0] == '*':
- a = src_constraints[nd[1]]
- b = dst_constraints[nd[1]]
- cdo.changes = self._compare_constraints(a, b)
- if cdo.changes:
- yield cdo
- else:
- yield cdo
-
- def _diff_indexes(self, schema, table, src_indexes, dst_indexes):
- for nd in self._diff_names(src_indexes, dst_indexes):
- if nd[1] in dst_indexes:
- dst_definition = dst_indexes[nd[1]].definition
- else:
- dst_definition = None
- ido = DiffIndex(change=nd[0], schema=schema, table=table, index=nd[1],
- definition=dst_definition)
- if nd[0] == '*':
- a = src_indexes[nd[1]]
- b = dst_indexes[nd[1]]
- ido.changes = self._compare_indexes(a, b)
- if ido.changes:
- yield ido
- else:
- yield ido
-
- def _diff_tables(self, schema, src_tables, dst_tables):
- for nd in self._diff_names(src_tables, dst_tables):
- if not self._test_table(nd[1]):
- continue
- tdo = DiffTable(change=nd[0], schema=schema, table=nd[1])
- if nd[0] == '*':
- # columns
- src_columns = src_tables[nd[1]].columns
- dst_columns = dst_tables[nd[1]].columns
- for cdo in self._diff_columns(schema, nd[1], src_columns, dst_columns):
- if tdo:
- yield tdo
- tdo = None
- yield cdo
- # constraints
- src_constraints = src_tables[nd[1]].constraints
- dst_constraints = dst_tables[nd[1]].constraints
- for cdo in self._diff_constraints(schema, nd[1], src_constraints, dst_constraints):
- if tdo:
- yield tdo
- tdo = None
- yield cdo
- # indexes
- src_indexes = src_tables[nd[1]].indexes
- dst_indexes = dst_tables[nd[1]].indexes
- for ido in self._diff_indexes(schema, nd[1], src_indexes, dst_indexes):
- if tdo:
- yield tdo
- tdo = None
- yield ido
- else:
- yield tdo
-
- def _diff_arguments(self, schema, function, src_args, dst_args):
- for nd in self._diff_names(src_args, dst_args):
- ado = DiffArgument(change=nd[0], schema=schema, function=function, argument=nd[1])
- if nd[0] == '*':
- a = src_args[nd[1]]
- b = dst_args[nd[1]]
- ado.changes = self._compare_arguments(a, b)
- if ado.changes:
- yield ado
- else:
- yield ado
-
- def _diff_functions(self, schema, src_functions, dst_functions):
- for nd in self._diff_names(src_functions, dst_functions):
- if not self._test_function(nd[1]):
- continue
- if nd[1] in dst_functions:
- dst_definition = dst_functions[nd[1]].definition
- else:
- dst_definition = None
- fdo = DiffFunction(change=nd[0], schema=schema, function=nd[1],
- definition=dst_definition,
- show_body_diff=self.function_body_diff)
- if nd[0] == '*':
- # compare function body and result
- a = src_functions[nd[1]]
- b = dst_functions[nd[1]]
- fdo.changes = self._compare_functions(a, b)
- if fdo.changes:
- yield fdo
- fdo = None
- # arguments
- src_args = src_functions[nd[1]].arguments
- dst_args = dst_functions[nd[1]].arguments
- for ado in self._diff_arguments(schema, nd[1], src_args, dst_args):
- if fdo:
- yield fdo
- fdo = None
- yield ado
- else:
- yield fdo
-
- def _diff_types(self, schema, src_types, dst_types):
- for nd in self._diff_names(src_types, dst_types):
- tdo = DiffType(change=nd[0], schema=schema, name=nd[1])
- if nd[0] == '*':
- a = src_types[nd[1]]
- b = dst_types[nd[1]]
- tdo.changes = self._compare_types(a, b)
- if tdo.changes:
- yield tdo
- else:
- yield tdo
-
- def iter_diff(self):
-        '''Return the diff between the src and dst database schemas.
-
-        Yields one line at a time. Each line is an object inherited
-        from DiffBase which contains all information about the change.
-        See the format() method.
-
- '''
- src_schemas = self.src.schemas
- dst_schemas = self.dst.schemas
- src = [x.name for x in src_schemas.values() if not x.system and self._test_schema(x.name)]
- dst = [x.name for x in dst_schemas.values() if not x.system and self._test_schema(x.name)]
- for nd in self._diff_names(src, dst):
- sdo = DiffSchema(change=nd[0], schema=nd[1])
- if nd[0] == '*':
- # tables
- src_tables = src_schemas[nd[1]].tables
- dst_tables = dst_schemas[nd[1]].tables
- for tdo in self._diff_tables(nd[1], src_tables, dst_tables):
- if sdo:
- yield sdo
- sdo = None
- yield tdo
- # functions
- src_functions = src_schemas[nd[1]].functions
- dst_functions = dst_schemas[nd[1]].functions
- for fdo in self._diff_functions(nd[1], src_functions, dst_functions):
- if sdo:
- yield sdo
- sdo = None
- yield fdo
- # types
- src_types = src_schemas[nd[1]].types
- dst_types = dst_schemas[nd[1]].types
- for tdo in self._diff_types(nd[1], src_types, dst_types):
- if sdo:
- yield sdo
- sdo = None
- yield tdo
- else:
- yield sdo
-
- def print_diff(self):
-        '''Print the diff between the src and dst database schemas.
-
-        The output is in human-readable form.
-
-        Set allowcolor=True on the PgDiff instance to get colored output.
-
- '''
- for ln in self.iter_diff():
- print(ln.format())
-
- def print_patch(self):
-        '''Print a patch that updates the src schema to match the dst schema.
-
-        Supports table drop/add, column drop/add and the following
-        column changes:
-        - type
-        - set/drop not null
-        - default value
-
-        This is experimental and not thoroughly tested.
-        Do not use it without reviewing the generated commands.
-        Even if it works as intended, it can cause table lock-ups
-        and/or data loss. You have been warned.
-
- '''
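-        # For illustration only (table and column names below are hypothetical),
-        # the emitted statements look like:
-        #   ALTER TABLE public.users ALTER COLUMN age TYPE bigint;
-        #   ALTER TABLE public.users ALTER COLUMN age SET NOT NULL;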
- for ln in self.iter_diff():
- patch = ln.format_patch()
- if patch:
- print('\n'.join(patch))
-
- def filter_schemas(self, include=[], exclude=[]):
-        '''Modify the list of schemas used for computing the diff.
-
- include (list) -- if not empty, consider only these schemas for diff
- exclude (list) -- exclude these schemas from diff
-
-        Evaluation order: include first, then exclude.
-        include=[] means include everything.
-
- Raises:
- PgDiffError: when schema from include list is not found in src db
-
- '''
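-        # Usage sketch (schema names are illustrative):
-        #   pgdiff.filter_schemas(include=['public'], exclude=['archive'])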
- for schema in include:
- self._check_schema_exist(schema)
- self.include_schemas.clear()
- self.include_schemas.update(include)
- self.exclude_schemas.clear()
- self.exclude_schemas.update(exclude)
-
- def filter_tables(self, include=[], exclude=[]):
- self.include_tables.clear()
- self.include_tables.update(include)
- self.exclude_tables.clear()
- self.exclude_tables.update(exclude)
-
- def filter_functions(self, regex=''):
- self.function_regex = re.compile(regex)
-
- def _check_schema_exist(self, schema):
-        if schema not in self.src.schemas:
- raise PgDiffError('Schema "%s" not found in source database.' % schema)
-
--- a/pgtoolkit/pgmanager.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,515 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# PgManager - manage database connections
-#
-# Requires: Python 3.2, psycopg2
-#
-# Part of pgtoolkit
-# http://hg.devl.cz/pgtoolkit
-#
-# Copyright (c) 2010, 2011, 2012, 2013 Radek Brich <radek.brich@devl.cz>
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""Postgres database connection manager
-
-PgManager wraps psycopg2, adding the following features:
-
- * Save and reuse database connection parameters
-
- * Connection pooling
-
- * Easy query using the with statement
-
- * Dictionary rows
-
-Example usage:
-
- from pgtoolkit import pgmanager
-
- pgm = pgmanager.get_instance()
- pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres')
-
- with pgm.cursor() as curs:
- curs.execute('SELECT now() AS now')
- row = curs.fetchone_dict()
- print(row.now)
-
-First, we obtain a PgManager instance. This is like calling PgManager(),
-except that in our example the instance is global, so getting the instance
-in another module gives us access to all the defined connections etc.
-
-The second line creates a connection named 'default' (the name can be left out).
-The with statement obtains a connection (actually connecting to the database
-when needed), then returns a cursor for this connection. At the end of the
-with statement, the connection is returned to the pool or closed (depending
-on the number of connections in the pool and on the pool_size parameter).
-
-The row returned by fetchone_dict() is a special dict object which supports
-both item and attribute access, i.e. row['now'] or row.now.
-
-"""
-
-from contextlib import contextmanager
-from collections import OrderedDict
-import logging
-import threading
-import multiprocessing
-import select
-import socket
-
-import psycopg2
-import psycopg2.extensions
-
-from psycopg2 import DatabaseError, IntegrityError, OperationalError
-
-
-log_sql = logging.getLogger("pgmanager_sql")
-log_notices = logging.getLogger("pgmanager_notices")
-log_sql.addHandler(logging.NullHandler())
-# NullHandler not needed for notices which are INFO level only
-
-
-class PgManagerError(Exception):
-
- pass
-
-
-class ConnectionInfo:
-
- def __init__(self, name, dsn, isolation_level=None, keep_alive=True,
- init_statement=None, pool_size=1):
- self.name = name # connection name is logged with SQL queries
- self.dsn = dsn # dsn or string with connection parameters
- self.isolation_level = isolation_level
- self.keep_alive = keep_alive
- self.init_statement = init_statement
- self.pool_size = pool_size
-
-
-class RowDict(OrderedDict):
- """Special dictionary used for rows returned from queries.
-
-    Items keep the order in which columns were returned from the database.
-
- It supports three styles of access:
-
- Dict style:
- row['id']
- for key in row:
- ...
-
- Object style (only works if column name does not collide with any method name):
- row.id
-
- Tuple style:
- row[0]
- id, name = row.values()
-
- """
-
- def __getitem__(self, key):
- if isinstance(key, int):
- return tuple(self.values())[key]
- else:
- return OrderedDict.__getitem__(self, key)
-
- def __getattr__(self, key):
- try:
- return self[key]
- except KeyError:
- raise AttributeError(key)
-
-
-class Cursor(psycopg2.extensions.cursor):
-
- def execute(self, query, args=None):
- # log query before executing
- self._log_query(query, args)
- try:
- return super(Cursor, self).execute(query, args)
- except DatabaseError:
- self._log_exception()
- raise
-
- def callproc(self, procname, args=None):
-        # log the query before executing it (not the exact query executed, but it should correspond)
- self._log_query(self._build_callproc_query(procname, len(args)), args)
- try:
- return super(Cursor, self).callproc(procname, args)
- except DatabaseError:
- self._log_exception()
- raise
-
- def row_dict(self, row, lstrip=None):
- adjustname = lambda a: a
- if lstrip:
- adjustname = lambda a: a.lstrip(lstrip)
- return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
-
- def fetchone_dict(self, lstrip=None):
- '''Return one row as OrderedDict'''
- row = super(Cursor, self).fetchone()
- if row is None:
- return None
- return self.row_dict(row, lstrip)
-
- def fetchall_dict(self, lstrip=None):
- '''Return all rows as OrderedDict'''
- rows = super(Cursor, self).fetchall()
- return [self.row_dict(row, lstrip) for row in rows]
-
- def adapt(self, row):
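-        '''Return a copy of the row with all values mogrified (quoted for SQL).'''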
- if isinstance(row, RowDict):
- # dict
- adapted = dict()
- for key in row.keys():
- adapted[key] = self.mogrify('%s', [row[key]]).decode('utf8')
- return RowDict(adapted)
- else:
- # list
- return [self.mogrify('%s', [x]).decode('utf8') for x in row]
-
- def fetchone_adapted(self, lstrip=None):
- '''Like fetchone_dict() but values are quoted for direct inclusion in SQL query.
-
-        This is useful when you need to generate an SQL script from data
-        returned by the query. Use mogrify() for simple cases.
-
- '''
- row = super(Cursor, self).fetchone()
- if row is None:
- return None
- return self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip)
-
- def fetchall_adapted(self, lstrip=None):
- '''Like fetchall_dict() but values are quoted for direct inclusion in SQL query.'''
- rows = super(Cursor, self).fetchall()
- return [self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip) for row in rows]
-
- def _log_query(self, query='?', args=None):
- name = self.connection.name if hasattr(self.connection, 'name') else '-'
- query = self.mogrify(query, args)
- log_sql.debug('[%s] %s' % (name, query.decode('utf8')))
-
- def _log_exception(self):
- name = self.connection.name if hasattr(self.connection, 'name') else '-'
- log_sql.exception('[%s] exception:' % (name,))
-
- def _build_callproc_query(self, procname, num_args):
- return 'SELECT * FROM %s(%s)' % (procname, ', '.join(['%s'] * num_args))
-
-
-class Connection(psycopg2.extensions.connection):
-
- def cursor(self, name=None):
- if name is None:
- return super(Connection, self).cursor(cursor_factory=Cursor)
- else:
- return super(Connection, self).cursor(name, cursor_factory=Cursor)
-
- def keep_alive(self):
- '''Set socket to keepalive mode. Must be called before any query.'''
- sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
- try:
-            # Maximum keep-alive probes before assuming the connection is lost
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
- # Interval (in seconds) between keep-alive probes
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 2)
-            # Maximum idle time (in seconds) before starting to send keep-alive probes
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
- except socket.error:
- pass
-        # close the duplicated fd; the options set on the socket remain
- sock.close()
-
-
-class PgManager:
-
- def __init__(self):
-        self.conn_known = {}  # known connection parameters
-        self.conn_pool = {}  # pooled connections
- self.lock = threading.Lock() # mutual exclusion for threads
- self.pid = multiprocessing.current_process().pid # forking check
-
- def __del__(self):
- for conn in tuple(self.conn_known.keys()):
- self.destroy_conn(conn)
-
- def create_conn(self, name='default', isolation_level=None, keep_alive=True, init_statement=None,
- pool_size=1, dsn=None, **kwargs):
- '''Create named connection.
-
- *name* -- name for connection
-
-        *pool_size* -- how many connections will be kept open in the pool.
-        More connections can still be created, but they will be closed by put_conn.
-        `None` disables the pool; get_conn() will then always return the same connection.
-
- *isolation_level* -- `"autocommit"`, `"read_committed"`, `"serializable"` or `None` for driver default
-
- *keep_alive* -- set socket to keepalive mode
-
- *dsn* -- connection string (parameters or data source name)
-
- Other keyword args are used as connection parameters.
-
- '''
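-        # Minimal usage sketch (connection parameters below are illustrative):
-        #   pgm = get_instance()
-        #   pgm.create_conn(name='main', pool_size=2,
-        #                   isolation_level='read_committed',
-        #                   host='127.0.0.1', dbname='mydb', user='app')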
- if name in self.conn_known:
- raise PgManagerError('Connection name "%s" already registered.' % name)
-
- if dsn is None:
- dsn = ' '.join([x[0]+'='+str(x[1]) for x in kwargs.items() if x[1] is not None])
-
- isolation_level = self._normalize_isolation_level(isolation_level)
- ci = ConnectionInfo(name, dsn, isolation_level, keep_alive, init_statement, pool_size)
-
- self.conn_known[name] = ci
- self.conn_pool[name] = []
-
- def create_conn_listen(self, name, channel, dsn=None, copy_dsn=None, **kwargs):
-        '''Create a connection that listens for notifies.
-
-        Disables the pool. If you want to use a pool, create another connection for that.
-        This connection can be used as usual: conn.cursor() etc.
-        Don't use PgManager's cursor() or put_conn() with it.
-
- *name* -- name for connection
-
- *channel* -- listen on this channel
-
-        *copy_dsn* -- name of another registered connection whose dsn will be used
-
-        Other parameters are forwarded to create_conn().
-
- '''
- if dsn is None and copy_dsn:
- try:
- dsn = self.conn_known[copy_dsn].dsn
- except KeyError:
- raise PgManagerError("Connection name '%s' not registered." % copy_dsn)
- listen_query = "LISTEN " + channel
- self.create_conn(name=name, pool_size=None, isolation_level='autocommit', init_statement=listen_query,
- dsn=dsn, **kwargs)
-
- def close_conn(self, name='default'):
- '''Close all connections of given name.
-
- Connection credentials are still saved.
-
- '''
- while len(self.conn_pool[name]):
- conn = self.conn_pool[name].pop()
- conn.close()
-
- def destroy_conn(self, name='default'):
- '''Destroy connection.
-
- Counterpart of create_conn.
-
- '''
-        if name not in self.conn_known:
- raise PgManagerError('Connection name "%s" not registered.' % name)
-
- self.close_conn(name)
-
- del self.conn_known[name]
- del self.conn_pool[name]
-
- def knows_conn(self, name='default'):
- return name in self.conn_known
-
- def get_conn(self, name='default'):
-        '''Get a connection named *name* from the pool.'''
- self._check_fork()
- self.lock.acquire()
- try:
- try:
- ci = self.conn_known[name]
- except KeyError:
- raise PgManagerError("Connection name '%s' not registered." % name)
-
- # no pool, just one static connection
- if ci.pool_size is None:
- # check for existing connection
- try:
- conn = self.conn_pool[name][0]
- if conn.closed:
- conn = None
- except IndexError:
- conn = None
- self.conn_pool[name].append(conn)
-            # if no existing connection is valid, connect a new one and save it
- if conn is None:
- conn = self._connect(ci)
- self.conn_pool[name][0] = conn
-
- # connection from pool
- else:
- conn = None
- while len(self.conn_pool[name]) and conn is None:
- conn = self.conn_pool[name].pop()
- if conn.closed:
- conn = None
-
- if conn is None:
- conn = self._connect(ci)
- finally:
- self.lock.release()
- return conn
-
- def put_conn(self, conn, name='default'):
-        '''Put a connection back into the pool.
-
-        *name* must be the same as used for get_conn(), otherwise things will break.
-
- '''
- self.lock.acquire()
- try:
-            if name not in self.conn_known:
- raise PgManagerError("Connection name '%s' not registered." % name)
-
- if len(self.conn_pool[name]) >= self.conn_known[name].pool_size:
- conn.close()
- return
-
- if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN:
- conn.close()
- return
-
-            # a connection returned to the pool must not be inside a transaction
- if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
- try:
- conn.rollback()
- except OperationalError:
- if not conn.closed:
- conn.close()
- return
-
- self.conn_pool[name].append(conn)
- finally:
- self.lock.release()
-
- @contextmanager
- def cursor(self, name='default'):
- '''Cursor context.
-
-        Takes a connection named *name* from the pool
-        and returns a cursor for that connection.
-
- '''
- conn = self.get_conn(name)
-
- try:
- curs = conn.cursor()
- yield curs
- finally:
- curs.close()
- self.log_notices(conn)
- self.put_conn(conn, name)
-
- def log_notices(self, conn):
- for notice in conn.notices:
- log_notices.info(notice.rstrip())
- conn.notices[:] = []
-
- def wait_for_notify(self, name='default', timeout=None):
- '''Wait for asynchronous notifies, return the last one.
-
- *name* -- name of connection, must be created using `create_conn_listen()`
-
- *timeout* -- in seconds, floating point (`None` means wait forever)
-
- Returns `None` on timeout.
-
- '''
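-        # Intended usage sketch (connection and channel names are illustrative):
-        #   pgm.create_conn_listen('events', channel='app_events', dsn='dbname=mydb')
-        #   notify = pgm.wait_for_notify('events', timeout=5.0)
-        #   if notify is not None:
-        #       print(notify.channel, notify.payload)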
- conn = self.get_conn(name)
-
- # return any notifies on stack
- if conn.notifies:
- return conn.notifies.pop()
-
- if select.select([conn], [], [], timeout) == ([], [], []):
- # timeout
- return None
- else:
- conn.poll()
-
- # return just the last notify (we do not care for older ones)
- if conn.notifies:
- return conn.notifies.pop()
- return None
-
- def _connect(self, ci):
- conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
- conn.name = ci.name
- if ci.keep_alive:
- conn.keep_alive()
-        if ci.isolation_level is not None:
- conn.set_isolation_level(ci.isolation_level)
- if ci.init_statement:
- curs = conn.cursor()
- curs.execute(ci.init_statement)
- curs.connection.commit()
- curs.close()
- return conn
-
- def _normalize_isolation_level(self, level):
-        if isinstance(level, str):
- if level.lower() == 'autocommit':
- return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
- if level.lower() == 'read_committed':
- return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
- if level.lower() == 'serializable':
- return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
- raise PgManagerError('Unknown isolation level name: "%s"' % level)
- return level
-
- def _check_fork(self):
- '''Check if process was forked (PID has changed).
-
- If it was, clean parent's connections.
- New connections are created for children.
- Known connection credentials are inherited, but not shared.
-
- '''
- if self.pid == multiprocessing.current_process().pid:
- # PID has not changed
- return
-
- # update saved PID
- self.pid = multiprocessing.current_process().pid
- # reinitialize lock
- self.lock = threading.Lock()
- # clean parent's connections
- for name in self.conn_pool:
- self.conn_pool[name] = []
-
- @classmethod
- def get_instance(cls):
- if not hasattr(cls, '_instance'):
- cls._instance = cls()
- return cls._instance
-
-
-def get_instance():
- return PgManager.get_instance()
-
--- a/pgtoolkit/pgstats.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,51 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# PgStats - browse database statistics
-#
-# Copyright (c) 2011 Radek Brich <radek.brich@devl.cz>
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-
-class PgStats:
- def __init__(self, conn=None):
- self.conn = conn
-
- def setconn(self, conn=None):
- self.conn = conn
-
- def _query(self, query, *args):
- try:
- curs = self.conn.cursor()
- curs.execute(query, args)
- curs.connection.commit()
- rows = curs.fetchall()
- return [dict(zip([desc[0] for desc in curs.description], row)) for row in rows]
- finally:
- curs.close()
-
- def list_long_queries(self, longer_than='1m'):
- return self._query('''SELECT
- datname, procpid, usename, current_query AS query,
- waiting, xact_start, query_start, backend_start, client_addr
- FROM pg_stat_activity
- WHERE current_query <> '<IDLE>' AND query_start < now() - interval %s
- ORDER BY query_start;''',
- longer_than)
-
--- a/pgtoolkit/progresswrapper.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,54 +0,0 @@
-
-import sys
-
-
-class ProgressWrapper:
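-    """Wrap a file object and print read/write progress to stdout."""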
-
- def __init__(self, f, size=0):
- self.f = f
- self.size = size
- self.readsize = 0
- self.cycles = 0
- self.print_cycles = 200
-
-    def humanize(self, nbytes):
-        if nbytes > 1024**3:
-            return '%.1fG' % (nbytes / 1024.**3)
-        if nbytes > 1024**2:
-            return '%.1fM' % (nbytes / 1024.**2)
-        if nbytes > 1024:
-            return '%.1fk' % (nbytes / 1024.)
-        return '%d' % nbytes
-
- def write(self, data):
- self.size += len(data)
- if self.cycles == 0:
- print(' read %s \r' % self.humanize(self.size), end='')
- sys.stdout.flush()
-            self.cycles = self.print_cycles
- else:
- self.cycles -= 1
- return self.f.write(data)
-
- def read(self, size):
- self.readsize += size
- if self.cycles == 0:
- if self.size > 0:
- percent = self.readsize * 100. / self.size
- else:
- percent = 100
- if percent > 100:
- percent = 100
- print(' written %s / %s (%.1f%%) \r' % (
- self.humanize(self.readsize),
- self.humanize(self.size),
- percent), end='')
- sys.stdout.flush()
- self.cycles = self.print_cycles
- else:
- self.cycles -= 1
- return self.f.read(size)
-
- def close(self):
- self.f.close()
-
--- a/pgtoolkit/toolbase.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,261 +0,0 @@
-from pgtoolkit import pgmanager, pgbrowser
-
-from pycolib.configparser import ConfigParser
-from pycolib.coloredformatter import ColoredFormatter
-from pycolib.ansicolor import highlight
-
-import argparse
-import logging
-import re
-import textwrap
-
-
-class ConnectionInfoNotFound(Exception):
- pass
-
-
-class BadArgsError(Exception):
- pass
-
-
-class ToolDescriptionFormatter(argparse.HelpFormatter):
- """Help message formatter which retains any formatting in descriptions."""
-
- def _fill_text(self, text, width, indent):
- return textwrap.dedent(text)
-
-
-class ToolBase:
-
- def __init__(self, name, desc=None, **kwargs):
- self.config = ConfigParser()
- self.parser = argparse.ArgumentParser(prog=name, description=desc or self.__doc__,
- formatter_class=ToolDescriptionFormatter)
- self.pgm = pgmanager.get_instance()
- self.target_isolation_level = None
-
- def setup(self, args=None):
- self.specify_args()
- self.load_args(args)
- self.init_logging()
-
- def specify_args(self):
- self.config.add_option('databases', dict)
- self.config.add_option('meta_db')
- self.config.add_option('meta_query')
- self.parser.add_argument('-Q', dest='show_queries', action='store_true',
- help='Print database queries.')
- self.parser.add_argument('-C', dest='config_file', type=str,
- help='Additional config file (besides pgtoolkit.conf).')
-
- def load_args(self, args=None, config_file=None):
- # Parse command line arguments
- self.args = self.parser.parse_args(args)
- # Load global config
- self.config.load('/etc/pgtoolkit.conf', must_exist=False)
- # Load local config
- self.config.load(config_file or 'pgtoolkit.conf', must_exist=False)
- # Load additional config
- if self.args.config_file:
- self.config.load(self.args.config_file)
-
- def init_logging(self):
- # logging
- format = ColoredFormatter(highlight(1,7,0)+'%(asctime)s %(levelname)-5s'+highlight(0)+' %(message)s', '%H:%M:%S')
- handler = logging.StreamHandler()
- handler.setFormatter(format)
- handler.setLevel(logging.DEBUG)
- self.log = logging.getLogger('main')
- self.log.addHandler(handler)
- self.log.setLevel(logging.DEBUG)
-
- log_notices = logging.getLogger('pgmanager_notices')
- log_notices.addHandler(handler)
- log_notices.setLevel(logging.DEBUG)
-
- if self.args.show_queries:
- log_sql = logging.getLogger('pgmanager_sql')
- log_sql.addHandler(handler)
- log_sql.setLevel(logging.DEBUG)
-
- def prepare_conn_from_metadb(self, name, lookup_name):
- """Create connection in pgmanager using meta DB.
-
- name -- Name for connection in pgmanager.
- lookup_name -- Name of connection in meta DB.
-
- """
- if not self.pgm.knows_conn('meta'):
- self.pgm.create_conn(name='meta', dsn=self.config.meta_db)
- with self.pgm.cursor('meta') as curs:
- curs.execute(self.config.meta_query, [lookup_name])
- row = curs.fetchone_dict()
- curs.connection.commit()
- if row:
- self.pgm.create_conn(name=name,
- isolation_level=self.target_isolation_level,
- **row)
- return True
- self.pgm.close_conn('meta')
-
- def prepare_conn_from_config(self, name, lookup_name):
- """Create connection in pgmanager using info in config.databases."""
- if self.config.databases:
- if lookup_name in self.config.databases:
- dsn = self.config.databases[lookup_name]
- self.pgm.create_conn(name=name,
- isolation_level=self.target_isolation_level,
- dsn=dsn)
- return True
-
- def prepare_conns(self, **kwargs):
- """Create connections in PgManager.
-
- Keyword arguments meaning:
- key: connection name for use in PgManager
- value: connection name in config or meta DB
-
- """
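-        # Usage sketch ('db_alpha'/'db_beta' are illustrative lookup names):
-        #   self.prepare_conns(src='db_alpha', dst='db_beta')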
- for name in kwargs:
- lookup_name = kwargs[name]
- found = self.prepare_conn_from_config(name, lookup_name)
- if not found and self.config.meta_db:
- found = self.prepare_conn_from_metadb(name, lookup_name)
- if not found:
-                raise ConnectionInfoNotFound('Connection name "%s" not found in config or meta DB.' % lookup_name)
-
-
-class SimpleTool(ToolBase):
-
- def __init__(self, name, desc=None, **kwargs):
- ToolBase.__init__(self, name, desc, **kwargs)
-
- def specify_args(self):
- ToolBase.specify_args(self)
- self.config.add_option('target', type=str, default=None)
- self.parser.add_argument('target', nargs='?', type=str, help='Target database')
-
- def load_args(self, args=None, config_file=None):
- ToolBase.load_args(self, args, config_file)
- self.target = self.args.target or self.config.target or 'default'
-
- def setup(self, args=None):
- ToolBase.setup(self, args)
- self.prepare_conns(target=self.target)
-
-
-class SrcDstTool(ToolBase):
-
- def __init__(self, name, desc=None, *, allow_reverse=False, force_reverse=False, **kwargs):
- ToolBase.__init__(self, name, desc, **kwargs)
- self.allow_reverse = allow_reverse
- self.force_reverse = force_reverse
-
- def specify_args(self):
- ToolBase.specify_args(self)
- self.parser.add_argument('src', metavar='source', type=str, help='Source database')
- self.parser.add_argument('dst', metavar='destination', type=str, help='Destination database')
- if self.allow_reverse:
- self.parser.add_argument('-r', '--reverse', action='store_true', help='Reverse operation. Swap source and destination.')
-
- def load_args(self, args=None, config_file=None):
- ToolBase.load_args(self, args, config_file)
- if self.is_reversed():
- self.args.src, self.args.dst = self.args.dst, self.args.src
-
- def setup(self, args=None):
- ToolBase.setup(self, args)
- self.prepare_conns(src=self.args.src, dst=self.args.dst)
-
- def is_reversed(self):
- return ('reverse' in self.args and self.args.reverse) or self.force_reverse
-
-
-class SrcDstTablesTool(SrcDstTool):
-
- def specify_args(self):
- SrcDstTool.specify_args(self)
- self.parser.add_argument('-t', '--src-table', metavar='source_table',
- dest='srctable', type=str, default='', help='Source table name.')
- self.parser.add_argument('-s', '--src-schema', metavar='source_schema',
- dest='srcschema', type=str, default='', help='Source schema name (default=public).')
- self.parser.add_argument('--dst-table', metavar='destination_table',
- dest='dsttable', type=str, default='', help='Destination table name (default=source_table).')
- self.parser.add_argument('--dst-schema', metavar='destination_schema',
- dest='dstschema', type=str, default='', help='Destination schema name (default=source_schema).')
- self.parser.add_argument('--regex', action='store_true', help="Use RE in schema or table name.")
-
- def load_args(self, args=None, config_file=None):
- SrcDstTool.load_args(self, args, config_file)
- self.load_table_names()
-
- def load_table_names(self):
- self.schema1 = self.args.srcschema
- self.table1 = self.args.srctable
- self.schema2 = self.args.dstschema
- self.table2 = self.args.dsttable
-
-        # check regex - it applies to the source name, so the destination name must not be specified;
-        # it applies to only one of them - schema or table name
- if self.args.regex:
- if self.table2 or (self.schema2 and not self.table1):
- raise BadArgsError('Cannot specify both --regex and --dst-schema, --dst-table.')
- # schema defaults to public
- if self.table1 and not self.schema1:
- self.schema1 = 'public'
- # dest defaults to source
- if not self.schema2:
- self.schema2 = self.schema1
- if not self.table2:
- self.table2 = self.table1
-
- # swap src, dst when in reverse mode
- if self.is_reversed():
- self.schema1, self.schema2 = self.schema2, self.schema1
- self.table1, self.table2 = self.table2, self.table1
-
- def tables(self):
- '''Generator. Yields schema1, table1, schema2, table2.'''
- srcconn = self.pgm.get_conn('src')
- try:
- srcbrowser = pgbrowser.PgBrowser(srcconn)
- if self.args.regex:
- if not self.table1:
- # all tables from schemas defined by regex
- for item in self._iter_schemas_regex(srcbrowser, self.schema1):
- yield item
- else:
- # all tables defined by regex
- for item in self._iter_tables_regex(srcbrowser, self.schema1, self.schema2, self.table1):
- yield item
- else:
- if not self.table1:
- if not self.schema1:
- # all tables from all schemas
- for item in self._iter_schemas_regex(srcbrowser, self.schema1):
- yield item
- else:
- # all tables from specified schema
- for item in self._iter_tables_regex(srcbrowser, self.schema1, self.schema2, self.table1):
- yield item
- else:
- # one table
- yield (self.schema1, self.table1, self.schema2, self.table2)
- finally:
- self.pgm.put_conn(srcconn, 'src')
-
- def _iter_schemas_regex(self, browser, regex):
- for schema in browser.list_schemas():
- if schema['system']:
- continue
- schemaname = schema['name']
- if re.match(regex, schemaname):
- for item in self._iter_tables_regex(browser, schemaname, schemaname, ''):
- yield item
-
- def _iter_tables_regex(self, browser, schema1, schema2, regex):
- for table in browser.list_tables(schema1):
- tablename = table['name']
- if re.match(regex, tablename):
- yield (schema1, tablename, schema2, tablename)
-
--- a/pgtoolkit/tools/__init__.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,3 +0,0 @@
-__all__ = ['analyzeall', 'batchcopy', 'bigtables', 'listdepends',
- 'listserial', 'longqueries', 'loopquery',
- 'runquery', 'schemadiff', 'tablediff', 'tablesync']
--- a/pgtoolkit/tools/analyzeall.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,52 +0,0 @@
-from pgtoolkit.toolbase import SimpleTool
-from pgtoolkit import pgbrowser
-
-
-class AnalyzeAllTool(SimpleTool):
-
- """
- Analyze/vacuum all tables in selected schemas.
-
-    Partially emulates the VACUUM ANALYZE VERBOSE query,
-    but this program is more configurable and skips pg_catalog etc.
-
- """
-
- def __init__(self):
- SimpleTool.__init__(self, name='analyzeall')
- self.target_isolation_level = 'autocommit'
-
- def specify_args(self):
- SimpleTool.specify_args(self)
- self.parser.add_argument('-s', dest='schema', nargs='*', help='Schema filter')
- self.parser.add_argument('--vacuum', action='store_true', help='Call VACUUM ANALYZE')
- self.parser.add_argument('--vacuum-full', action='store_true', help='Call VACUUM FULL ANALYZE')
- self.parser.add_argument('--reindex', action='store_true', help='Call REINDEX TABLE')
-
- def main(self):
- browser = pgbrowser.PgBrowser(self.pgm.get_conn('target'))
-
- query_patterns = ['ANALYZE %s.%s;']
- if self.args.vacuum:
- query_patterns = ['VACUUM ANALYZE %s.%s;']
- if self.args.vacuum_full:
- query_patterns = ['VACUUM FULL ANALYZE %s.%s;']
- if self.args.reindex:
- query_patterns += ['REINDEX TABLE %s.%s;']
-
- schema_list = self.args.schema
- if not schema_list:
- schema_list = [schema['name'] for schema in browser.list_schemas() if not schema['system']]
-
- for schema in schema_list:
- tables = browser.list_tables(schema=schema)
- with self.pgm.cursor('target') as curs:
- for table in tables:
- for query_pattern in query_patterns:
- query = query_pattern % (schema, table['name'])
- self.log.info(query)
- curs.execute(query, [])
-
-
-cls = AnalyzeAllTool
-
--- a/pgtoolkit/tools/batchcopy.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,61 +0,0 @@
-from pgtoolkit.toolbase import SrcDstTablesTool
-from pgtoolkit.pgmanager import IntegrityError
-
-
-class BatchCopyTool(SrcDstTablesTool):
-
- """
- Copy data from one table to another, filtering by specified condition.
-
- """
-
- def __init__(self):
- SrcDstTablesTool.__init__(self, name='batchcopy', desc='')
-
- def specify_args(self):
- SrcDstTablesTool.specify_args(self)
- self.parser.add_argument('--table-name', type=str, help='Table to be copied.')
- self.parser.add_argument('--src-filter', type=str, help='WHERE condition for source query.')
-        self.parser.add_argument('--file-with-ids', type=str, help='Read source IDs from file (each ID on a new line). Use them in --src-filter as {ids}.')
- self.parser.add_argument('--dst-exists', choices=['rollback', 'ignore', 'update'], default='rollback', help='What to do when destination record already exists.')
-
- def main(self):
- # read list of IDs from file
- ids = '<no IDs read>'
- if self.args.file_with_ids:
- with open(self.args.file_with_ids, 'r') as f:
- ids = ','.join(ln.rstrip() for ln in f.readlines())
-
- # read source data
- with self.pgm.cursor('src') as src_curs:
-            condition = self.args.src_filter.format(ids=ids) if self.args.src_filter else 'true'
- src_curs.execute('SELECT * FROM {} WHERE {}'.format(self.args.table_name, condition))
- #TODO: ORDER BY id OFFSET 0 LIMIT 100
- data = src_curs.fetchall_dict()
- src_curs.connection.commit()
-
- with self.pgm.cursor('dst') as dst_curs:
- copied = 0
- for row in data:
- keys = ', '.join(row.keys())
- values_mask = ', '.join(['%s'] * len(row))
- query = 'INSERT INTO {} ({}) VALUES ({})'.format(self.args.table_name, keys, values_mask)
- try:
- dst_curs.execute('SAVEPOINT the_query;')
- dst_curs.execute(query, list(row.values()))
- dst_curs.execute('RELEASE SAVEPOINT the_query;')
- copied += 1
- except IntegrityError:
- if self.args.dst_exists == 'rollback':
- dst_curs.connection.rollback()
- break
- elif self.args.dst_exists == 'ignore':
- dst_curs.execute('ROLLBACK TO SAVEPOINT the_query;')
- elif self.args.dst_exists == 'update':
- raise NotImplementedError()
- dst_curs.connection.commit()
-
- self.log.info('Copied %s rows.', copied)
-
-
-cls = BatchCopyTool
--- a/pgtoolkit/tools/bigtables.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,55 +0,0 @@
-from pgtoolkit.toolbase import SimpleTool
-from pgtoolkit import pgbrowser
-from pycolib.prettysize import prettysize_short
-from pycolib.ansicolor import highlight
-
-
-class BigTablesTool(SimpleTool):
-
- """List largest tables.
-
-    Reads size statistics of tables and indexes from pg_catalog.
-
- """
-
- def __init__(self):
- SimpleTool.__init__(self, name='bigtables')
-
- def specify_args(self):
- SimpleTool.specify_args(self)
- self.parser.add_argument('-n', '--limit', metavar='NUM', dest='limit', type=int, default=5, help='Show NUM biggest tables.')
- self.parser.add_argument('-v', '--details', dest='details', action='store_true', help='Show sizes of data and individual indexes.')
-
- def main(self):
- browser = pgbrowser.PgBrowser(self.pgm.get_conn('target'))
-
-        # scan all tables from all schemas, remember names and sizes
- all_tables = []
- all_indexes = []
- schemas = browser.list_schemas()
- for schema in schemas:
- tables = browser.list_tables(schema['name'])
- for table in tables:
- table_name = '%s.%s' % (schema['name'], table['name'])
- indexes = browser.list_indexes(table['name'], schema['name'])
- for index in indexes:
- all_indexes.append({'name': index['name'], 'table': table_name, 'size': index['size']})
- size_with_indexes = table['size'] + sum(index['size'] for index in indexes)
- all_tables.append({'name': table_name, 'size': table['size'], 'indexes': indexes, 'size_with_indexes': size_with_indexes})
-
-        # print names and sizes of the largest tables (up to --limit)
- for table in sorted(all_tables, reverse=True, key=lambda x: x['size_with_indexes'])[:self.args.limit]:
- print(highlight(1) + prettysize_short(table['size_with_indexes'], trailing_zeros=True).rjust(8) + highlight(0),
- '(total)'.ljust(8),
- highlight(1) + table['name'] + highlight(0), sep=' ')
- if self.args.details:
- print(prettysize_short(table['size'], trailing_zeros=True).rjust(8),
- '(data)'.ljust(8), sep=' ')
- for index in sorted(table['indexes'], reverse=True, key=lambda x: x['size']):
- print(prettysize_short(index['size'], trailing_zeros=True).rjust(8),
- '(index)'.ljust(8), index['name'], sep=' ')
- print()
-
-
-cls = BigTablesTool
-
--- a/pgtoolkit/tools/listdepends.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,31 +0,0 @@
-from pgtoolkit.toolbase import SimpleTool
-from pgtoolkit import pgbrowser
-
-
-class ListDependsTool(SimpleTool):
-
- """
- List column dependencies.
-
- """
-
- def __init__(self):
- SimpleTool.__init__(self, name='listdepends')
-
- def specify_args(self):
- SimpleTool.specify_args(self)
- self.parser.add_argument('table', metavar='table', type=str, help='Table name.')
- self.parser.add_argument('column', metavar='column', type=str, help='Column name.')
- self.parser.add_argument('-s', '--schema', dest='schema', metavar='schema',
- type=str, default='public', help='Schema name (default=public).')
-
- def main(self):
- browser = pgbrowser.PgBrowser(self.pgm.get_conn('target'))
-
- objects = browser.list_column_usage(self.args.table, self.args.column, schema=self.args.schema)
- for obj in sorted(objects, key=lambda x: (x['type'], x['schema'], x['name'])):
- print(obj['type'], ' ', obj['schema'], '.', obj['name'], sep='')
-
-
-cls = ListDependsTool
-
--- a/pgtoolkit/tools/listserial.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,69 +0,0 @@
-from pgtoolkit.toolbase import SimpleTool
-from pgtoolkit import pgbrowser
-from pycolib.ansicolor import highlight, WHITE, YELLOW, RED, BOLD
-
-
-class ListSerialTool(SimpleTool):
-
-    """List sequences close to overflow.
-
-    Checks all sequences attached to a column of type integer.
-
-    Highlights dangerous sequence values:
-    * Yellow - near overflow (90% of range)
-    * Red - already over the integer maximum
-
-    Does not list sequences with a value under 50% of the range.
-
- """
-
- max_int = 2147483647
-
- def __init__(self):
- SimpleTool.__init__(self, name='listserial')
-
- def main(self):
- conn = self.pgm.get_conn('target')
- browser = pgbrowser.PgBrowser(conn)
- rows = browser.list_sequences()
- sequences = []
- for row in rows:
- if row['related_column_type'] == 'integer':
- # read sequence attributes like last_value
- q = 'SELECT * FROM "%s"."%s"' % (row['sequence_schema'], row['sequence_name'])
- curs = conn.cursor()
- curs.execute(q)
- attrs = curs.fetchone_dict()
-                # skip this sequence if it's cycled and has a safe max_value
- if attrs['is_cycled'] and attrs['max_value'] <= self.max_int:
- continue
-                # skip sequences whose last_value has not yet reached half of max_int
- if attrs['last_value'] < self.max_int / 2:
- continue
-                # remember the remaining sequences
- row['attrs'] = attrs
- sequences.append(row)
- # sort most dangerous on top
- sequences.sort(key=lambda x: x['attrs']['last_value'], reverse=True)
- # print out what we've found
- for seq in sequences:
- print('Sequence:', seq['sequence_schema'] + '.' + seq['sequence_name'])
- print(' Related:', seq['sequence_schema'] + '.' + seq['related_table'], seq['related_column'], '(' + seq['related_column_type'] + ')')
- print(' integer max', '2147483647')
- # colorize last value
- last_val = seq['attrs']['last_value']
- col = WHITE + BOLD
- if last_val > self.max_int * 0.9:
- # near max
- col = YELLOW + BOLD
- if last_val > self.max_int:
- # over max
- col = RED + BOLD
- print(' last_value', highlight(1, col) + str(last_val) + highlight(0))
- for key in ('min_value', 'max_value', 'is_cycled'):
- print(' ', key, seq['attrs'][key])
- print()
-
-
-cls = ListSerialTool
-
--- a/pgtoolkit/tools/longqueries.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,34 +0,0 @@
-from pgtoolkit.toolbase import SimpleTool
-from pgtoolkit import pgstats
-from pycolib.ansicolor import highlight, YELLOW, BOLD
-
-
-class LongQueriesTool(SimpleTool):
-
- """
- List long running queries.
- """
-
- def __init__(self):
- SimpleTool.__init__(self, name='longqueries')
-
- def specify_args(self):
- SimpleTool.specify_args(self)
-        self.parser.add_argument('--age', default='1m', help='How long a query must be running to be listed.')
-
- def main(self):
- stats = pgstats.PgStats(self.pgm.get_conn('target'))
-
- for ln in stats.list_long_queries(self.args.age):
- print(highlight(1),
- 'backend PID: ', ln['procpid'],
- ', query_start: ', ln['query_start'].strftime('%F %T'),
- ', client IP: ', ln['client_addr'],
- ln['waiting'] and ', ' + highlight(1, YELLOW|BOLD) + 'waiting' or '',
- highlight(0), sep='')
- print(ln['query'])
- print()
-
-
-cls = LongQueriesTool
-
--- a/pgtoolkit/tools/loopquery.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,92 +0,0 @@
-from pgtoolkit.toolbase import SimpleTool
-
-import logging.handlers
-import time
-from datetime import datetime, timedelta
-
-
-class LoopQueryTool(SimpleTool):
-
- """
- Execute queries in loop, with configurable interval.
-
- """
-
- def __init__(self):
- SimpleTool.__init__(self, name='loopquery')
- self.target_isolation_level = 'autocommit'
-
- def specify_args(self):
- SimpleTool.specify_args(self)
- self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.')
- self.parser.add_argument('--mins', dest='delay_mins', type=int, help='Delay between queries in minutes.')
- self.parser.add_argument('--secs', dest='delay_secs', type=int, help='Delay between queries in seconds.')
-
- self.config.add_option('queries', type=list, default=[])
- self.config.add_option('delay_mins', type=int, default=0)
- self.config.add_option('delay_secs', type=int, default=0)
- self.config.add_option('log_path', type=str)
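-        # Corresponding config file entries might look like this (values illustrative):
-        #   queries = ['VACUUM ANALYZE;']
-        #   delay_mins = 5
-        #   log_path = '/var/log/loopquery'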
-
- def load_args(self, args=None, config_file=None):
- SimpleTool.load_args(self, args, config_file)
- self.queries = self.args.queries or self.config.queries
- self.delay_mins = self.args.delay_mins or self.config.delay_mins
- self.delay_secs = self.args.delay_secs or self.config.delay_secs
-
- def init_logging(self):
- SimpleTool.init_logging(self)
- if self.config.log_path:
- self.init_file_logs(self.config.log_path)
-
- def init_file_logs(self, path):
- format = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', '%y-%m-%d %H:%M:%S')
- handler = logging.handlers.TimedRotatingFileHandler(path+'/main.log', when='midnight', backupCount=5)
- handler.setFormatter(format)
- handler.setLevel(logging.DEBUG)
- logging.getLogger('main').addHandler(handler)
-
- format = logging.Formatter('%(asctime)s %(message)s', '%y-%m-%d %H:%M:%S')
- handler = logging.handlers.TimedRotatingFileHandler(path+'/pgnotices.log', when='midnight', backupCount=5)
- handler.setFormatter(format)
- handler.setLevel(logging.DEBUG)
- logging.getLogger('pgmanager_notices').addHandler(handler)
-
- def main(self):
- self.reset()
- while True:
- self.wait()
- self.action()
-
- def reset(self):
- """Check current time, set next action time."""
- dt = datetime.today()
- dt = dt.replace(microsecond = 0)
- self.next_action_time = dt + timedelta(minutes = self.delay_mins,
- seconds = self.delay_secs)
-
- def wait(self):
- """Wait for action time, compute next action time."""
- now = datetime.today()
- self.log.debug('Next run %s', self.next_action_time)
- if self.next_action_time > now:
- td = self.next_action_time - now
- self.log.debug('Sleep %ds', td.seconds + td.microseconds/1e6)
- time.sleep(td.seconds + td.microseconds/1e6)
- self.next_action_time += timedelta(minutes = self.delay_mins,
- seconds = self.delay_secs)
-        # if the action took too long and the next planned time would
-        # already be in the past -> reset the planner
- if self.next_action_time < now:
- self.reset()
-
- def action(self):
- """Execute the queries."""
- for q in self.queries:
- self.log.info('%s', q)
- with self.pgm.cursor('target') as curs:
- curs.execute(q)
- self.log.info('Done')
-
-
-cls = LoopQueryTool
-
--- a/pgtoolkit/tools/runquery.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,96 +0,0 @@
-from pgtoolkit.toolbase import SimpleTool
-
-import logging.handlers
-from psycopg2 import ProgrammingError
-
-
-class RunQueryTool(SimpleTool):
-
- """
- Execute configured queries in target database.
- """
-
- def __init__(self):
- SimpleTool.__init__(self, name='runquery')
- self.target_isolation_level = 'autocommit'
-
- def specify_args(self):
- SimpleTool.specify_args(self)
- self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.')
- self.parser.add_argument('-f', dest='file', metavar='FILE', help='Read query from file.')
- self.parser.add_argument('--one-query-per-line', action='store_true', help='When reading queries from file, consider each line as separate query.')
- self.parser.add_argument('-p', '--parameter', dest='parameters', metavar='PARAM=VALUE', nargs='*',
- help="If query should be used as format template, these parameters will be substituted.")
- self.parser.add_argument('--output-file', dest='output_file', metavar='OUTPUT_FILE', help='Write query result in specified file.')
- self.parser.add_argument('--format', dest='format', metavar='FORMAT', help='Format string for each line in output file (using Python\'s format()).')
-
- self.config.add_option('queries', type=list, default=[])
- self.config.add_option('log_path', type=str)
-
- def load_args(self, args=None, config_file=None):
- SimpleTool.load_args(self, args, config_file)
- self.queries = self.args.queries or self.config.queries
- # read query from file
- if self.args.file:
- with open(self.args.file, 'r', encoding='utf8') as f:
- data = f.read()
- if self.args.one_query_per_line:
-                file_queries = [ln for ln in data.splitlines()
-                                if ln.strip() and not ln.lstrip().startswith('--')]
- self.queries = file_queries + self.queries
- else:
- self.queries.insert(0, data)
- # prepare parameters
- self._prepare_parameters(self.args.parameters)
-
- def init_logging(self):
-        SimpleTool.init_logging(self)
- if self.config.log_path:
- self.init_file_logs(self.config.log_path)
-
- def init_file_logs(self, path):
- format = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', '%y-%m-%d %H:%M:%S')
- handler = logging.handlers.TimedRotatingFileHandler(path+'/main.log', when='midnight', backupCount=5)
- handler.setFormatter(format)
- handler.setLevel(logging.DEBUG)
- logging.getLogger('main').addHandler(handler)
-
- format = logging.Formatter('%(asctime)s %(message)s', '%y-%m-%d %H:%M:%S')
- handler = logging.handlers.TimedRotatingFileHandler(path+'/pgnotices.log', when='midnight', backupCount=5)
- handler.setFormatter(format)
- handler.setLevel(logging.DEBUG)
- logging.getLogger('pgmanager_notices').addHandler(handler)
-
- def main(self):
- """Execute the queries."""
- for q in self.queries:
- if self.parameters:
- q = q.format(**self.parameters)
- self.log.info('%s', q if len(q) < 100 else q[:100]+'...')
- with self.pgm.cursor('target') as curs:
- curs.execute(q)
- self.log.info('Rows affected: %d', curs.rowcount)
- try:
- rows = curs.fetchall_dict()
- self._write_output_file(rows)
- except ProgrammingError:
- pass
- self.log.info('Done')
-
- def _write_output_file(self, rows):
- if not self.args.output_file:
- return
- with open(self.args.output_file, 'w', encoding='utf8') as f:
- for row in rows:
- print(self.args.format.format(row), file=f)
-
- def _prepare_parameters(self, parameters):
- self.parameters = {}
- for parameter in parameters or ():
- name, value = parameter.split('=', 1)
- self.parameters[name] = value
-
-
-cls = RunQueryTool
-
--- a/pgtoolkit/tools/schemadiff.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,51 +0,0 @@
-from pgtoolkit.toolbase import SrcDstTool
-from pgtoolkit import pgbrowser, pgdiff
-
-
-class SchemaDiffTool(SrcDstTool):
-
- """
- Print differences in database schema.
-
-    Prints changes from source to destination.
-    The SQL patch updates the source database schema to match the destination schema.
-
- """
-
- def __init__(self):
- SrcDstTool.__init__(self, name='schemadiff', allow_reverse=True)
-
- def specify_args(self):
- SrcDstTool.specify_args(self)
- self.parser.add_argument('-s', dest='schema', nargs='*', help='Schema filter')
- self.parser.add_argument('-t', dest='table', nargs='*', help='Table filter')
- self.parser.add_argument('-f', dest='function', type=str, help='Function filter (regex)')
- self.parser.add_argument('--sql', action='store_true', help='Output is SQL script.')
- self.parser.add_argument('--body', action='store_true', help='Output diff for function bodies.')
-
- def main(self):
- srcbrowser = pgbrowser.PgBrowser(self.pgm.get_conn('src'))
- dstbrowser = pgbrowser.PgBrowser(self.pgm.get_conn('dst'))
-
- pgd = pgdiff.PgDiff(srcbrowser, dstbrowser)
-
- try:
- if self.args.schema:
- pgd.filter_schemas(include=self.args.schema)
- if self.args.table:
- pgd.filter_tables(include=self.args.table)
- if self.args.function:
- pgd.filter_functions(self.args.function)
- if self.args.body:
- pgd.function_body_diff = True
-
- if self.args.sql:
- pgd.print_patch()
- else:
- pgd.print_diff()
- except pgdiff.PgDiffError as e:
- print('PgDiff error:', str(e))
-
-
-cls = SchemaDiffTool
-
--- a/pgtoolkit/tools/tablediff.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,69 +0,0 @@
-from pgtoolkit import pgdatadiff
-from pgtoolkit.toolbase import SrcDstTablesTool
-from pycolib.ansicolor import highlight, BOLD, YELLOW
-
-import sys
-
-
-class TableDiffTool(SrcDstTablesTool):
-
- """
- Print differences between data in tables.
-
- Requirements:
-    * The source table must have a PRIMARY KEY defined.
-    * The destination table must contain all columns of the source table.
-      Column order is not important.
-
- """
-
- def __init__(self):
- SrcDstTablesTool.__init__(self, name='tablediff', desc=self.__doc__, allow_reverse=True)
-
- def specify_args(self):
- SrcDstTablesTool.specify_args(self)
- self.parser.add_argument('--sql', action='store_true', help='Output is SQL script.')
- self.parser.add_argument('--rowcount', action='store_true', help='Compare number of rows.')
- self.parser.add_argument('-o', '--output-file', help='Output file for sql queries.')
-
- def main(self):
- srcconn = self.pgm.get_conn('src')
- dstconn = self.pgm.get_conn('dst')
-
- if self.args.output_file:
- output_file = open(self.args.output_file, 'w')
- else:
- output_file = sys.stdout
-
- dd = pgdatadiff.PgDataDiff(srcconn, dstconn)
-
- for srcschema, srctable, dstschema, dsttable in self.tables():
- print('-- Diff from [%s] %s.%s to [%s] %s.%s' % (
- self.args.src, srcschema, srctable,
- self.args.dst, dstschema, dsttable),
- file=output_file)
-
- if self.args.rowcount:
- with self.pgm.cursor('src') as curs:
- curs.execute('''SELECT count(*) FROM "%s"."%s"''' % (srcschema, srctable))
- srccount = curs.fetchone()[0]
- with self.pgm.cursor('dst') as curs:
- curs.execute('''SELECT count(*) FROM "%s"."%s"''' % (dstschema, dsttable))
- dstcount = curs.fetchone()[0]
- if srccount != dstcount:
- print(highlight(1, BOLD | YELLOW),
- "Row count differs: src=%s dst=%s" % (srccount, dstcount),
- highlight(0), sep='', file=output_file)
- continue
-
- dd.settable1(srctable, srcschema)
- dd.settable2(dsttable, dstschema)
-
- if self.args.sql:
- dd.print_patch(file=output_file)
- else:
- dd.print_diff(file=output_file)
-
-
-cls = TableDiffTool
-
--- a/pgtoolkit/tools/tablesync.py Mon May 26 18:18:21 2014 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,60 +0,0 @@
-from pgtoolkit.toolbase import SrcDstTool
-from pgtoolkit.tools.tablediff import TableDiffTool
-from pgtoolkit.tools.runquery import RunQueryTool
-
-
-class TableSyncTool(SrcDstTool):
-
- """
- Synchronize tables between two databases (tablediff + runquery).
-
- This will essentially call following commands on each table from list:
- * pgtool tablediff <source> <target> -r -s <schema> -t <table> --sql -o /tmp/diff.sql
- * pgtool runquery <target> -f /tmp/diff.sql
-
- """
-
- def __init__(self):
- SrcDstTool.__init__(self, name='tablesync', force_reverse=True)
- self.tablediff = TableDiffTool()
- self.tablediff.specify_args()
- self.runquery = RunQueryTool()
- self.runquery.specify_args()
-
- def specify_args(self):
- SrcDstTool.specify_args(self)
- self.parser.add_argument('-t', dest='tables', metavar='table', nargs='*',
- help="Tables to be synchronized.")
- self.parser.add_argument('-s', '--schema', metavar='default_schema',
- dest='schema', type=str, default='public', help='Default schema name.')
-
- def init_logging(self):
- SrcDstTool.init_logging(self)
- self.runquery.log = self.log
-
- def setup(self, args=None):
- SrcDstTool.setup(self, args)
- self.target_isolation_level = 'autocommit'
- self.prepare_conns(target=self.args.src)
-
- def main(self):
- for table in self.args.tables:
- self.sync(table)
-
- def sync(self, table):
- if '.' in table:
- schema, table = table.split('.', 1)
- else:
- schema = self.args.schema
- # Call tablediff
- self.tablediff.load_args([self.args.src, self.args.dst,
- '-r', '-s', schema, '-t', table, '--sql', '-o', '/tmp/diff.sql'])
- self.tablediff.main()
- # Call runquery
- self.runquery.load_args([self.args.src, '--one-query-per-line',
- '-f', '/tmp/diff.sql'])
- self.runquery.main()
-
-
-cls = TableSyncTool
-
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit.conf.example Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,11 @@
+### named connections
+databases = {
+    # databases for tests.py (PostgreSQL and MySQL; remove a line to skip the corresponding tests)
+ 'test' : 'host=127.0.0.1 dbname=test user=test password=test',
+ 'test_mysql' : 'host=127.0.0.1 db=test user=test password=test',
+}
+
+### meta database (contains connection parameters for other databases)
+meta_db = 'host=10.8.0.1 dbname=central'
+# query for connection parameters; input is the database name (substituted for %s)
+meta_query = '''SELECT host, port, dbname, user, password FROM config.databases WHERE name = %s LIMIT 1'''
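For orientation, a minimal sketch of how these named connections are used, assuming PgManager mirrors the MyManager API added later in this patch (the tools normally resolve these names from pydbkit.conf through their common setup code; names and credentials are the example values above):

    from pydbkit import pgmanager

    dbm = pgmanager.get_instance()
    dbm.create_conn(name='test', host='127.0.0.1', dbname='test',
                    user='test', password='test')

    with dbm.cursor('test') as curs:
        curs.execute('SELECT 1 AS one')
        print(curs.fetchone_dict().one)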
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/delayedquery.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,46 @@
+import threading
+import time
+
+
+class DelayedQueryThread(threading.Thread):
+ def __init__(self, targetdbm, targetname, delay, query, args):
+ threading.Thread.__init__(self)
+ self.targetdbm = targetdbm
+ self.targetname = targetname
+ self.delay = delay
+ self.query = query
+ self.args = args
+
+ def run(self):
+ time.sleep(self.delay)
+ with self.targetdbm.cursor(self.targetname) as curs:
+ curs.execute(self.query, self.args)
+
+
+class DelayedQuery:
+ def __init__(self, targetdbm):
+ '''Initialize DelayedQuery.
+
+ targetdbm -- PgManager-like object
+
+ '''
+ self.targetdbm = targetdbm
+ self.queryids = set()
+
+ def add(self, delay, query, args, targetname='default', queryid=None):
+ '''Add query to schedule.
+
+ delay -- how long to wait, in seconds
+ query, args -- query to be run after delay
+ targetname -- name of connection in PgManager
+        queryid -- discard if a query with the same id was already scheduled (ids are never cleared)
+
+ '''
+ if queryid is not None:
+ if queryid in self.queryids:
+ return
+ self.queryids.add(queryid)
+
+ t = DelayedQueryThread(self.targetdbm, targetname, delay, query, args)
+ t.start()
+
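A usage sketch for DelayedQuery (the table, query and queryid are illustrative; the PgManager API is assumed as elsewhere in this patch):

    from pydbkit import pgmanager
    from pydbkit.delayedquery import DelayedQuery

    dbm = pgmanager.get_instance()
    dbm.create_conn(host='127.0.0.1', dbname='test')

    dq = DelayedQuery(dbm)
    # runs the UPDATE on a background thread after 5 seconds; a second
    # add() with the same queryid is silently dropped (ids are never cleared)
    dq.add(5, 'UPDATE items SET dirty = true WHERE id = %s', (42,),
           queryid='touch-42')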
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/mymanager.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,330 @@
+# -*- coding: utf-8 -*-
+#
+# MyManager - manage database connections (MySQL version)
+#
+# Requires: Python 2.6 / 2.7 / 3.2, MySQLdb
+#
+# Part of pydbkit
+# http://hg.devl.cz/pydbkit
+#
+# Copyright (c) 2011, 2013 Radek Brich <radek.brich@devl.cz>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+"""MySQL database connection manager
+
+MyManager wraps MySQLdb in the same manner as PgManager wraps psycopg2.
+It is fully compatible, so it should work as a drop-in replacement for PgManager.
+
+It adds the following features over MySQLdb:
+
+ * Save and reuse database connection parameters
+
+ * Connection pooling
+
+ * Easy query using the with statement
+
+ * Dictionary rows
+
+Example:
+
+ from pydbkit import mymanager
+
+ dbm = mymanager.get_instance()
+ dbm.create_conn(host='127.0.0.1', dbname='default')
+
+ with dbm.cursor() as curs:
+ curs.execute('SELECT now() AS now')
+ row = curs.fetchone_dict()
+ print(row.now)
+
+See PgManager docs for more information.
+
+"""
+
+from contextlib import contextmanager
+from collections import OrderedDict
+import logging
+import threading
+import multiprocessing
+
+import MySQLdb
+import MySQLdb.cursors
+
+from MySQLdb import DatabaseError, IntegrityError, OperationalError
+
+from pydbkit.pgmanager import RowDict
+
+
+log_sql = logging.getLogger("mymanager_sql")
+log_sql.addHandler(logging.NullHandler())
+
+
+class MyManagerError(Exception):
+
+ pass
+
+
+class ConnectionInfo:
+
+ def __init__(self, name, isolation_level=None,
+ init_statement=None, pool_size=1, **kw):
+ self.name = name # connection name is logged with SQL queries
+ self.isolation_level = isolation_level
+ self.init_statement = init_statement
+ self.pool_size = pool_size
+ self.parameters = kw
+ self.adjust_parameters()
+
+ def adjust_parameters(self):
+        '''Rename Postgres-style parameters to their MySQL equivalents.'''
+ m = {'dbname' : 'db', 'password' : 'passwd'}
+ res = dict()
+ for k, v in list(self.parameters.items()):
+ if k in m:
+ k = m[k]
+ res[k] = v
+ self.parameters = res
+
+
+class Cursor(MySQLdb.cursors.Cursor):
+
+ def execute(self, query, args=None):
+ try:
+ return super(Cursor, self).execute(query, args)
+ finally:
+ self._log_query(query, args)
+
+ def callproc(self, procname, args=None):
+ try:
+ return super(Cursor, self).callproc(procname, args)
+ finally:
+            self._log_query(procname, args)
+
+ def row_dict(self, row, lstrip=None):
+ adjustname = lambda a: a
+ if lstrip:
+ adjustname = lambda a: a.lstrip(lstrip)
+ return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
+
+ def fetchone_dict(self, lstrip=None):
+ row = super(Cursor, self).fetchone()
+ if row is None:
+ return None
+ return self.row_dict(row, lstrip)
+
+ def fetchall_dict(self, lstrip=None):
+ rows = super(Cursor, self).fetchall()
+ return [self.row_dict(row, lstrip) for row in rows]
+
+ def mogrify(self, query, args):
+ """Get query with substituted args as it will be send to server."""
+ if isinstance(query, bytes):
+ query = query.decode()
+ if args is not None:
+ db = self._get_db()
+ query = query % db.literal(args)
+ return query
+
+ def _log_query(self, query, args):
+ name = self.connection.name if hasattr(self.connection, 'name') else '-'
+ query = self.mogrify(query, args)
+ if isinstance(query, bytes):
+ db = self._get_db()
+ charset = db.character_set_name()
+ query = query.decode(charset)
+ log_sql.debug('[%s] %s' % (name, query))
+
+
+class MyManager:
+
+ def __init__(self):
+ self.conn_known = {} # available connections
+ self.conn_pool = {}
+ self.lock = threading.Lock()
+ self.pid = multiprocessing.current_process().pid # forking check
+
+ def __del__(self):
+        for name in tuple(self.conn_known.keys()):
+            self.destroy_conn(name)
+
+ def create_conn(self, name='default', isolation_level=None, **kw):
+ '''Create named connection.'''
+ if name in self.conn_known:
+ raise MyManagerError('Connection name "%s" already registered.' % name)
+
+ isolation_level = self._normalize_isolation_level(isolation_level)
+ ci = ConnectionInfo(name, isolation_level, **kw)
+
+ self.conn_known[name] = ci
+ self.conn_pool[name] = []
+
+ def close_conn(self, name='default'):
+ '''Close all connections of given name.
+
+ Connection credentials are still saved.
+
+ '''
+ while len(self.conn_pool[name]):
+ conn = self.conn_pool[name].pop()
+ conn.close()
+
+ def destroy_conn(self, name='default'):
+ '''Destroy connection.
+
+ Counterpart of create_conn.
+
+ '''
+        if name not in self.conn_known:
+ raise MyManagerError('Connection name "%s" not registered.' % name)
+
+ self.close_conn(name)
+
+ del self.conn_known[name]
+ del self.conn_pool[name]
+
+ def get_conn(self, name='default'):
+ '''Get connection of name 'name' from pool.'''
+ self._check_fork()
+ self.lock.acquire()
+ try:
+            if name not in self.conn_known:
+ raise MyManagerError("Connection name '%s' not registered." % name)
+
+ # connection from pool
+ conn = None
+ while len(self.conn_pool[name]) and conn is None:
+ conn = self.conn_pool[name].pop()
+ try:
+ conn.ping()
+ except MySQLdb.MySQLError:
+ conn.close()
+ conn = None
+
+ if conn is None:
+ ci = self.conn_known[name]
+ conn = self._connect(ci)
+ finally:
+ self.lock.release()
+ return conn
+
+ def put_conn(self, conn, name='default'):
+ '''Put connection back to pool.
+
+        The name must be the same one used for get_conn,
+        otherwise connections end up in the wrong pool.
+
+ '''
+ self.lock.acquire()
+ try:
+            if name not in self.conn_known:
+ raise MyManagerError("Connection name '%s' not registered." % name)
+
+ if len(self.conn_pool[name]) >= self.conn_known[name].pool_size:
+ conn.close()
+ return
+
+ # connection returned to the pool must not be in transaction
+ try:
+ conn.rollback()
+ except OperationalError:
+ conn.close()
+ return
+
+ self.conn_pool[name].append(conn)
+ finally:
+ self.lock.release()
+
+ @contextmanager
+ def cursor(self, name='default'):
+ '''Cursor context.
+
+ Uses any connection of name 'name' from pool
+ and returns cursor for that connection.
+
+ '''
+        conn = self.get_conn(name)
+        # create the cursor before the try block, so the finally clause
+        # cannot reference an unbound name if cursor() itself fails
+        curs = conn.cursor()
+        try:
+            yield curs
+        finally:
+            curs.close()
+            self.put_conn(conn, name)
+
+ def _connect(self, ci):
+ conn = MySQLdb.connect(cursorclass=Cursor, **ci.parameters)
+        if ci.isolation_level is not None:
+ if ci.isolation_level == 'AUTOCOMMIT':
+ conn.autocommit(True)
+ else:
+ curs = conn.cursor()
+ curs.execute('SET SESSION TRANSACTION ISOLATION LEVEL ' + ci.isolation_level)
+ curs.close()
+ if ci.init_statement:
+ curs = conn.cursor()
+ curs.execute(ci.init_statement)
+ curs.connection.commit()
+ curs.close()
+ return conn
+
+ def _normalize_isolation_level(self, level):
+ if level is None:
+ return level
+        if isinstance(level, str):
+ level = level.upper().replace('_', ' ')
+ if level in (
+ 'AUTOCOMMIT',
+ 'READ UNCOMMITTED',
+ 'READ COMMITTED',
+ 'REPEATABLE READ',
+ 'SERIALIZABLE'):
+ return level
+        raise MyManagerError('Unknown isolation level name: "%s"' % level)
+
+ def _check_fork(self):
+ '''Check if process was forked (PID has changed).
+
+ If it was, clean parent's connections.
+ New connections are created for children.
+ Known connection credentials are inherited, but not shared.
+
+ '''
+ if self.pid == multiprocessing.current_process().pid:
+ # PID has not changed
+ return
+
+ # update saved PID
+ self.pid = multiprocessing.current_process().pid
+ # reinitialize lock
+ self.lock = threading.Lock()
+ # clean parent's connections
+ for name in self.conn_pool:
+ self.conn_pool[name] = []
+
+ @classmethod
+ def get_instance(cls):
+ if not hasattr(cls, '_instance'):
+ cls._instance = cls()
+ return cls._instance
+
+
+def get_instance():
+ return MyManager.get_instance()
+
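To make the pool mechanics concrete, a sketch using only the API defined above (the DSN values are placeholders). get_conn() hands out a pooled connection or opens a new one; put_conn() rolls it back and keeps at most pool_size connections; the cursor() context manager wraps both steps:

    from pydbkit import mymanager

    dbm = mymanager.get_instance()
    dbm.create_conn(name='shop', host='127.0.0.1', db='shop', user='shop',
                    passwd='secret', isolation_level='read_committed',
                    pool_size=4)

    conn = dbm.get_conn('shop')       # reuse a pooled or open a new connection
    try:
        curs = conn.cursor()
        curs.execute('SELECT count(*) AS cnt FROM orders')
        print(curs.fetchone_dict().cnt)
        curs.close()
    finally:
        dbm.put_conn(conn, 'shop')    # rolled back, returned to the pool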
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/mymanager_oursql.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,347 @@
+# -*- coding: utf-8 -*-
+#
+# MyManager - manage database connections (MySQL version)
+#
+# Requires: Python 2.6 / 2.7 / 3.2, oursql
+#
+# Part of pydbkit
+# http://hg.devl.cz/pydbkit
+#
+# Copyright (c) 2011, 2013 Radek Brich <radek.brich@devl.cz>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+"""MySQL database connection manager
+
+MyManager wraps oursql in the same manner as PgManager wraps psycopg2.
+It is fully compatible, so it should work as a drop-in replacement for PgManager.
+
+It adds the following features over oursql:
+
+ * Save and reuse database connection parameters
+
+ * Connection pooling
+
+ * Easy query using the with statement
+
+ * Dictionary rows
+
+Example:
+
+ from pydbkit import mymanager_oursql
+
+    dbm = mymanager_oursql.get_instance()
+ dbm.create_conn(host='127.0.0.1', dbname='default')
+
+ with dbm.cursor() as curs:
+ curs.execute('SELECT now() AS now')
+ row = curs.fetchone_dict()
+ print(row.now)
+
+See PgManager docs for more information.
+
+"""
+
+from contextlib import contextmanager
+from collections import OrderedDict
+import logging
+import threading
+import multiprocessing
+
+import oursql
+
+from oursql import DatabaseError, IntegrityError, OperationalError
+
+
+log_sql = logging.getLogger("mymanager_sql")
+log_sql.addHandler(logging.NullHandler())
+
+
+class MyManagerError(Exception):
+
+ pass
+
+
+class RowDict(OrderedDict):
+ """Special dictionary used for rows returned from queries.
+
+    Items keep the order in which columns were returned from the database.
+
+ It supports three styles of access:
+
+ Dict style:
+ row['id']
+ for key in row:
+ ...
+
+ Object style (only works if column name does not collide with any method name):
+ row.id
+
+ Tuple style:
+ row[0]
+ id, name = row.values()
+
+ """
+
+ def __getitem__(self, key):
+ if isinstance(key, int):
+ return tuple(self.values())[key]
+ else:
+ return OrderedDict.__getitem__(self, key)
+
+ def __getattr__(self, key):
+ try:
+ return self[key]
+ except KeyError:
+ raise AttributeError(key)
+
+
+class ConnectionInfo:
+
+ def __init__(self, name, isolation_level=None,
+ init_statement=None, pool_size=1, **kw):
+ self.name = name # connection name is logged with SQL queries
+ self.isolation_level = isolation_level
+ self.init_statement = init_statement
+ self.pool_size = pool_size
+ self.parameters = kw
+ self.adjust_parameters()
+
+ def adjust_parameters(self):
+        '''Rename Postgres-style parameters to their MySQL equivalents.'''
+ m = {'dbname' : 'db', 'password' : 'passwd'}
+ res = dict()
+ for k, v in list(self.parameters.items()):
+ if k in m:
+ k = m[k]
+ res[k] = v
+ self.parameters = res
+
+
+class Cursor(oursql.Cursor):
+
+ def execute(self, query, args=[]):
+ try:
+ return super(Cursor, self).execute(query, args)
+ finally:
+ self._log_query(query, args)
+
+ def callproc(self, procname, args=[]):
+ try:
+ return super(Cursor, self).callproc(procname, args)
+ finally:
+            self._log_query(procname, args)
+
+ def row_dict(self, row, lstrip=None):
+ adjustname = lambda a: a
+ if lstrip:
+ adjustname = lambda a: a.lstrip(lstrip)
+ return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
+
+ def fetchone_dict(self, lstrip=None):
+ row = super(Cursor, self).fetchone()
+ if row is None:
+ return None
+ return self.row_dict(row, lstrip)
+
+ def fetchall_dict(self, lstrip=None):
+ rows = super(Cursor, self).fetchall()
+ return [self.row_dict(row, lstrip) for row in rows]
+
+ def _log_query(self, query, args):
+ name = self.connection.name if hasattr(self.connection, 'name') else '-'
+ log_sql.debug('[%s] %s %s' % (name, query, args))
+
+
+class MyManager:
+
+ def __init__(self):
+ self.conn_known = {} # available connections
+ self.conn_pool = {}
+ self.lock = threading.Lock()
+ self.pid = multiprocessing.current_process().pid # forking check
+
+ def __del__(self):
+        for name in tuple(self.conn_known.keys()):
+            self.destroy_conn(name)
+
+ def create_conn(self, name='default', isolation_level=None, **kw):
+ '''Create named connection.'''
+ if name in self.conn_known:
+ raise MyManagerError('Connection name "%s" already registered.' % name)
+
+ isolation_level = self._normalize_isolation_level(isolation_level)
+ ci = ConnectionInfo(name, isolation_level, **kw)
+
+ self.conn_known[name] = ci
+ self.conn_pool[name] = []
+
+ def close_conn(self, name='default'):
+ '''Close all connections of given name.
+
+ Connection credentials are still saved.
+
+ '''
+ while len(self.conn_pool[name]):
+ conn = self.conn_pool[name].pop()
+ conn.close()
+
+ def destroy_conn(self, name='default'):
+ '''Destroy connection.
+
+ Counterpart of create_conn.
+
+ '''
+        if name not in self.conn_known:
+ raise MyManagerError('Connection name "%s" not registered.' % name)
+
+ self.close_conn(name)
+
+ del self.conn_known[name]
+ del self.conn_pool[name]
+
+ def get_conn(self, name='default'):
+ '''Get connection of name 'name' from pool.'''
+ self._check_fork()
+ self.lock.acquire()
+ try:
+            if name not in self.conn_known:
+ raise MyManagerError("Connection name '%s' not registered." % name)
+
+ # connection from pool
+ conn = None
+ while len(self.conn_pool[name]) and conn is None:
+ conn = self.conn_pool[name].pop()
+ try:
+ conn.ping()
+ except oursql.MySQLError:
+ conn.close()
+ conn = None
+
+ if conn is None:
+ ci = self.conn_known[name]
+ conn = self._connect(ci)
+ finally:
+ self.lock.release()
+ return conn
+
+ def put_conn(self, conn, name='default'):
+ '''Put connection back to pool.
+
+        The name must be the same one used for get_conn,
+        otherwise connections end up in the wrong pool.
+
+ '''
+ self.lock.acquire()
+ try:
+            if name not in self.conn_known:
+ raise MyManagerError("Connection name '%s' not registered." % name)
+
+ if len(self.conn_pool[name]) >= self.conn_known[name].pool_size:
+ conn.close()
+ return
+
+ # connection returned to the pool must not be in transaction
+ try:
+ conn.rollback()
+ except OperationalError:
+ conn.close()
+ return
+
+ self.conn_pool[name].append(conn)
+ finally:
+ self.lock.release()
+
+ @contextmanager
+ def cursor(self, name='default'):
+ '''Cursor context.
+
+ Uses any connection of name 'name' from pool
+ and returns cursor for that connection.
+
+ '''
+        conn = self.get_conn(name)
+        # create the cursor before the try block, so the finally clause
+        # cannot reference an unbound name if cursor() itself fails
+        curs = conn.cursor()
+        try:
+            yield curs
+        finally:
+            curs.close()
+            self.put_conn(conn, name)
+
+ def _connect(self, ci):
+ conn = oursql.connect(default_cursor=Cursor, **ci.parameters)
+        if ci.isolation_level is not None:
+ if ci.isolation_level == 'AUTOCOMMIT':
+ conn.autocommit(True)
+ else:
+ curs = conn.cursor()
+ curs.execute('SET SESSION TRANSACTION ISOLATION LEVEL ' + ci.isolation_level)
+ curs.close()
+ if ci.init_statement:
+ curs = conn.cursor()
+ curs.execute(ci.init_statement)
+ curs.connection.commit()
+ curs.close()
+ return conn
+
+ def _normalize_isolation_level(self, level):
+ if level is None:
+ return level
+        if isinstance(level, str):
+ level = level.upper().replace('_', ' ')
+ if level in (
+ 'AUTOCOMMIT',
+ 'READ UNCOMMITTED',
+ 'READ COMMITTED',
+ 'REPEATABLE READ',
+ 'SERIALIZABLE'):
+ return level
+        raise MyManagerError('Unknown isolation level name: "%s"' % level)
+
+ def _check_fork(self):
+ '''Check if process was forked (PID has changed).
+
+ If it was, clean parent's connections.
+ New connections are created for children.
+ Known connection credentials are inherited, but not shared.
+
+ '''
+ if self.pid == multiprocessing.current_process().pid:
+ # PID has not changed
+ return
+
+ # update saved PID
+ self.pid = multiprocessing.current_process().pid
+ # reinitialize lock
+ self.lock = threading.Lock()
+ # clean parent's connections
+ for name in self.conn_pool:
+ self.conn_pool[name] = []
+
+ @classmethod
+ def get_instance(cls):
+ if not hasattr(cls, '_instance'):
+ cls._instance = cls()
+ return cls._instance
+
+
+def get_instance():
+ return MyManager.get_instance()
+
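A self-contained illustration of the three RowDict access styles documented above:

    from pydbkit.mymanager_oursql import RowDict

    row = RowDict([('id', 1), ('name', 'Alice')])
    assert row['id'] == 1          # dict style
    assert row.name == 'Alice'     # object style
    assert row[0] == 1             # tuple style
    id_, name = row.values()       # positional unpacking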
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/pgbrowser.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,496 @@
+# -*- coding: utf-8 -*-
+#
+# PgBrowser - browse database schema and metadata
+#
+# Some of the queries came from psql.
+#
+# Copyright (c) 2011 Radek Brich <radek.brich@devl.cz>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+
+from collections import OrderedDict
+
+
+class Column:
+ def __init__(self, browser, table,
+ name, type, notnull, hasdefault, default, description):
+ self.browser = browser # Browser instance
+ self.table = table # Table instance
+ self.name = name
+ self.type = type
+ self.notnull = notnull
+ self.hasdefault = hasdefault
+ self.default = default
+ self.description = description
+
+
+class Constraint:
+ def __init__(self, browser, table, name, type, fname, fschema, definition):
+ self.browser = browser
+ self.table = table
+ self.name = name
+ self.type = type
+ self.fname = fname # foreign table name
+ self.fschema = fschema # foreign table schema
+ self.definition = definition
+
+
+class Index:
+ def __init__(self, browser, table,
+ name, primary, unique, clustered, valid, definition,
+ columns, size):
+ self.browser = browser
+ self.table = table
+ self.name = name
+ self.primary = primary
+ self.unique = unique
+ self.clustered = clustered
+ self.valid = valid
+ self.definition = definition
+ self.columns = columns
+ self.size = size
+
+
+class Table:
+ def __init__(self, browser, schema, name, owner, size, description, options):
+ self._columns = None
+ self._constraints = None
+ self._indexes = None
+ self.browser = browser # Browser instance
+ self.schema = schema # Schema instance
+ self.name = name # table name, str
+ self.owner = owner
+ self.size = size
+ self.description = description
+ self.options = options or []
+
+ def refresh(self):
+ self.refresh_columns()
+ self.refresh_constraints()
+ self.refresh_indexes()
+
+ def refresh_columns(self):
+ rows = self.browser.list_columns(self.name, self.schema.name)
+ self._columns = OrderedDict([(x['name'], Column(self.browser, self, **x)) for x in rows])
+
+ def refresh_constraints(self):
+ rows = self.browser.list_constraints(self.name, self.schema.name)
+ self._constraints = OrderedDict([(x['name'], Constraint(self.browser, self, **x)) for x in rows])
+
+ def refresh_indexes(self):
+ rows = self.browser.list_indexes(self.name, self.schema.name)
+ self._indexes = OrderedDict([(x['name'], Index(self.browser, self, **x)) for x in rows])
+
+ def getcolumns(self):
+ if self._columns is None:
+ self.refresh_columns()
+ return self._columns
+ columns = property(getcolumns)
+
+ def getconstraints(self):
+ if self._constraints is None:
+ self.refresh_constraints()
+ return self._constraints
+ constraints = property(getconstraints)
+
+ def getindexes(self):
+ if self._indexes is None:
+ self.refresh_indexes()
+ return self._indexes
+ indexes = property(getindexes)
+
+
+class Argument:
+ def __init__(self, browser, function, name, type, mode, default):
+ # PgBrowser instance
+ self.browser = browser
+ # Function instance
+ self.function = function
+ self.name = name
+ self.type = type
+ self.mode = mode
+ self.default = default
+
+
+class Function:
+ def __init__(self, browser, schema, oid, name, function_name, type, result, source):
+ self.browser = browser
+ self.schema = schema
+ self.oid = oid
+ #: unique name - function name + arg types
+ self.name = name
+ #: pure function name without args
+ self.function_name = function_name
+ self.type = type
+ self.result = result
+ self.source = source
+ self._arguments = None
+ self._definition = None
+
+ def refresh(self):
+ self.refresh_args()
+
+ def refresh_args(self):
+ rows = self.browser.list_function_args(self.oid)
+ self._arguments = OrderedDict([(x['name'], Argument(self.browser, self, **x)) for x in rows])
+
+ @property
+ def arguments(self):
+ if self._arguments is None:
+ self.refresh_args()
+ return self._arguments
+
+ @property
+ def definition(self):
+ """Get full function definition including CREATE command."""
+ if not self._definition:
+ self._definition = self.browser.get_function_definition(self.oid)
+ return self._definition
+
+
+class Type:
+ def __init__(self, browser, schema, name, type, elements, description):
+ self.browser = browser
+ self.schema = schema
+ self.name = name
+ self.type = type
+ self.elements = elements
+ self.description = description
+
+
+class Schema:
+ def __init__(self, browser, name, owner, acl, description, system):
+ self._tables = None
+ self._functions = None
+ self._types = None
+ self.browser = browser
+ self.name = name
+ self.owner = owner
+ self.acl = acl
+ self.description = description
+ self.system = system
+
+ def refresh(self):
+ self.refresh_tables()
+ self.refresh_functions()
+
+ def refresh_tables(self):
+ rows = self.browser.list_tables(self.name)
+ self._tables = OrderedDict([(x['name'], Table(self.browser, self, **x)) for x in rows])
+
+ def refresh_functions(self):
+ rows = self.browser.list_functions(self.name)
+ self._functions = OrderedDict([(x['name'], Function(self.browser, self, **x)) for x in rows])
+
+ def refresh_types(self):
+ rows = self.browser.list_types(self.name)
+ self._types = OrderedDict([(x['name'], Type(self.browser, self, **x)) for x in rows])
+
+ @property
+ def tables(self):
+ if self._tables is None:
+ self.refresh_tables()
+ return self._tables
+
+ @property
+ def functions(self):
+ if self._functions is None:
+ self.refresh_functions()
+ return self._functions
+
+ @property
+ def types(self):
+ if self._types is None:
+ self.refresh_types()
+ return self._types
+
+
+class PgBrowser:
+ def __init__(self, conn=None):
+ self._schemas = None
+ self.conn = conn
+
+ def setconn(self, conn=None):
+ self.conn = conn
+
+ def refresh(self):
+ self.refresh_schemas()
+
+ def refresh_schemas(self):
+ rows = self.list_schemas()
+ self._schemas = OrderedDict([(x['name'], Schema(self, **x)) for x in rows])
+
+ @property
+ def schemas(self):
+ if self._schemas is None:
+ self.refresh_schemas()
+ return self._schemas
+
+    def _query(self, query, args):
+        # create the cursor before the try block, so the finally clause
+        # cannot reference an unbound name if cursor() itself fails
+        curs = self.conn.cursor()
+        try:
+            curs.execute(query, args)
+            curs.connection.commit()
+            rows = curs.fetchall()
+            return [dict(zip([desc[0] for desc in curs.description], row)) for row in rows]
+        finally:
+            curs.close()
+
+ def list_databases(self):
+ return self._query('''
+ SELECT
+ d.datname as "name",
+ pg_catalog.pg_get_userbyid(d.datdba) as "owner",
+ pg_catalog.pg_encoding_to_char(d.encoding) as "encoding",
+ d.datcollate as "collation",
+ d.datctype as "ctype",
+ d.datacl AS "acl",
+ CASE WHEN pg_catalog.has_database_privilege(d.datname, 'CONNECT')
+ THEN pg_catalog.pg_database_size(d.datname)
+ ELSE -1 -- No access
+ END as "size",
+ t.spcname as "tablespace",
+ pg_catalog.shobj_description(d.oid, 'pg_database') as "description"
+ FROM pg_catalog.pg_database d
+ JOIN pg_catalog.pg_tablespace t on d.dattablespace = t.oid
+ ORDER BY 1;
+ ''', [])
+
+ def list_schemas(self):
+ return self._query('''
+ SELECT
+ n.nspname AS "name",
+ pg_catalog.pg_get_userbyid(n.nspowner) AS "owner",
+ n.nspacl AS "acl",
+ pg_catalog.obj_description(n.oid, 'pg_namespace') AS "description",
+ CASE WHEN n.nspname IN ('information_schema', 'pg_catalog', 'pg_toast')
+ OR n.nspname ~ '^pg_temp_' OR n.nspname ~ '^pg_toast_temp_'
+ THEN TRUE
+ ELSE FALSE
+ END AS "system"
+ FROM pg_catalog.pg_namespace n
+ ORDER BY 1;
+ ''', [])
+
+ def list_tables(self, schema='public'):
+ return self._query('''
+ SELECT
+ c.relname as "name",
+ pg_catalog.pg_get_userbyid(c.relowner) as "owner",
+ pg_catalog.pg_relation_size(c.oid) as "size",
+ pg_catalog.obj_description(c.oid, 'pg_class') as "description",
+ c.reloptions as "options"
+ FROM pg_catalog.pg_class c
+ LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
+ WHERE n.nspname = %s AND c.relkind IN ('r','s','')
+ ORDER BY 1;
+ ''', [schema])
+
+ def list_columns(self, table, schema='public', order=2):
+ return self._query('''
+ SELECT
+ --a.attrelid,
+ a.attname as "name",
+ format_type(a.atttypid, a.atttypmod) AS "type",
+ a.attnotnull as "notnull",
+ a.atthasdef as "hasdefault",
+ pg_catalog.pg_get_expr(d.adbin, d.adrelid) as "default",
+ pg_catalog.col_description(a.attrelid, a.attnum) AS "description"
+ FROM pg_catalog.pg_attribute a
+ LEFT JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
+ LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
+ LEFT JOIN pg_catalog.pg_attrdef d ON a.attrelid = d.adrelid AND a.attnum = d.adnum
+ WHERE n.nspname = %s AND c.relname = %s AND a.attnum > 0 AND NOT a.attisdropped
+ ORDER BY ''' + str(order), [schema, table])
+
+ def list_constraints(self, table, schema='public'):
+ return self._query('''
+ SELECT
+ r.conname AS "name",
+ r.contype AS "type",
+ cf.relname AS "fname",
+ nf.nspname AS "fschema",
+ pg_catalog.pg_get_constraintdef(r.oid, true) as "definition"
+ FROM pg_catalog.pg_constraint r
+ JOIN pg_catalog.pg_class c ON r.conrelid = c.oid
+ JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
+ LEFT JOIN pg_catalog.pg_class cf ON r.confrelid = cf.oid
+ LEFT JOIN pg_catalog.pg_namespace nf ON nf.oid = cf.relnamespace
+ WHERE n.nspname = %s AND c.relname = %s
+ ORDER BY 1
+ ''', [schema, table])
+
+ def list_indexes(self, table, schema='public'):
+ return self._query('''
+ SELECT
+ c2.relname as "name",
+ i.indisprimary as "primary",
+ i.indisunique as "unique",
+ i.indisclustered as "clustered",
+ i.indisvalid as "valid",
+ pg_catalog.pg_get_indexdef(i.indexrelid, 0, true) as "definition",
+ ARRAY(SELECT a.attname FROM pg_catalog.pg_attribute a WHERE a.attrelid = c2.oid ORDER BY attnum) AS "columns",
+ pg_catalog.pg_relation_size(c2.oid) as "size"
+ --c2.reltablespace as "tablespace_oid"
+ FROM pg_catalog.pg_class c
+ JOIN pg_catalog.pg_index i ON c.oid = i.indrelid
+ JOIN pg_catalog.pg_class c2 ON i.indexrelid = c2.oid
+ JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
+ WHERE n.nspname = %(schema)s AND c.relname = %(table)s
+ ORDER BY i.indisprimary DESC, i.indisunique DESC, c2.relname
+ ''', {'schema': schema, 'table': table})
+
+ def list_functions(self, schema='public'):
+ '''List functions in schema.'''
+ return self._query('''
+ SELECT
+ p.oid as "oid",
+ p.proname || '(' || array_to_string(
+ array(SELECT pg_catalog.format_type(unnest(p.proargtypes), NULL)),
+ ', '
+ ) || ')' as "name",
+ p.proname as "function_name",
+ pg_catalog.pg_get_function_result(p.oid) as "result",
+ p.prosrc as "source",
+ CASE
+ WHEN p.proisagg THEN 'agg'
+ WHEN p.proiswindow THEN 'window'
+ WHEN p.prorettype = 'pg_catalog.trigger'::pg_catalog.regtype THEN 'trigger'
+ ELSE 'normal'
+ END as "type"
+ FROM pg_catalog.pg_proc p
+ LEFT JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace
+ WHERE n.nspname = %s
+ ORDER BY 1, 2, 4;
+ ''', [schema])
+
+ def list_function_args(self, oid):
+ """List function arguments.
+
+ Notes about query:
+ type: Use allargtypes if present, argtypes otherwise.
+            The trick with [0:999] moves the lower bound from 0 to the default 1
+            by slicing all elements (slices always have lower bound 1).
+        mode: This trick makes an array of NULLs of the same length as argnames,
+            in case argmodes is NULL.
+        default: Use pg_get_expr, split its output by ', '.
+            FIXME: will fail if ', ' appears inside a default value string.
+ """
+ return self._query('''
+ SELECT
+ unnest(p.proargnames) AS "name",
+ pg_catalog.format_type(unnest(
+ COALESCE(p.proallargtypes, (p.proargtypes::oid[])[0:999])
+ ), NULL) AS "type",
+ unnest(
+ COALESCE(
+ p.proargmodes::text[],
+ array(SELECT NULL::text FROM generate_series(1, array_upper(p.proargnames, 1)))
+ )
+ ) AS "mode",
+ unnest(array_cat(
+ array_fill(NULL::text, array[COALESCE(array_upper(p.proargnames,1),0) - p.pronargdefaults]),
+ string_to_array(pg_get_expr(p.proargdefaults, 'pg_proc'::regclass, true), ', ')
+ )) AS "default"
+ FROM pg_proc p
+ WHERE p.oid = %s''', [oid])
+
+ def get_function_definition(self, oid):
+ """Get full function definition, including CREATE command etc.
+
+ Args:
+ oid: function oid from pg_catalog.pg_proc (returned by list_functions)
+
+ """
+ return self._query('''SELECT pg_get_functiondef(%s) AS definition;''', [oid])[0]['definition']
+
+ def list_types(self, schema='public'):
+ """List types in schema.
+
+ http://www.postgresql.org/docs/8.4/static/catalog-pg-type.html
+
+ """
+ return self._query('''
+ SELECT
+ t.typname AS "name",
+ CASE
+ WHEN t.typtype = 'b' THEN 'base'::text
+ WHEN t.typtype = 'c' THEN 'composite'::text
+ WHEN t.typtype = 'd' THEN 'domain'::text
+ WHEN t.typtype = 'e' THEN 'enum'::text
+ WHEN t.typtype = 'p' THEN 'pseudo'::text
+ END AS "type",
+ ARRAY(
+ SELECT e.enumlabel
+ FROM pg_catalog.pg_enum e
+ WHERE e.enumtypid = t.oid
+ ORDER BY e.oid
+ ) AS "elements",
+ pg_catalog.obj_description(t.oid, 'pg_type') AS "description"
+ FROM pg_catalog.pg_type t
+ LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
+ WHERE (t.typrelid = 0 OR (SELECT c.relkind = 'c' FROM pg_catalog.pg_class c WHERE c.oid = t.typrelid))
+ AND NOT EXISTS(SELECT 1 FROM pg_catalog.pg_type el WHERE el.oid = t.typelem AND el.typarray = t.oid)
+ AND n.nspname <> 'pg_catalog'
+ AND n.nspname <> 'information_schema'
+ AND n.nspname = %(schema)s
+ ORDER BY 1, 2;
+ ''', {'schema': schema})
+
+ def list_sequences(self, schema=None):
+ '''List sequences in schema.'''
+ return self._query('''
+ SELECT
+ nc.nspname AS "sequence_schema",
+ c.relname AS "sequence_name",
+ t.relname AS "related_table",
+ a.attname AS "related_column",
+ format_type(a.atttypid, a.atttypmod) AS "related_column_type"
+ FROM pg_class c
+ JOIN pg_namespace nc ON nc.oid = c.relnamespace
+ JOIN pg_depend d ON d.objid = c.oid
+ JOIN pg_class t ON d.refobjid = t.oid
+ JOIN pg_attribute a ON (d.refobjid, d.refobjsubid) = (a.attrelid, a.attnum)
+ WHERE c.relkind = 'S' AND NOT pg_is_other_temp_schema(nc.oid)
+ ''' + (schema and ' AND nc.nspname = %(schema)s' or '') + '''
+ ''', {'schema': schema})
+
+ def list_column_usage(self, table, column, schema='public'):
+ '''List objects using the column.
+
+ Currently shows views and constraints which use the column.
+
+ This is useful to find which views block alteration of column type etc.
+
+ '''
+ return self._query('''
+ SELECT
+ 'view' AS type, view_schema AS schema, view_name AS name
+ FROM information_schema.view_column_usage
+ WHERE table_schema=%(schema)s AND table_name=%(table)s AND column_name=%(column)s
+
+ UNION
+
+ SELECT
+ 'constraint' AS type, constraint_schema AS schema, constraint_name AS name
+ FROM information_schema.constraint_column_usage
+ WHERE table_schema=%(schema)s AND table_name=%(table)s AND column_name=%(column)s
+ ''', {'schema':schema, 'table':table, 'column':column})
+
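PgBrowser navigation in a nutshell (a sketch; the connection comes from PgManager here, but any psycopg2-style connection works). Schemas, tables and columns are fetched lazily on first access and cached until the respective refresh method is called:

    from pydbkit import pgmanager, pgbrowser

    dbm = pgmanager.get_instance()
    dbm.create_conn(host='127.0.0.1', dbname='test')

    browser = pgbrowser.PgBrowser(dbm.get_conn())
    for table in browser.schemas['public'].tables.values():
        print(table.name, table.size)
        for column in table.columns.values():
            print('   ', column.name, column.type)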
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/pgdatacopy.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,116 @@
+# -*- coding: utf-8 -*-
+#
+# PgDataCopy - copy data between tables
+#
+# Copyright (c) 2012 Radek Brich <radek.brich@devl.cz>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+
+import io
+
+
+class TargetNotEmptyError(Exception):
+
+ def __init__(self, msg, table):
+ Exception.__init__(self, msg)
+ self.table = table
+
+
+class PgDataCopy:
+
+ def __init__(self, conn1, conn2):
+ self.conn1 = conn1
+ self.conn2 = conn2
+ self.fulltable1 = None
+ self.fulltable2 = None
+
+ def set_source(self, table, schema='public'):
+ self.schema1 = schema
+ self.table1 = table
+ self.fulltable1 = '"' + schema + '"."'+ table + '"'
+
+ def set_destination(self, table, schema='public'):
+ self.schema2 = schema
+ self.table2 = table
+ self.fulltable2 = '"' + schema + '"."'+ table + '"'
+
+ def copy(self):
+ self.check()
+
+ buf = io.StringIO()
+ try:
+ self.read(buf)
+ data = buf.getvalue()
+ finally:
+ buf.close()
+
+ buf = io.StringIO(data)
+ try:
+ self.write(buf)
+ finally:
+ buf.close()
+
+ self.analyze()
+
+ def check(self):
+        '''Check that the target table is empty (copying into a non-empty table is not supported).'''
+ q = self._compose_check(self.fulltable2)
+ curs = self.conn2.cursor()
+ curs.execute(q)
+ curs.connection.commit()
+ if curs.rowcount > 0:
+ raise TargetNotEmptyError('Target table contains data.', self.fulltable2)
+ self.cols = [desc[0] for desc in curs.description]
+
+ def read(self, tmpfile):
+ '''Read contents from source table.'''
+ q = self._compose_read(self.fulltable1, self.cols)
+ curs = self.conn1.cursor()
+ curs.copy_expert(q, tmpfile)
+ curs.connection.commit()
+
+ def write(self, tmpfile):
+ '''Write source table contents to target table.'''
+ q = self._compose_write(self.fulltable2, self.cols)
+ curs = self.conn2.cursor()
+ curs.copy_expert(q, tmpfile)
+ curs.connection.commit()
+
+ def analyze(self):
+ '''Analyze target table.'''
+ q = self._compose_analyze(self.fulltable2)
+ curs = self.conn2.cursor()
+ curs.execute(q)
+ curs.connection.commit()
+
+ def _compose_check(self, table):
+ return 'SELECT * FROM %s LIMIT 1' % table
+
+ def _compose_read(self, table, cols):
+ collist = ', '.join(['"%s"' % col for col in cols])
+ return 'COPY %s (%s) TO STDOUT' % (table, collist)
+
+ def _compose_write(self, table, cols):
+ collist = ', '.join(['"%s"' % col for col in cols])
+ return 'COPY %s (%s) FROM STDIN' % (table, collist)
+
+ def _compose_analyze(self, table):
+ return 'ANALYZE %s' % table
+
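Typical PgDataCopy usage (a sketch; connection names, hosts and the table are illustrative). copy() refuses to run unless the target is empty, buffers the source rows via COPY TO STDOUT, replays them with COPY FROM STDIN, and finally ANALYZEs the target:

    from pydbkit import pgmanager
    from pydbkit.pgdatacopy import PgDataCopy, TargetNotEmptyError

    dbm = pgmanager.get_instance()
    dbm.create_conn(name='src', host='db1', dbname='prod')
    dbm.create_conn(name='dst', host='db2', dbname='archive')

    dc = PgDataCopy(dbm.get_conn('src'), dbm.get_conn('dst'))
    dc.set_source('orders')
    dc.set_destination('orders', schema='archive')
    try:
        dc.copy()
    except TargetNotEmptyError as e:
        print('Target table not empty:', e.table)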
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/pgdatadiff.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,273 @@
+# -*- coding: utf-8 -*-
+#
+# PgDataDiff - compare tables, print data differences
+#
+# Copyright (c) 2011 Radek Brich <radek.brich@devl.cz>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+
+from collections import OrderedDict
+
+from pydbkit import pgbrowser
+from pycolib.ansicolor import *
+
+import sys
+
+
+class DiffData:
+ COLORS = {
+ '+' : BOLD | GREEN,
+ '-' : BOLD | RED,
+ '*' : BOLD | YELLOW,
+ 'V' : BOLD | WHITE,
+ 'K' : BOLD | BLUE}
+
+ def __init__(self, change, cols1, cols2, key=None):
+ """
+
+ change - one of '+', '-', '*' (add, remove, update)
+ cols1 - original column values (OrderedDict)
+ cols2 - new column values (OrderedDict)
+ key - primary key columns (OrderedDict)
+
+ """
+ self.change = change
+ self.cols1 = cols1
+ self.cols2 = cols2
+ self.key = key
+
+ def format(self):
+ out = []
+
+ out.append(highlight(1, self.COLORS[self.change]))
+ out.extend([self.change, ' '])
+
+ out.extend(self._format_changes())
+
+ out.append(highlight(0))
+
+ return ''.join(out)
+
+ def format_patch(self, table):
+ method = {
+ '+' : self._format_insert,
+ '-' : self._format_delete,
+ '*' : self._format_update}
+
+ return method[self.change](table)
+
+ def _format_changes(self):
+ if self.cols1 and not self.cols2:
+ return [', '.join([self._format_value_del(*x) for x in self.cols1.items()])]
+ if not self.cols1 and self.cols2:
+ return [', '.join([self._format_value_add(*x) for x in self.cols2.items()])]
+
+ out = []
+ if self.key:
+ for colname in self.key:
+ out.extend([highlight(1, self.COLORS['*']), colname, ': ', highlight(0), self.key[colname], ', '])
+
+ items = []
+ for i in range(len(self.cols1)):
+ items.append((
+ list(self.cols1.keys())[i],
+ list(self.cols1.values())[i],
+ list(self.cols2.values())[i]))
+ out.extend([', '.join([self._format_value_change(*x) for x in items])])
+
+ return out
+
+ def _format_value_del(self, k, v):
+ fs = (highlight(1, self.COLORS['-']) + '{}: ' + highlight(0) + '{}')
+ return fs.format(k, v)
+
+ def _format_value_add(self, k, v):
+ fs = (highlight(1, self.COLORS['+']) + '{}: ' + highlight(0) +
+ highlight(1, self.COLORS['V']) + '{}' + highlight(0))
+ return fs.format(k, v)
+
+ def _format_value_change(self, k, v1, v2):
+ fs = (highlight(1, self.COLORS['*']) + '{}: ' + highlight(0) +
+              '{} ▶ ' +
+ highlight(1, self.COLORS['V']) + '{}' + highlight(0))
+ return fs.format(k, v1, v2)
+
+ def _format_insert(self, table):
+ out = ['INSERT INTO ', table, ' (']
+ out.append(', '.join(self.cols2.keys()))
+ out.append(') VALUES (')
+ out.append(', '.join(self.cols2.values()))
+ out.append(');')
+ return ''.join(out)
+
+ def _format_delete(self, table):
+ out = ['DELETE FROM ', table]
+ out.extend(self._format_where())
+ return ''.join(out)
+
+ def _format_update(self, table):
+ out = ['UPDATE ', table, ' SET ']
+ out.append(', '.join([self._format_set(*x) for x in self.cols2.items()]))
+ out.extend(self._format_where())
+ return ''.join(out)
+
+ def _format_set(self, k, v):
+ return '{} = {}'.format(k, v)
+
+ def _format_where(self):
+ out = [' WHERE ']
+ for colname in self.key:
+ out.extend([colname, ' = ', self.key[colname], ' AND '])
+ out[-1] = ';'
+ return out
+
+class PgDataDiff:
+ def __init__(self, conn1, conn2):
+ self.allowcolor = False
+ self.conn1 = conn1
+ self.conn2 = conn2
+ self.fulltable1 = None
+ self.fulltable2 = None
+
+ def settable1(self, table, schema='public'):
+ self.schema1 = schema
+ self.table1 = table
+ self.fulltable1 = '"' + schema + '"."'+ table + '"'
+
+ def settable2(self, table, schema='public'):
+ self.schema2 = schema
+ self.table2 = table
+ self.fulltable2 = '"' + schema + '"."'+ table + '"'
+
+ def iter_diff(self):
+ """Return differencies between data of two tables.
+
+ Yields one line at the time.
+
+ """
+ curs1, curs2 = self._select()
+
+ row1 = curs1.fetchone_dict()
+ row2 = curs2.fetchone_dict()
+
+ while True:
+ if row1 is None and row2 is None:
+ break
+ diff = self._compare_row(row1, row2, curs1.adapt, curs2.adapt)
+
+ if diff:
+ yield diff
+
+ if diff.change == '-':
+ row1 = curs1.fetchone_dict()
+ continue
+ if diff.change == '+':
+ row2 = curs2.fetchone_dict()
+ continue
+ # change == '*' or not diff
+ row1 = curs1.fetchone_dict()
+ row2 = curs2.fetchone_dict()
+
+ curs1.close()
+ curs2.close()
+
+ def print_diff(self, file=sys.stdout):
+ """Print differencies between data of two tables.
+
+ The output is in human readable form.
+
+ Set allowcolor=True of PgDataDiff instance to get colored output.
+
+ """
+ for ln in self.iter_diff():
+ print(ln.format(), file=file)
+
+ def print_patch(self, file=sys.stdout):
+ """Print SQL script usable as patch for destination table.
+
+ Supports INSERT, DELETE and UPDATE operations.
+
+ """
+ for ln in self.iter_diff():
+ print(ln.format_patch(self.fulltable1), file=file)
+
+ def _select(self):
+ browser = pgbrowser.PgBrowser(self.conn1)
+
+ columns = browser.list_columns(schema=self.schema1, table=self.table1, order=1)
+ if not columns:
+ raise Exception('Table %s.%s not found.' % (self.schema1, self.table1))
+ columns_sel = ', '.join(['"' + x['name'] + '"' for x in columns])
+ self.colnames = [x['name'] for x in columns]
+
+ pkey = [ind for ind in browser.list_indexes(schema=self.schema1, table=self.table1) if ind['primary']]
+ if not pkey:
+ raise Exception('Table %s.%s has no primary key.' % (self.schema1, self.table1))
+ pkey = pkey[0]
+ pkey_sel = ', '.join(['"' + x + '"' for x in pkey['columns']])
+ self.pkeycolnames = pkey['columns']
+
+ query1 = 'SELECT ' + columns_sel + ' FROM ' + self.fulltable1 + ' ORDER BY ' + pkey_sel
+ query2 = 'SELECT ' + columns_sel + ' FROM ' + self.fulltable2 + ' ORDER BY ' + pkey_sel
+
+ curs1 = self.conn1.cursor('curs1')
+ curs2 = self.conn2.cursor('curs2')
+
+ curs1.execute(query1)
+ curs2.execute(query2)
+
+ return curs1, curs2
+
+ def _compare_data(self, row1, row2):
+ cols1 = OrderedDict()
+ cols2 = OrderedDict()
+ for name in row1.keys():
+ if row1[name] != row2[name]:
+ cols1[name] = row1[name]
+ cols2[name] = row2[name]
+ if cols1:
+ key = OrderedDict(zip(self.pkeycolnames, [row1[colname] for colname in self.pkeycolnames]))
+ return DiffData('*', cols1, cols2, key=key)
+
+ return None
+
+ def _compare_row(self, row1, row2, adapt1, adapt2):
+ if row2 is None:
+ row1 = adapt1(row1)
+ key = OrderedDict(zip(self.pkeycolnames, [row1[colname] for colname in self.pkeycolnames]))
+ return DiffData('-', row1, None, key=key)
+ if row1 is None:
+ row2 = adapt2(row2)
+ return DiffData('+', None, row2)
+
+        # compare primary keys tuple-wise; comparing each key column
+        # independently would misorder rows when an earlier column differs
+        key1 = tuple(row1[keyname] for keyname in self.pkeycolnames)
+        key2 = tuple(row2[keyname] for keyname in self.pkeycolnames)
+        if key1 < key2:
+            row1 = adapt1(row1)
+            key = OrderedDict(zip(self.pkeycolnames, [row1[colname] for colname in self.pkeycolnames]))
+            return DiffData('-', row1, None, key=key)
+        if key1 > key2:
+            row2 = adapt2(row2)
+            return DiffData('+', None, row2)
+
+ row1 = adapt1(row1)
+ row2 = adapt2(row2)
+ return self._compare_data(row1, row2)
+
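And the matching PgDataDiff flow (same placeholder connections as in the PgDataCopy sketch). Both tables are read ordered by the source table's primary key and merged row by row; print_patch() emits INSERT/DELETE/UPDATE statements instead of the human-readable listing:

    from pydbkit import pgmanager
    from pydbkit.pgdatadiff import PgDataDiff

    dbm = pgmanager.get_instance()
    dbm.create_conn(name='src', host='db1', dbname='prod')
    dbm.create_conn(name='dst', host='db2', dbname='staging')

    dd = PgDataDiff(dbm.get_conn('src'), dbm.get_conn('dst'))
    dd.settable1('orders')       # source table, schema defaults to 'public'
    dd.settable2('orders')       # destination table
    dd.print_diff()              # human-readable '+', '-', '*' lines
    dd.print_patch()             # SQL patch for the destination table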
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/pgdiff.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,635 @@
+# -*- coding: utf-8 -*-
+#
+# PgDiff - capture differences of database metadata
+#
+# Depends on PgBrowser
+#
+# Copyright (c) 2011 Radek Brich <radek.brich@devl.cz>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+
+from pycolib.ansicolor import *
+
+import re
+import difflib
+
+
+class PgDiffError(Exception):
+ pass
+
+
+class DiffBase:
+ COLORS = {
+ '+' : BOLD | GREEN,
+ '-' : BOLD | RED,
+ '*' : BOLD | YELLOW,
+ }
+
+ COMMANDS = {
+ '+' : 'CREATE',
+ '-' : 'DROP',
+ '*' : 'ALTER',
+ }
+
+ def __init__(self):
+ self.changes = None
+
+ def format(self):
+ out = [' ' * self.level]
+
+ out.append(highlight(1, self.COLORS[self.change]))
+ out.append(self.change)
+
+ out += [' ', self.type, ' ', self.name, highlight(0)]
+
+ if self.changes:
+ out += [highlight(1, WHITE), ' (', self._formatchanges(), ')', highlight(0)]
+
+ return ''.join(out)
+
+ def _formatnotnull(self, notnull):
+ if notnull:
+ return 'NOT NULL'
+ else:
+ return None
+
+ def _formatchanges(self):
+ res = []
+ for type, a, b in self.changes:
+ if type == 'notnull':
+ type = ''
+ a = self._formatnotnull(a)
+ b = self._formatnotnull(b)
+
+ if a and b:
+ s = ''.join(['Changed ', type, ' from ',
+ highlight(1,15), a, highlight(0), ' to ',
+ highlight(1,15), b, highlight(0), '.'])
+ elif a and not b:
+ l = ['Removed ']
+ if type:
+ l += [type, ' ']
+ l += [highlight(1,15), a, highlight(0), '.']
+ s = ''.join(l)
+ elif b and not a:
+ l = ['Added ']
+ if type:
+ l += [type, ' ']
+ l += [highlight(1,15), b, highlight(0), '.']
+ s = ''.join(l)
+ res.append(s)
+ return ' '.join(res)
+
+ def format_patch(self):
+ if self.change == '*' and self.type in ('schema', 'table'):
+ return None
+ return ['%s %s %s;' % (self.COMMANDS[self.change], self.type.upper(), self.name)]
+
+
+class DiffSchema(DiffBase):
+ def __init__(self, change, schema):
+ DiffBase.__init__(self)
+ self.level = 0
+ self.type = 'schema'
+ self.change = change
+ self.schema = schema
+ self.name = schema
+
+
+class DiffTable(DiffBase):
+ def __init__(self, change, schema, table):
+ DiffBase.__init__(self)
+ self.level = 1
+ self.type = 'table'
+ self.change = change
+ self.schema = schema
+ self.table = table
+ self.name = table
+
+
+class DiffArgument(DiffBase):
+ def __init__(self, change, schema, function, argument):
+ DiffBase.__init__(self)
+ self.level = 2
+ self.type = 'argument'
+ self.change = change
+ self.schema = schema
+ self.function = function
+ self.argument = argument
+ self.name = argument
+
+
+class DiffFunction(DiffBase):
+ def __init__(self, change, schema, function, definition, show_body_diff=False):
+ DiffBase.__init__(self)
+ self.level = 1
+ self.type = 'function'
+ self.change = change
+ self.schema = schema
+ self.function = function
+ #: New function definition
+ self.definition = definition
+ self.name = function
+ self.show_body_diff = show_body_diff
+
+ def _formatchanges(self):
+ res = []
+ for x in self.changes:
+ type, a, b = x
+ if type == 'source':
+ if self.show_body_diff:
+ lines = ['Source differs:\n']
+ for line in difflib.unified_diff(a, b, lineterm=''):
+ if line[:3] in ('---', '+++'):
+ continue
+ color = {' ': WHITE, '-': YELLOW, '+': GREEN, '@': WHITE|BOLD}[line[0]]
+ lines.append(highlight(1, color) + line + highlight(0) + '\n')
+ res.append(''.join(lines))
+ else:
+ res.append('Source differs.')
+ else:
+ res.append(''.join(['Changed ', type, ' from ',
+ highlight(1,15), a, highlight(0), ' to ',
+ highlight(1,15), b, highlight(0), '.']))
+ return ' '.join(res)
+
+ def format_patch(self):
+ return [self.definition]
+
+
+class DiffColumn(DiffBase):
+ ALTER_COMMANDS = {
+ '+' : 'ADD',
+ '-' : 'DROP',
+ '*' : 'ALTER',
+ }
+
+ def __init__(self, change, schema, table, column, columntype, columndefault, columnnotnull, changes=None):
+ DiffBase.__init__(self)
+ self.level = 2
+ self.type = 'column'
+ self.change = change
+ self.schema = schema
+ self.table = table
+ self.column = column
+ self.columntype = columntype
+ self.columndefault = columndefault
+ self.columnnotnull = columnnotnull
+ self.name = column
+ self.changes = changes
+
+ def format_patch(self):
+ alter_table = 'ALTER TABLE %s.%s %s COLUMN %s' % (
+ self.schema,
+ self.table,
+ self.ALTER_COMMANDS[self.change],
+ self.name,
+ )
+ out = []
+ if self.change == '-':
+            out.append('%s;' % alter_table)
+ if self.change == '+':
+ notnull = ''
+ if self.columnnotnull:
+ notnull = ' NOT NULL'
+ default = ''
+ if self.columndefault:
+ default = ' DEFAULT %s' % self.columndefault
+ out.append('%s %s%s%s;'
+                % (alter_table, self.columntype, notnull, default))
+ if self.change == '*':
+ for type, a, b in self.changes:
+ if type == 'type':
+ out.append('%s TYPE %s;' % (alter_table, b))
+ if type == 'notnull':
+ if a and not b:
+ out.append('%s DROP NOT NULL;' % alter_table)
+ if not a and b:
+ out.append('%s SET NOT NULL;' % alter_table)
+ if type == 'default':
+ if b:
+ out.append('%s SET DEFAULT %s;' % (alter_table, b))
+ else:
+ out.append('%s DROP DEFAULT;' % alter_table)
+ return out
+
+
+class DiffConstraint(DiffBase):
+ def __init__(self, change, schema, table, constraint, definition, changes=None):
+ DiffBase.__init__(self)
+ self.level = 2
+ self.type = 'constraint'
+ self.change = change
+ self.schema = schema
+ self.table = table
+ self.constraint = constraint
+ self.name = constraint
+ self.definition = definition
+ self.changes = changes
+
+ def format_patch(self):
+ q_alter = 'ALTER TABLE %s.%s' % (self.schema, self.table)
+ q_drop = '%s DROP CONSTRAINT %s;' % (q_alter, self.constraint)
+ q_add = '%s ADD CONSTRAINT %s %s;' % (q_alter, self.constraint, self.definition)
+ if self.change == '*':
+ out = [q_drop, q_add]
+ if self.change == '+':
+ out = [q_add]
+ if self.change == '-':
+ out = [q_drop]
+ return out
+
+
+class DiffIndex(DiffBase):
+ def __init__(self, change, schema, table, index, definition, changes=None):
+ DiffBase.__init__(self)
+ self.level = 2
+ self.type = 'index'
+ self.change = change
+ self.schema = schema
+ self.table = table
+ self.index = index
+ self.name = index
+ self.definition = definition
+ self.changes = changes
+
+ def format_patch(self):
+ q_drop = 'DROP INDEX %s;' % (self.index,)
+ q_add = '%s;' % (self.definition,)
+        # change is one of '*', '+', '-'; return an empty patch otherwise
+        if self.change == '*':
+            out = [q_drop, q_add]
+        elif self.change == '+':
+            out = [q_add]
+        elif self.change == '-':
+            out = [q_drop]
+        else:
+            out = []
+        return out
+
+
+class DiffType(DiffBase):
+ def __init__(self, change, schema, name):
+ DiffBase.__init__(self)
+ self.level = 1
+ self.type = 'type'
+ self.change = change
+ self.schema = schema
+ self.name = name
+
+
+class PgDiff:
+ def __init__(self, srcbrowser=None, dstbrowser=None):
+ self.allowcolor = False
+ self.src = srcbrowser
+ self.dst = dstbrowser
+ self.include_schemas = set() # if not empty, consider only these schemas for diff
+ self.exclude_schemas = set() # exclude these schemas from diff
+ self.include_tables = set()
+ self.exclude_tables = set()
+ self.function_regex = re.compile(r"")
+ self.function_body_diff = False
+
+ def _test_schema(self, schema):
+ if self.include_schemas and schema not in self.include_schemas:
+ return False
+ if schema in self.exclude_schemas:
+ return False
+ return True
+
+ def _test_table(self, table):
+ if self.include_tables and table not in self.include_tables:
+ return False
+ if table in self.exclude_tables:
+ return False
+ return True
+
+ def _test_function(self, function):
+ return bool(self.function_regex.match(function))
+
+ def _diff_names(self, src, dst):
+ for x in src:
+ if x in dst:
+ yield ('*', x)
+ else:
+ yield ('-', x)
+ for x in dst:
+ if x not in src:
+ yield ('+', x)
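+
+    # For example (hypothetical inputs), list(self._diff_names(['a', 'b'], ['b', 'c']))
+    # gives [('-', 'a'), ('*', 'b'), ('+', 'c')]: names only in src are dropped,
+    # names in both may be altered, names only in dst are added.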
+
+ def _compare_columns(self, a, b):
+ diff = []
+ if a.type != b.type:
+ diff.append(('type', a.type, b.type))
+ if a.notnull != b.notnull:
+ diff.append(('notnull', a.notnull, b.notnull))
+ if a.default != b.default:
+ diff.append(('default', a.default, b.default))
+ return diff
+
+ def _compare_constraints(self, a, b):
+ diff = []
+ if a.type != b.type:
+ diff.append(('type', a.type, b.type))
+ if a.definition != b.definition:
+ diff.append(('definition', a.definition, b.definition))
+ return diff
+
+ def _compare_indexes(self, a, b):
+ diff = []
+ if a.definition != b.definition:
+ diff.append(('definition', a.definition, b.definition))
+ return diff
+
+ def _compare_functions(self, a, b):
+ diff = []
+ if a.result != b.result:
+ diff.append(('result', a.result, b.result))
+        # function source may differ in line endings (\n vs \r\n);
+        # split into lines before comparison so these differences are ignored
+ a_source = a.source.splitlines()
+ b_source = b.source.splitlines()
+ if a_source != b_source:
+ diff.append(('source', a_source, b_source))
+ return diff
+
+ def _compare_arguments(self, a, b):
+ diff = []
+ if a.type != b.type:
+ diff.append(('type', a.type, b.type))
+ if a.mode != b.mode:
+ diff.append(('mode', a.mode, b.mode))
+ if a.default != b.default:
+ diff.append(('default', a.default, b.default))
+ return diff
+
+ def _compare_types(self, a, b):
+ diff = []
+ if a.type != b.type:
+ diff.append(('type', a.type, b.type))
+ if a.elements != b.elements:
+ diff.append(('elements', repr(a.elements), repr(b.elements)))
+ return diff
+
+ def _diff_columns(self, schema, table, src_columns, dst_columns):
+ for nd in self._diff_names(src_columns, dst_columns):
+ if nd[1] in dst_columns:
+ dst_type = dst_columns[nd[1]].type
+ dst_default = dst_columns[nd[1]].default
+ dst_notnull = dst_columns[nd[1]].notnull
+ else:
+ dst_type = None
+ dst_default = None
+ dst_notnull = None
+ cdo = DiffColumn(change=nd[0], schema=schema, table=table, column=nd[1],
+ columntype=dst_type, columndefault=dst_default, columnnotnull=dst_notnull)
+ if nd[0] == '*':
+ a = src_columns[nd[1]]
+ b = dst_columns[nd[1]]
+ cdo.changes = self._compare_columns(a, b)
+ if cdo.changes:
+ yield cdo
+ else:
+ yield cdo
+
+ def _diff_constraints(self, schema, table, src_constraints, dst_constraints):
+ for nd in self._diff_names(src_constraints, dst_constraints):
+ if nd[1] in dst_constraints:
+ dst_definition = dst_constraints[nd[1]].definition
+ else:
+ dst_definition = None
+ cdo = DiffConstraint(change=nd[0], schema=schema, table=table, constraint=nd[1],
+ definition=dst_definition)
+ if nd[0] == '*':
+ a = src_constraints[nd[1]]
+ b = dst_constraints[nd[1]]
+ cdo.changes = self._compare_constraints(a, b)
+ if cdo.changes:
+ yield cdo
+ else:
+ yield cdo
+
+ def _diff_indexes(self, schema, table, src_indexes, dst_indexes):
+ for nd in self._diff_names(src_indexes, dst_indexes):
+ if nd[1] in dst_indexes:
+ dst_definition = dst_indexes[nd[1]].definition
+ else:
+ dst_definition = None
+ ido = DiffIndex(change=nd[0], schema=schema, table=table, index=nd[1],
+ definition=dst_definition)
+ if nd[0] == '*':
+ a = src_indexes[nd[1]]
+ b = dst_indexes[nd[1]]
+ ido.changes = self._compare_indexes(a, b)
+ if ido.changes:
+ yield ido
+ else:
+ yield ido
+
+ def _diff_tables(self, schema, src_tables, dst_tables):
+ for nd in self._diff_names(src_tables, dst_tables):
+ if not self._test_table(nd[1]):
+ continue
+ tdo = DiffTable(change=nd[0], schema=schema, table=nd[1])
+ if nd[0] == '*':
+ # columns
+ src_columns = src_tables[nd[1]].columns
+ dst_columns = dst_tables[nd[1]].columns
+ for cdo in self._diff_columns(schema, nd[1], src_columns, dst_columns):
+ if tdo:
+ yield tdo
+ tdo = None
+ yield cdo
+ # constraints
+ src_constraints = src_tables[nd[1]].constraints
+ dst_constraints = dst_tables[nd[1]].constraints
+ for cdo in self._diff_constraints(schema, nd[1], src_constraints, dst_constraints):
+ if tdo:
+ yield tdo
+ tdo = None
+ yield cdo
+ # indexes
+ src_indexes = src_tables[nd[1]].indexes
+ dst_indexes = dst_tables[nd[1]].indexes
+ for ido in self._diff_indexes(schema, nd[1], src_indexes, dst_indexes):
+ if tdo:
+ yield tdo
+ tdo = None
+ yield ido
+ else:
+ yield tdo
+
+ def _diff_arguments(self, schema, function, src_args, dst_args):
+ for nd in self._diff_names(src_args, dst_args):
+ ado = DiffArgument(change=nd[0], schema=schema, function=function, argument=nd[1])
+ if nd[0] == '*':
+ a = src_args[nd[1]]
+ b = dst_args[nd[1]]
+ ado.changes = self._compare_arguments(a, b)
+ if ado.changes:
+ yield ado
+ else:
+ yield ado
+
+ def _diff_functions(self, schema, src_functions, dst_functions):
+ for nd in self._diff_names(src_functions, dst_functions):
+ if not self._test_function(nd[1]):
+ continue
+ if nd[1] in dst_functions:
+ dst_definition = dst_functions[nd[1]].definition
+ else:
+ dst_definition = None
+ fdo = DiffFunction(change=nd[0], schema=schema, function=nd[1],
+ definition=dst_definition,
+ show_body_diff=self.function_body_diff)
+ if nd[0] == '*':
+ # compare function body and result
+ a = src_functions[nd[1]]
+ b = dst_functions[nd[1]]
+ fdo.changes = self._compare_functions(a, b)
+ if fdo.changes:
+ yield fdo
+ fdo = None
+ # arguments
+ src_args = src_functions[nd[1]].arguments
+ dst_args = dst_functions[nd[1]].arguments
+ for ado in self._diff_arguments(schema, nd[1], src_args, dst_args):
+ if fdo:
+ yield fdo
+ fdo = None
+ yield ado
+ else:
+ yield fdo
+
+ def _diff_types(self, schema, src_types, dst_types):
+ for nd in self._diff_names(src_types, dst_types):
+ tdo = DiffType(change=nd[0], schema=schema, name=nd[1])
+ if nd[0] == '*':
+ a = src_types[nd[1]]
+ b = dst_types[nd[1]]
+ tdo.changes = self._compare_types(a, b)
+ if tdo.changes:
+ yield tdo
+ else:
+ yield tdo
+
+ def iter_diff(self):
+        '''Return diff between src and dst database schemas.
+
+        Yields one item at a time. Each item is an object inherited
+        from DiffBase which contains all information about the change.
+        See its format() method.
+
+        '''
+ src_schemas = self.src.schemas
+ dst_schemas = self.dst.schemas
+ src = [x.name for x in src_schemas.values() if not x.system and self._test_schema(x.name)]
+ dst = [x.name for x in dst_schemas.values() if not x.system and self._test_schema(x.name)]
+ for nd in self._diff_names(src, dst):
+ sdo = DiffSchema(change=nd[0], schema=nd[1])
+ if nd[0] == '*':
+ # tables
+ src_tables = src_schemas[nd[1]].tables
+ dst_tables = dst_schemas[nd[1]].tables
+ for tdo in self._diff_tables(nd[1], src_tables, dst_tables):
+ if sdo:
+ yield sdo
+ sdo = None
+ yield tdo
+ # functions
+ src_functions = src_schemas[nd[1]].functions
+ dst_functions = dst_schemas[nd[1]].functions
+ for fdo in self._diff_functions(nd[1], src_functions, dst_functions):
+ if sdo:
+ yield sdo
+ sdo = None
+ yield fdo
+ # types
+ src_types = src_schemas[nd[1]].types
+ dst_types = dst_schemas[nd[1]].types
+ for tdo in self._diff_types(nd[1], src_types, dst_types):
+ if sdo:
+ yield sdo
+ sdo = None
+ yield tdo
+ else:
+ yield sdo
+
+ def print_diff(self):
+        '''Print diff between src and dst database schemas.
+
+        The output is in human readable form.
+
+        Set allowcolor=True on the PgDiff instance to get colored output.
+
+ '''
+ for ln in self.iter_diff():
+ print(ln.format())
+
+ def print_patch(self):
+ '''Print patch for updating from src schema to dst schema.
+
+        Supports table add/drop, column add/drop and the following
+        column changes:
+        - type
+        - set/drop NOT NULL
+        - default value
+
+        This is experimental and not well tested.
+        Do not use it without reviewing the generated commands.
+        Even if it works as intended, it can cause table lockups
+        and/or loss of data. You have been warned.
+
+ '''
+ for ln in self.iter_diff():
+ patch = ln.format_patch()
+ if patch:
+ print('\n'.join(patch))
+
+ def filter_schemas(self, include=[], exclude=[]):
+ '''Modify list of schemas which are used for computing diff.
+
+ include (list) -- if not empty, consider only these schemas for diff
+ exclude (list) -- exclude these schemas from diff
+
+ Order: include, exclude
+ include=[] means include everything
+
+ Raises:
+ PgDiffError: when schema from include list is not found in src db
+
+ '''
+ for schema in include:
+ self._check_schema_exist(schema)
+ self.include_schemas.clear()
+ self.include_schemas.update(include)
+ self.exclude_schemas.clear()
+ self.exclude_schemas.update(exclude)
+
+ def filter_tables(self, include=[], exclude=[]):
+ self.include_tables.clear()
+ self.include_tables.update(include)
+ self.exclude_tables.clear()
+ self.exclude_tables.update(exclude)
+
+ def filter_functions(self, regex=''):
+ self.function_regex = re.compile(regex)
+
+ def _check_schema_exist(self, schema):
+        if schema not in self.src.schemas:
+ raise PgDiffError('Schema "%s" not found in source database.' % schema)
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/pgmanager.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,515 @@
+# -*- coding: utf-8 -*-
+#
+# PgManager - manage database connections
+#
+# Requires: Python 3.2, psycopg2
+#
+# Part of pydbkit
+# http://hg.devl.cz/pydbkit
+#
+# Copyright (c) 2010, 2011, 2012, 2013 Radek Brich <radek.brich@devl.cz>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+"""Postgres database connection manager
+
+PgManager wraps psycopg2, adding the following features:
+
+ * Save and reuse database connection parameters
+
+ * Connection pooling
+
+ * Easy query using the with statement
+
+ * Dictionary rows
+
+Example usage:
+
+ from pydbkit import pgmanager
+
+ pgm = pgmanager.get_instance()
+ pgm.create_conn(hostaddr='127.0.0.1', dbname='postgres')
+
+ with pgm.cursor() as curs:
+ curs.execute('SELECT now() AS now')
+ row = curs.fetchone_dict()
+ print(row.now)
+
+First, we obtain a PgManager instance. This is like calling PgManager(),
+except that in our example the instance is global. That means getting
+the instance in another module brings us all the defined connections etc.
+
+The second line creates a connection named 'default' (the name can be left out).
+The with statement obtains a connection (actually connecting to the database
+when needed), then returns a cursor for that connection. At the end of the
+with statement, the connection is returned to the pool or closed (depending
+on the number of connections in the pool and on the pool_size parameter).
+
+The row returned by fetchone_dict() is a special dict object which supports
+both item and attribute access, that is row['now'] or row.now.
+
+"""
+
+from contextlib import contextmanager
+from collections import OrderedDict
+import logging
+import threading
+import multiprocessing
+import select
+import socket
+
+import psycopg2
+import psycopg2.extensions
+
+from psycopg2 import DatabaseError, IntegrityError, OperationalError
+
+
+log_sql = logging.getLogger("pgmanager_sql")
+log_notices = logging.getLogger("pgmanager_notices")
+log_sql.addHandler(logging.NullHandler())
+# NullHandler not needed for notices which are INFO level only
+
+
+class PgManagerError(Exception):
+
+ pass
+
+
+class ConnectionInfo:
+
+ def __init__(self, name, dsn, isolation_level=None, keep_alive=True,
+ init_statement=None, pool_size=1):
+ self.name = name # connection name is logged with SQL queries
+ self.dsn = dsn # dsn or string with connection parameters
+ self.isolation_level = isolation_level
+ self.keep_alive = keep_alive
+ self.init_statement = init_statement
+ self.pool_size = pool_size
+
+
+class RowDict(OrderedDict):
+ """Special dictionary used for rows returned from queries.
+
+    Items keep the order in which columns were returned from the database.
+
+ It supports three styles of access:
+
+ Dict style:
+ row['id']
+ for key in row:
+ ...
+
+ Object style (only works if column name does not collide with any method name):
+ row.id
+
+ Tuple style:
+ row[0]
+ id, name = row.values()
+
+ """
+
+ def __getitem__(self, key):
+ if isinstance(key, int):
+ return tuple(self.values())[key]
+ else:
+ return OrderedDict.__getitem__(self, key)
+
+ def __getattr__(self, key):
+ try:
+ return self[key]
+ except KeyError:
+ raise AttributeError(key)
+
+
+class Cursor(psycopg2.extensions.cursor):
+
+ def execute(self, query, args=None):
+ # log query before executing
+ self._log_query(query, args)
+ try:
+ return super(Cursor, self).execute(query, args)
+ except DatabaseError:
+ self._log_exception()
+ raise
+
+ def callproc(self, procname, args=None):
+        # log the query before executing (not the exact query executed, but it should correspond)
+ self._log_query(self._build_callproc_query(procname, len(args)), args)
+ try:
+ return super(Cursor, self).callproc(procname, args)
+ except DatabaseError:
+ self._log_exception()
+ raise
+
+ def row_dict(self, row, lstrip=None):
+ adjustname = lambda a: a
+ if lstrip:
+ adjustname = lambda a: a.lstrip(lstrip)
+ return RowDict(zip([adjustname(desc[0]) for desc in self.description], row))
+
+ def fetchone_dict(self, lstrip=None):
+ '''Return one row as OrderedDict'''
+ row = super(Cursor, self).fetchone()
+ if row is None:
+ return None
+ return self.row_dict(row, lstrip)
+
+ def fetchall_dict(self, lstrip=None):
+ '''Return all rows as OrderedDict'''
+ rows = super(Cursor, self).fetchall()
+ return [self.row_dict(row, lstrip) for row in rows]
+
+ def adapt(self, row):
+ if isinstance(row, RowDict):
+ # dict
+ adapted = dict()
+ for key in row.keys():
+ adapted[key] = self.mogrify('%s', [row[key]]).decode('utf8')
+ return RowDict(adapted)
+ else:
+ # list
+ return [self.mogrify('%s', [x]).decode('utf8') for x in row]
+
+ def fetchone_adapted(self, lstrip=None):
+ '''Like fetchone_dict() but values are quoted for direct inclusion in SQL query.
+
+ This is useful when you need to generate SQL script from data returned
+ by the query. Use mogrify() for simple cases.
+
+ '''
+ row = super(Cursor, self).fetchone()
+ if row is None:
+ return None
+ return self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip)
+
+ def fetchall_adapted(self, lstrip=None):
+ '''Like fetchall_dict() but values are quoted for direct inclusion in SQL query.'''
+ rows = super(Cursor, self).fetchall()
+ return [self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip) for row in rows]
+
+ def _log_query(self, query='?', args=None):
+ name = self.connection.name if hasattr(self.connection, 'name') else '-'
+ query = self.mogrify(query, args)
+ log_sql.debug('[%s] %s' % (name, query.decode('utf8')))
+
+ def _log_exception(self):
+ name = self.connection.name if hasattr(self.connection, 'name') else '-'
+ log_sql.exception('[%s] exception:' % (name,))
+
+ def _build_callproc_query(self, procname, num_args):
+ return 'SELECT * FROM %s(%s)' % (procname, ', '.join(['%s'] * num_args))
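+
+    # e.g. _build_callproc_query('myfunc', 2) returns
+    #   'SELECT * FROM myfunc(%s, %s)'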
+
+
+class Connection(psycopg2.extensions.connection):
+
+ def cursor(self, name=None):
+ if name is None:
+ return super(Connection, self).cursor(cursor_factory=Cursor)
+ else:
+ return super(Connection, self).cursor(name, cursor_factory=Cursor)
+
+ def keep_alive(self):
+ '''Set socket to keepalive mode. Must be called before any query.'''
+ sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ try:
+            # Maximum keep-alive probes before assuming the connection is lost
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
+ # Interval (in seconds) between keep-alive probes
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 2)
+ # Maximum idle time (in seconds) before start sending keep-alive probes
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
+ except socket.error:
+ pass
+        # close the duplicated fd; options set on the socket remain
+ sock.close()
+
+
+class PgManager:
+
+ def __init__(self):
+ self.conn_known = {} # available connections
+        self.conn_pool = {}     # active connections
+ self.lock = threading.Lock() # mutual exclusion for threads
+ self.pid = multiprocessing.current_process().pid # forking check
+
+ def __del__(self):
+ for conn in tuple(self.conn_known.keys()):
+ self.destroy_conn(conn)
+
+ def create_conn(self, name='default', isolation_level=None, keep_alive=True, init_statement=None,
+ pool_size=1, dsn=None, **kwargs):
+ '''Create named connection.
+
+ *name* -- name for connection
+
+        *pool_size* -- how many connections will be kept open in the pool.
+        More connections will still be created, but they will be closed by put_conn.
+        `None` disables the pool; get_conn() will then always return the same connection.
+
+ *isolation_level* -- `"autocommit"`, `"read_committed"`, `"serializable"` or `None` for driver default
+
+ *keep_alive* -- set socket to keepalive mode
+
+ *dsn* -- connection string (parameters or data source name)
+
+ Other keyword args are used as connection parameters.
+
+ '''
+ if name in self.conn_known:
+ raise PgManagerError('Connection name "%s" already registered.' % name)
+
+ if dsn is None:
+ dsn = ' '.join([x[0]+'='+str(x[1]) for x in kwargs.items() if x[1] is not None])
+
+ isolation_level = self._normalize_isolation_level(isolation_level)
+ ci = ConnectionInfo(name, dsn, isolation_level, keep_alive, init_statement, pool_size)
+
+ self.conn_known[name] = ci
+ self.conn_pool[name] = []
+
+ def create_conn_listen(self, name, channel, dsn=None, copy_dsn=None, **kwargs):
+ '''Create connection listening for notifies.
+
+        Disables the pool. If you want to use a pool, create another connection for that.
+        This connection can be used as usual: conn.cursor() etc.
+        Do not use PgManager's cursor() and put_conn() with it.
+
+ *name* -- name for connection
+
+ *channel* -- listen on this channel
+
+        *copy_dsn* -- name of another connection whose dsn will be used
+
+ Other parameters forwarded to create_conn().
+
+ '''
+ if dsn is None and copy_dsn:
+ try:
+ dsn = self.conn_known[copy_dsn].dsn
+ except KeyError:
+ raise PgManagerError("Connection name '%s' not registered." % copy_dsn)
+ listen_query = "LISTEN " + channel
+ self.create_conn(name=name, pool_size=None, isolation_level='autocommit', init_statement=listen_query,
+ dsn=dsn, **kwargs)
+
+ def close_conn(self, name='default'):
+ '''Close all connections of given name.
+
+ Connection credentials are still saved.
+
+ '''
+ while len(self.conn_pool[name]):
+ conn = self.conn_pool[name].pop()
+ conn.close()
+
+ def destroy_conn(self, name='default'):
+ '''Destroy connection.
+
+ Counterpart of create_conn.
+
+ '''
+        if name not in self.conn_known:
+ raise PgManagerError('Connection name "%s" not registered.' % name)
+
+ self.close_conn(name)
+
+ del self.conn_known[name]
+ del self.conn_pool[name]
+
+ def knows_conn(self, name='default'):
+ return name in self.conn_known
+
+ def get_conn(self, name='default'):
+ '''Get connection of name 'name' from pool.'''
+ self._check_fork()
+ self.lock.acquire()
+ try:
+ try:
+ ci = self.conn_known[name]
+ except KeyError:
+ raise PgManagerError("Connection name '%s' not registered." % name)
+
+ # no pool, just one static connection
+ if ci.pool_size is None:
+ # check for existing connection
+ try:
+ conn = self.conn_pool[name][0]
+ if conn.closed:
+ conn = None
+ except IndexError:
+ conn = None
+ self.conn_pool[name].append(conn)
+ # if no existing connection is valid, connect new one and save it
+ if conn is None:
+ conn = self._connect(ci)
+ self.conn_pool[name][0] = conn
+
+ # connection from pool
+ else:
+ conn = None
+ while len(self.conn_pool[name]) and conn is None:
+ conn = self.conn_pool[name].pop()
+ if conn.closed:
+ conn = None
+
+ if conn is None:
+ conn = self._connect(ci)
+ finally:
+ self.lock.release()
+ return conn
+
+ def put_conn(self, conn, name='default'):
+ '''Put connection back to pool.
+
+        *name* must be the same as used for get_conn(), otherwise the pool becomes inconsistent.
+
+ '''
+ self.lock.acquire()
+ try:
+            if name not in self.conn_known:
+ raise PgManagerError("Connection name '%s' not registered." % name)
+
+ if len(self.conn_pool[name]) >= self.conn_known[name].pool_size:
+ conn.close()
+ return
+
+ if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN:
+ conn.close()
+ return
+
+ # connection returned to the pool must not be in transaction
+ if conn.get_transaction_status() != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
+ try:
+ conn.rollback()
+ except OperationalError:
+ if not conn.closed:
+ conn.close()
+ return
+
+ self.conn_pool[name].append(conn)
+ finally:
+ self.lock.release()
+
+ @contextmanager
+ def cursor(self, name='default'):
+ '''Cursor context.
+
+        Gets a connection named *name* from the pool and yields
+        a cursor for that connection. When the context exits, the cursor
+        is closed and the connection is returned to the pool.
+
+        '''
+        conn = self.get_conn(name)
+        curs = None
+        try:
+            curs = conn.cursor()
+            yield curs
+        finally:
+            if curs is not None:
+                curs.close()
+            self.log_notices(conn)
+            self.put_conn(conn, name)
+
+ def log_notices(self, conn):
+ for notice in conn.notices:
+ log_notices.info(notice.rstrip())
+ conn.notices[:] = []
+
+ def wait_for_notify(self, name='default', timeout=None):
+ '''Wait for asynchronous notifies, return the last one.
+
+ *name* -- name of connection, must be created using `create_conn_listen()`
+
+ *timeout* -- in seconds, floating point (`None` means wait forever)
+
+ Returns `None` on timeout.
+
+ '''
+ conn = self.get_conn(name)
+
+ # return any notifies on stack
+ if conn.notifies:
+ return conn.notifies.pop()
+
+ if select.select([conn], [], [], timeout) == ([], [], []):
+ # timeout
+ return None
+ else:
+ conn.poll()
+
+        # return just the last notify (we do not care about older ones)
+ if conn.notifies:
+ return conn.notifies.pop()
+ return None
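+
+    # Minimal usage sketch (hypothetical channel and DSN; the connection
+    # must be created with create_conn_listen(), as noted in its docstring):
+    #
+    #   pgm = get_instance()
+    #   pgm.create_conn_listen('events', channel='my_channel',
+    #                          dsn='host=127.0.0.1 dbname=postgres')
+    #   notify = pgm.wait_for_notify('events', timeout=5.0)
+    #   if notify is not None:
+    #       print(notify.channel, notify.payload)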
+
+ def _connect(self, ci):
+ conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
+ conn.name = ci.name
+ if ci.keep_alive:
+ conn.keep_alive()
+        if ci.isolation_level is not None:
+ conn.set_isolation_level(ci.isolation_level)
+ if ci.init_statement:
+ curs = conn.cursor()
+ curs.execute(ci.init_statement)
+ curs.connection.commit()
+ curs.close()
+ return conn
+
+ def _normalize_isolation_level(self, level):
+ if type(level) == str:
+ if level.lower() == 'autocommit':
+ return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
+ if level.lower() == 'read_committed':
+ return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
+ if level.lower() == 'serializable':
+ return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
+ raise PgManagerError('Unknown isolation level name: "%s"' % level)
+ return level
+
+ def _check_fork(self):
+ '''Check if process was forked (PID has changed).
+
+ If it was, clean parent's connections.
+ New connections are created for children.
+ Known connection credentials are inherited, but not shared.
+
+ '''
+ if self.pid == multiprocessing.current_process().pid:
+ # PID has not changed
+ return
+
+ # update saved PID
+ self.pid = multiprocessing.current_process().pid
+ # reinitialize lock
+ self.lock = threading.Lock()
+ # clean parent's connections
+ for name in self.conn_pool:
+ self.conn_pool[name] = []
+
+ @classmethod
+ def get_instance(cls):
+ if not hasattr(cls, '_instance'):
+ cls._instance = cls()
+ return cls._instance
+
+
+def get_instance():
+ return PgManager.get_instance()
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/pgstats.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+#
+# PgStats - browse database statistics
+#
+# Copyright (c) 2011 Radek Brich <radek.brich@devl.cz>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+
+class PgStats:
+ def __init__(self, conn=None):
+ self.conn = conn
+
+ def setconn(self, conn=None):
+ self.conn = conn
+
+ def _query(self, query, *args):
+        # create the cursor outside the try block so it is always
+        # bound when the finally clause runs
+        curs = self.conn.cursor()
+        try:
+            curs.execute(query, args)
+            curs.connection.commit()
+            rows = curs.fetchall()
+            return [dict(zip([desc[0] for desc in curs.description], row)) for row in rows]
+        finally:
+            curs.close()
+
+ def list_long_queries(self, longer_than='1m'):
+ return self._query('''SELECT
+ datname, procpid, usename, current_query AS query,
+ waiting, xact_start, query_start, backend_start, client_addr
+ FROM pg_stat_activity
+ WHERE current_query <> '<IDLE>' AND query_start < now() - interval %s
+ ORDER BY query_start;''',
+ longer_than)
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/progresswrapper.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,54 @@
+
+import sys
+
+
+class ProgressWrapper:
+
+ def __init__(self, f, size=0):
+ self.f = f
+ self.size = size
+ self.readsize = 0
+ self.cycles = 0
+ self.print_cycles = 200
+
+ def humanize(self, bytes):
+ if bytes > 1024**3:
+ return '%.1fG' % (bytes / 1024.**3)
+ if bytes > 1024**2:
+ return '%.1fM' % (bytes / 1024.**2)
+ if bytes > 1024:
+ return '%.1fk' % (bytes / 1024.)
+ return '%d' % bytes
+
+ def write(self, data):
+ self.size += len(data)
+ if self.cycles == 0:
+ print(' read %s \r' % self.humanize(self.size), end='')
+ sys.stdout.flush()
+ self.cycles = self.print_cycles * 200
+ else:
+ self.cycles -= 1
+ return self.f.write(data)
+
+ def read(self, size):
+ self.readsize += size
+ if self.cycles == 0:
+ if self.size > 0:
+ percent = self.readsize * 100. / self.size
+ else:
+ percent = 100
+ if percent > 100:
+ percent = 100
+ print(' written %s / %s (%.1f%%) \r' % (
+ self.humanize(self.readsize),
+ self.humanize(self.size),
+ percent), end='')
+ sys.stdout.flush()
+ self.cycles = self.print_cycles
+ else:
+ self.cycles -= 1
+ return self.f.read(size)
+
+ def close(self):
+ self.f.close()
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/toolbase.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,261 @@
+from pydbkit import pgmanager, pgbrowser
+
+from pycolib.configparser import ConfigParser
+from pycolib.coloredformatter import ColoredFormatter
+from pycolib.ansicolor import highlight
+
+import argparse
+import logging
+import re
+import textwrap
+
+
+class ConnectionInfoNotFound(Exception):
+ pass
+
+
+class BadArgsError(Exception):
+ pass
+
+
+class ToolDescriptionFormatter(argparse.HelpFormatter):
+ """Help message formatter which retains any formatting in descriptions."""
+
+ def _fill_text(self, text, width, indent):
+ return textwrap.dedent(text)
+
+
+class ToolBase:
+
+ def __init__(self, name, desc=None, **kwargs):
+ self.config = ConfigParser()
+ self.parser = argparse.ArgumentParser(prog=name, description=desc or self.__doc__,
+ formatter_class=ToolDescriptionFormatter)
+ self.pgm = pgmanager.get_instance()
+ self.target_isolation_level = None
+
+ def setup(self, args=None):
+ self.specify_args()
+ self.load_args(args)
+ self.init_logging()
+
+ def specify_args(self):
+ self.config.add_option('databases', dict)
+ self.config.add_option('meta_db')
+ self.config.add_option('meta_query')
+ self.parser.add_argument('-Q', dest='show_queries', action='store_true',
+ help='Print database queries.')
+ self.parser.add_argument('-C', dest='config_file', type=str,
+ help='Additional config file (besides pydbkit.conf).')
+
+ def load_args(self, args=None, config_file=None):
+ # Parse command line arguments
+ self.args = self.parser.parse_args(args)
+ # Load global config
+ self.config.load('/etc/pydbkit.conf', must_exist=False)
+ # Load local config
+ self.config.load(config_file or 'pydbkit.conf', must_exist=False)
+ # Load additional config
+ if self.args.config_file:
+ self.config.load(self.args.config_file)
+
+ def init_logging(self):
+ # logging
+ format = ColoredFormatter(highlight(1,7,0)+'%(asctime)s %(levelname)-5s'+highlight(0)+' %(message)s', '%H:%M:%S')
+ handler = logging.StreamHandler()
+ handler.setFormatter(format)
+ handler.setLevel(logging.DEBUG)
+ self.log = logging.getLogger('main')
+ self.log.addHandler(handler)
+ self.log.setLevel(logging.DEBUG)
+
+ log_notices = logging.getLogger('pgmanager_notices')
+ log_notices.addHandler(handler)
+ log_notices.setLevel(logging.DEBUG)
+
+ if self.args.show_queries:
+ log_sql = logging.getLogger('pgmanager_sql')
+ log_sql.addHandler(handler)
+ log_sql.setLevel(logging.DEBUG)
+
+ def prepare_conn_from_metadb(self, name, lookup_name):
+ """Create connection in pgmanager using meta DB.
+
+ name -- Name for connection in pgmanager.
+ lookup_name -- Name of connection in meta DB.
+
+ """
+ if not self.pgm.knows_conn('meta'):
+ self.pgm.create_conn(name='meta', dsn=self.config.meta_db)
+ with self.pgm.cursor('meta') as curs:
+ curs.execute(self.config.meta_query, [lookup_name])
+ row = curs.fetchone_dict()
+ curs.connection.commit()
+ if row:
+ self.pgm.create_conn(name=name,
+ isolation_level=self.target_isolation_level,
+ **row)
+ return True
+ self.pgm.close_conn('meta')
+
+ def prepare_conn_from_config(self, name, lookup_name):
+ """Create connection in pgmanager using info in config.databases."""
+ if self.config.databases:
+ if lookup_name in self.config.databases:
+ dsn = self.config.databases[lookup_name]
+ self.pgm.create_conn(name=name,
+ isolation_level=self.target_isolation_level,
+ dsn=dsn)
+ return True
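+
+    # A matching entry in pydbkit.conf might look like the following
+    # (illustrative only -- the exact syntax is defined by pycolib's
+    # ConfigParser, and the DSN values are examples):
+    #
+    #   databases = {'mydb': 'host=127.0.0.1 dbname=mydb user=me'}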
+
+ def prepare_conns(self, **kwargs):
+ """Create connections in PgManager.
+
+ Keyword arguments meaning:
+ key: connection name for use in PgManager
+ value: connection name in config or meta DB
+
+ """
+ for name in kwargs:
+ lookup_name = kwargs[name]
+ found = self.prepare_conn_from_config(name, lookup_name)
+ if not found and self.config.meta_db:
+ found = self.prepare_conn_from_metadb(name, lookup_name)
+ if not found:
+ raise ConnectionInfoNotFound('Connection name "%s" not found in config nor in meta DB.' % lookup_name)
+
+
+class SimpleTool(ToolBase):
+
+ def __init__(self, name, desc=None, **kwargs):
+ ToolBase.__init__(self, name, desc, **kwargs)
+
+ def specify_args(self):
+ ToolBase.specify_args(self)
+ self.config.add_option('target', type=str, default=None)
+ self.parser.add_argument('target', nargs='?', type=str, help='Target database')
+
+ def load_args(self, args=None, config_file=None):
+ ToolBase.load_args(self, args, config_file)
+ self.target = self.args.target or self.config.target or 'default'
+
+ def setup(self, args=None):
+ ToolBase.setup(self, args)
+ self.prepare_conns(target=self.target)
+
+
+class SrcDstTool(ToolBase):
+
+ def __init__(self, name, desc=None, *, allow_reverse=False, force_reverse=False, **kwargs):
+ ToolBase.__init__(self, name, desc, **kwargs)
+ self.allow_reverse = allow_reverse
+ self.force_reverse = force_reverse
+
+ def specify_args(self):
+ ToolBase.specify_args(self)
+ self.parser.add_argument('src', metavar='source', type=str, help='Source database')
+ self.parser.add_argument('dst', metavar='destination', type=str, help='Destination database')
+ if self.allow_reverse:
+ self.parser.add_argument('-r', '--reverse', action='store_true', help='Reverse operation. Swap source and destination.')
+
+ def load_args(self, args=None, config_file=None):
+ ToolBase.load_args(self, args, config_file)
+ if self.is_reversed():
+ self.args.src, self.args.dst = self.args.dst, self.args.src
+
+ def setup(self, args=None):
+ ToolBase.setup(self, args)
+ self.prepare_conns(src=self.args.src, dst=self.args.dst)
+
+ def is_reversed(self):
+ return ('reverse' in self.args and self.args.reverse) or self.force_reverse
+
+
+class SrcDstTablesTool(SrcDstTool):
+
+ def specify_args(self):
+ SrcDstTool.specify_args(self)
+ self.parser.add_argument('-t', '--src-table', metavar='source_table',
+ dest='srctable', type=str, default='', help='Source table name.')
+ self.parser.add_argument('-s', '--src-schema', metavar='source_schema',
+ dest='srcschema', type=str, default='', help='Source schema name (default=public).')
+ self.parser.add_argument('--dst-table', metavar='destination_table',
+ dest='dsttable', type=str, default='', help='Destination table name (default=source_table).')
+ self.parser.add_argument('--dst-schema', metavar='destination_schema',
+ dest='dstschema', type=str, default='', help='Destination schema name (default=source_schema).')
+ self.parser.add_argument('--regex', action='store_true', help="Use RE in schema or table name.")
+
+ def load_args(self, args=None, config_file=None):
+ SrcDstTool.load_args(self, args, config_file)
+ self.load_table_names()
+
+ def load_table_names(self):
+ self.schema1 = self.args.srcschema
+ self.table1 = self.args.srctable
+ self.schema2 = self.args.dstschema
+ self.table2 = self.args.dsttable
+
+        # check regex - it applies to the source name only and the destination
+        # name must not be specified; the regex matches either schema or table name
+ if self.args.regex:
+ if self.table2 or (self.schema2 and not self.table1):
+ raise BadArgsError('Cannot specify both --regex and --dst-schema, --dst-table.')
+ # schema defaults to public
+ if self.table1 and not self.schema1:
+ self.schema1 = 'public'
+ # dest defaults to source
+ if not self.schema2:
+ self.schema2 = self.schema1
+ if not self.table2:
+ self.table2 = self.table1
+
+ # swap src, dst when in reverse mode
+ if self.is_reversed():
+ self.schema1, self.schema2 = self.schema2, self.schema1
+ self.table1, self.table2 = self.table2, self.table1
+
+ def tables(self):
+ '''Generator. Yields schema1, table1, schema2, table2.'''
+ srcconn = self.pgm.get_conn('src')
+ try:
+ srcbrowser = pgbrowser.PgBrowser(srcconn)
+ if self.args.regex:
+ if not self.table1:
+ # all tables from schemas defined by regex
+ for item in self._iter_schemas_regex(srcbrowser, self.schema1):
+ yield item
+ else:
+ # all tables defined by regex
+ for item in self._iter_tables_regex(srcbrowser, self.schema1, self.schema2, self.table1):
+ yield item
+ else:
+ if not self.table1:
+ if not self.schema1:
+ # all tables from all schemas
+ for item in self._iter_schemas_regex(srcbrowser, self.schema1):
+ yield item
+ else:
+ # all tables from specified schema
+ for item in self._iter_tables_regex(srcbrowser, self.schema1, self.schema2, self.table1):
+ yield item
+ else:
+ # one table
+ yield (self.schema1, self.table1, self.schema2, self.table2)
+ finally:
+ self.pgm.put_conn(srcconn, 'src')
+
+ def _iter_schemas_regex(self, browser, regex):
+ for schema in browser.list_schemas():
+ if schema['system']:
+ continue
+ schemaname = schema['name']
+ if re.match(regex, schemaname):
+ for item in self._iter_tables_regex(browser, schemaname, schemaname, ''):
+ yield item
+
+ def _iter_tables_regex(self, browser, schema1, schema2, regex):
+ for table in browser.list_tables(schema1):
+ tablename = table['name']
+ if re.match(regex, tablename):
+ yield (schema1, tablename, schema2, tablename)
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/tools/__init__.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,3 @@
+__all__ = ['analyzeall', 'batchcopy', 'bigtables', 'listdepends',
+ 'listserial', 'longqueries', 'loopquery',
+ 'runquery', 'schemadiff', 'tablediff', 'tablesync']
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/tools/analyzeall.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,52 @@
+from pydbkit.toolbase import SimpleTool
+from pydbkit import pgbrowser
+
+
+class AnalyzeAllTool(SimpleTool):
+
+ """
+ Analyze/vacuum all tables in selected schemas.
+
+    Partially emulates the VACUUM ANALYZE VERBOSE query,
+    but this program is more configurable and skips pg_catalog etc.
+
+ """
+
+ def __init__(self):
+ SimpleTool.__init__(self, name='analyzeall')
+ self.target_isolation_level = 'autocommit'
+
+ def specify_args(self):
+ SimpleTool.specify_args(self)
+ self.parser.add_argument('-s', dest='schema', nargs='*', help='Schema filter')
+ self.parser.add_argument('--vacuum', action='store_true', help='Call VACUUM ANALYZE')
+ self.parser.add_argument('--vacuum-full', action='store_true', help='Call VACUUM FULL ANALYZE')
+ self.parser.add_argument('--reindex', action='store_true', help='Call REINDEX TABLE')
+
+ def main(self):
+ browser = pgbrowser.PgBrowser(self.pgm.get_conn('target'))
+
+ query_patterns = ['ANALYZE %s.%s;']
+ if self.args.vacuum:
+ query_patterns = ['VACUUM ANALYZE %s.%s;']
+ if self.args.vacuum_full:
+ query_patterns = ['VACUUM FULL ANALYZE %s.%s;']
+ if self.args.reindex:
+ query_patterns += ['REINDEX TABLE %s.%s;']
+
+ schema_list = self.args.schema
+ if not schema_list:
+ schema_list = [schema['name'] for schema in browser.list_schemas() if not schema['system']]
+
+ for schema in schema_list:
+ tables = browser.list_tables(schema=schema)
+ with self.pgm.cursor('target') as curs:
+ for table in tables:
+ for query_pattern in query_patterns:
+ query = query_pattern % (schema, table['name'])
+ self.log.info(query)
+ curs.execute(query, [])
+
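+# Illustrative invocation (the 'pgtool' dispatcher name is taken from the
+# tablesync docstring; database and schema names are examples):
+#
+#   pgtool analyzeall mydb --vacuum -s public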
+
+cls = AnalyzeAllTool
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/tools/batchcopy.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,61 @@
+from pydbkit.toolbase import SrcDstTablesTool
+from pydbkit.pgmanager import IntegrityError
+
+
+class BatchCopyTool(SrcDstTablesTool):
+
+ """
+ Copy data from one table to another, filtering by specified condition.
+
+ """
+
+ def __init__(self):
+ SrcDstTablesTool.__init__(self, name='batchcopy', desc='')
+
+ def specify_args(self):
+ SrcDstTablesTool.specify_args(self)
+ self.parser.add_argument('--table-name', type=str, help='Table to be copied.')
+ self.parser.add_argument('--src-filter', type=str, help='WHERE condition for source query.')
+        self.parser.add_argument('--file-with-ids', type=str, help='Read source IDs from file (each ID on a new line). Use these in --src-filter as {ids}')
+ self.parser.add_argument('--dst-exists', choices=['rollback', 'ignore', 'update'], default='rollback', help='What to do when destination record already exists.')
+
+ def main(self):
+ # read list of IDs from file
+ ids = '<no IDs read>'
+ if self.args.file_with_ids:
+ with open(self.args.file_with_ids, 'r') as f:
+ ids = ','.join(ln.rstrip() for ln in f.readlines())
+
+ # read source data
+ with self.pgm.cursor('src') as src_curs:
+            condition = self.args.src_filter.format(ids=ids) if self.args.src_filter else 'true'
+ src_curs.execute('SELECT * FROM {} WHERE {}'.format(self.args.table_name, condition))
+ #TODO: ORDER BY id OFFSET 0 LIMIT 100
+ data = src_curs.fetchall_dict()
+ src_curs.connection.commit()
+
+ with self.pgm.cursor('dst') as dst_curs:
+ copied = 0
+ for row in data:
+ keys = ', '.join(row.keys())
+ values_mask = ', '.join(['%s'] * len(row))
+ query = 'INSERT INTO {} ({}) VALUES ({})'.format(self.args.table_name, keys, values_mask)
+ try:
+ dst_curs.execute('SAVEPOINT the_query;')
+ dst_curs.execute(query, list(row.values()))
+ dst_curs.execute('RELEASE SAVEPOINT the_query;')
+ copied += 1
+ except IntegrityError:
+ if self.args.dst_exists == 'rollback':
+ dst_curs.connection.rollback()
+ break
+ elif self.args.dst_exists == 'ignore':
+ dst_curs.execute('ROLLBACK TO SAVEPOINT the_query;')
+ elif self.args.dst_exists == 'update':
+ raise NotImplementedError()
+ dst_curs.connection.commit()
+
+ self.log.info('Copied %s rows.', copied)
+
+
+cls = BatchCopyTool
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/tools/bigtables.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,55 @@
+from pydbkit.toolbase import SimpleTool
+from pydbkit import pgbrowser
+from pycolib.prettysize import prettysize_short
+from pycolib.ansicolor import highlight
+
+
+class BigTablesTool(SimpleTool):
+
+ """List largest tables.
+
+    Reads size statistics of tables and indexes from pg_catalog.
+
+ """
+
+ def __init__(self):
+ SimpleTool.__init__(self, name='bigtables')
+
+ def specify_args(self):
+ SimpleTool.specify_args(self)
+ self.parser.add_argument('-n', '--limit', metavar='NUM', dest='limit', type=int, default=5, help='Show NUM biggest tables.')
+ self.parser.add_argument('-v', '--details', dest='details', action='store_true', help='Show sizes of data and individual indexes.')
+
+ def main(self):
+ browser = pgbrowser.PgBrowser(self.pgm.get_conn('target'))
+
+        # scan all tables from all schemas, remember names and sizes
+ all_tables = []
+ all_indexes = []
+ schemas = browser.list_schemas()
+ for schema in schemas:
+ tables = browser.list_tables(schema['name'])
+ for table in tables:
+ table_name = '%s.%s' % (schema['name'], table['name'])
+ indexes = browser.list_indexes(table['name'], schema['name'])
+ for index in indexes:
+ all_indexes.append({'name': index['name'], 'table': table_name, 'size': index['size']})
+ size_with_indexes = table['size'] + sum(index['size'] for index in indexes)
+ all_tables.append({'name': table_name, 'size': table['size'], 'indexes': indexes, 'size_with_indexes': size_with_indexes})
+
+        # print names and sizes of the largest tables (up to --limit)
+ for table in sorted(all_tables, reverse=True, key=lambda x: x['size_with_indexes'])[:self.args.limit]:
+ print(highlight(1) + prettysize_short(table['size_with_indexes'], trailing_zeros=True).rjust(8) + highlight(0),
+ '(total)'.ljust(8),
+ highlight(1) + table['name'] + highlight(0), sep=' ')
+ if self.args.details:
+ print(prettysize_short(table['size'], trailing_zeros=True).rjust(8),
+ '(data)'.ljust(8), sep=' ')
+ for index in sorted(table['indexes'], reverse=True, key=lambda x: x['size']):
+ print(prettysize_short(index['size'], trailing_zeros=True).rjust(8),
+ '(index)'.ljust(8), index['name'], sep=' ')
+ print()
+
+
+cls = BigTablesTool
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/tools/listdepends.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,31 @@
+from pydbkit.toolbase import SimpleTool
+from pydbkit import pgbrowser
+
+
+class ListDependsTool(SimpleTool):
+
+ """
+ List column dependencies.
+
+ """
+
+ def __init__(self):
+ SimpleTool.__init__(self, name='listdepends')
+
+ def specify_args(self):
+ SimpleTool.specify_args(self)
+ self.parser.add_argument('table', metavar='table', type=str, help='Table name.')
+ self.parser.add_argument('column', metavar='column', type=str, help='Column name.')
+ self.parser.add_argument('-s', '--schema', dest='schema', metavar='schema',
+ type=str, default='public', help='Schema name (default=public).')
+
+ def main(self):
+ browser = pgbrowser.PgBrowser(self.pgm.get_conn('target'))
+
+ objects = browser.list_column_usage(self.args.table, self.args.column, schema=self.args.schema)
+ for obj in sorted(objects, key=lambda x: (x['type'], x['schema'], x['name'])):
+ print(obj['type'], ' ', obj['schema'], '.', obj['name'], sep='')
+
+
+cls = ListDependsTool
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/tools/listserial.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,69 @@
+from pydbkit.toolbase import SimpleTool
+from pydbkit import pgbrowser
+from pycolib.ansicolor import highlight, WHITE, YELLOW, RED, BOLD
+
+
+class ListSerialTool(SimpleTool):
+
+ """List sequences near to overflow.
+
+ Checks all sequences attached to a column of type integer.
+
+ Highlight dangerous values of sequence:
+ * Yellow - near overflow (90%)
+ * Red - already over...
+
+ Does not list sequences with value under 50% of range.
+
+ """
+
+ max_int = 2147483647
+
+ def __init__(self):
+ SimpleTool.__init__(self, name='listserial')
+
+ def main(self):
+ conn = self.pgm.get_conn('target')
+ browser = pgbrowser.PgBrowser(conn)
+ rows = browser.list_sequences()
+ sequences = []
+ for row in rows:
+ if row['related_column_type'] == 'integer':
+ # read sequence attributes like last_value
+ q = 'SELECT * FROM "%s"."%s"' % (row['sequence_schema'], row['sequence_name'])
+ curs = conn.cursor()
+ curs.execute(q)
+ attrs = curs.fetchone_dict()
+                # skip this sequence if it's cycled and has a safe max_value
+ if attrs['is_cycled'] and attrs['max_value'] <= self.max_int:
+ continue
+                # skip sequences whose last_value has not yet reached half of max_int
+ if attrs['last_value'] < self.max_int / 2:
+ continue
+                # remember the remaining sequences
+ row['attrs'] = attrs
+ sequences.append(row)
+ # sort most dangerous on top
+ sequences.sort(key=lambda x: x['attrs']['last_value'], reverse=True)
+ # print out what we've found
+ for seq in sequences:
+ print('Sequence:', seq['sequence_schema'] + '.' + seq['sequence_name'])
+ print(' Related:', seq['sequence_schema'] + '.' + seq['related_table'], seq['related_column'], '(' + seq['related_column_type'] + ')')
+            print(' integer max', self.max_int)
+ # colorize last value
+ last_val = seq['attrs']['last_value']
+ col = WHITE + BOLD
+ if last_val > self.max_int * 0.9:
+ # near max
+ col = YELLOW + BOLD
+ if last_val > self.max_int:
+ # over max
+ col = RED + BOLD
+ print(' last_value', highlight(1, col) + str(last_val) + highlight(0))
+ for key in ('min_value', 'max_value', 'is_cycled'):
+ print(' ', key, seq['attrs'][key])
+ print()
+
+
+cls = ListSerialTool
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/tools/longqueries.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,34 @@
+from pydbkit.toolbase import SimpleTool
+from pydbkit import pgstats
+from pycolib.ansicolor import highlight, YELLOW, BOLD
+
+
+class LongQueriesTool(SimpleTool):
+
+ """
+ List long running queries.
+ """
+
+ def __init__(self):
+ SimpleTool.__init__(self, name='longqueries')
+
+ def specify_args(self):
+ SimpleTool.specify_args(self)
+        self.parser.add_argument('--age', default='1m', help='How long the query must have been running to be listed.')
+
+ def main(self):
+ stats = pgstats.PgStats(self.pgm.get_conn('target'))
+
+ for ln in stats.list_long_queries(self.args.age):
+ print(highlight(1),
+ 'backend PID: ', ln['procpid'],
+ ', query_start: ', ln['query_start'].strftime('%F %T'),
+ ', client IP: ', ln['client_addr'],
+ ln['waiting'] and ', ' + highlight(1, YELLOW|BOLD) + 'waiting' or '',
+ highlight(0), sep='')
+ print(ln['query'])
+ print()
+
+
+cls = LongQueriesTool
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/tools/loopquery.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,92 @@
+from pydbkit.toolbase import SimpleTool
+
+import logging.handlers
+import time
+from datetime import datetime, timedelta
+
+
+class LoopQueryTool(SimpleTool):
+
+ """
+ Execute queries in loop, with configurable interval.
+
+ """
+
+ def __init__(self):
+ SimpleTool.__init__(self, name='loopquery')
+ self.target_isolation_level = 'autocommit'
+
+ def specify_args(self):
+ SimpleTool.specify_args(self)
+ self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.')
+ self.parser.add_argument('--mins', dest='delay_mins', type=int, help='Delay between queries in minutes.')
+ self.parser.add_argument('--secs', dest='delay_secs', type=int, help='Delay between queries in seconds.')
+
+ self.config.add_option('queries', type=list, default=[])
+ self.config.add_option('delay_mins', type=int, default=0)
+ self.config.add_option('delay_secs', type=int, default=0)
+ self.config.add_option('log_path', type=str)
+
+ def load_args(self, args=None, config_file=None):
+ SimpleTool.load_args(self, args, config_file)
+ self.queries = self.args.queries or self.config.queries
+ self.delay_mins = self.args.delay_mins or self.config.delay_mins
+ self.delay_secs = self.args.delay_secs or self.config.delay_secs
+
+ def init_logging(self):
+ SimpleTool.init_logging(self)
+ if self.config.log_path:
+ self.init_file_logs(self.config.log_path)
+
+ def init_file_logs(self, path):
+ format = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', '%y-%m-%d %H:%M:%S')
+ handler = logging.handlers.TimedRotatingFileHandler(path+'/main.log', when='midnight', backupCount=5)
+ handler.setFormatter(format)
+ handler.setLevel(logging.DEBUG)
+ logging.getLogger('main').addHandler(handler)
+
+ format = logging.Formatter('%(asctime)s %(message)s', '%y-%m-%d %H:%M:%S')
+ handler = logging.handlers.TimedRotatingFileHandler(path+'/pgnotices.log', when='midnight', backupCount=5)
+ handler.setFormatter(format)
+ handler.setLevel(logging.DEBUG)
+ logging.getLogger('pgmanager_notices').addHandler(handler)
+
+ def main(self):
+ self.reset()
+ while True:
+ self.wait()
+ self.action()
+
+ def reset(self):
+ """Check current time, set next action time."""
+ dt = datetime.today()
+ dt = dt.replace(microsecond = 0)
+ self.next_action_time = dt + timedelta(minutes = self.delay_mins,
+ seconds = self.delay_secs)
+
+ def wait(self):
+ """Wait for action time, compute next action time."""
+ now = datetime.today()
+ self.log.debug('Next run %s', self.next_action_time)
+ if self.next_action_time > now:
+ td = self.next_action_time - now
+ self.log.debug('Sleep %ds', td.seconds + td.microseconds/1e6)
+ time.sleep(td.seconds + td.microseconds/1e6)
+ self.next_action_time += timedelta(minutes = self.delay_mins,
+ seconds = self.delay_secs)
+            # in case the action took too long and the next planned time would
+            # be in the past -> reset the planner
+ if self.next_action_time < now:
+ self.reset()
+
+ def action(self):
+ """Execute the queries."""
+ for q in self.queries:
+ self.log.info('%s', q)
+ with self.pgm.cursor('target') as curs:
+ curs.execute(q)
+ self.log.info('Done')
+
+
+cls = LoopQueryTool
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/tools/runquery.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,96 @@
+from pydbkit.toolbase import SimpleTool
+
+import logging.handlers
+import time
+from datetime import datetime, timedelta
+from psycopg2 import ProgrammingError
+
+
+class RunQueryTool(SimpleTool):
+
+ """
+ Execute configured queries in target database.
+ """
+
+ def __init__(self):
+ SimpleTool.__init__(self, name='runquery')
+ self.target_isolation_level = 'autocommit'
+
+ def specify_args(self):
+ SimpleTool.specify_args(self)
+ self.parser.add_argument('-q', dest='queries', metavar='QUERY', nargs='*', help='Queries to run.')
+ self.parser.add_argument('-f', dest='file', metavar='FILE', help='Read query from file.')
+ self.parser.add_argument('--one-query-per-line', action='store_true', help='When reading queries from file, consider each line as separate query.')
+ self.parser.add_argument('-p', '--parameter', dest='parameters', metavar='PARAM=VALUE', nargs='*',
+ help="If query should be used as format template, these parameters will be substituted.")
+ self.parser.add_argument('--output-file', dest='output_file', metavar='OUTPUT_FILE', help='Write query result in specified file.')
+ self.parser.add_argument('--format', dest='format', metavar='FORMAT', help='Format string for each line in output file (using Python\'s format()).')
+
+ self.config.add_option('queries', type=list, default=[])
+ self.config.add_option('log_path', type=str)
+
+ def load_args(self, args=None, config_file=None):
+ SimpleTool.load_args(self, args, config_file)
+ self.queries = self.args.queries or self.config.queries
+ # read query from file
+ if self.args.file:
+ with open(self.args.file, 'r', encoding='utf8') as f:
+ data = f.read()
+ if self.args.one_query_per_line:
+ file_queries = [ln for ln in data.splitlines() if not ln.lstrip().startswith('--')]
+ self.queries = file_queries + self.queries
+ else:
+ self.queries.insert(0, data)
+ # prepare parameters
+ self._prepare_parameters(self.args.parameters)
+
+ def init_logging(self):
+        SimpleTool.init_logging(self)
+ if self.config.log_path:
+ self.init_file_logs(self.config.log_path)
+
+ def init_file_logs(self, path):
+ format = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', '%y-%m-%d %H:%M:%S')
+ handler = logging.handlers.TimedRotatingFileHandler(path+'/main.log', when='midnight', backupCount=5)
+ handler.setFormatter(format)
+ handler.setLevel(logging.DEBUG)
+ logging.getLogger('main').addHandler(handler)
+
+ format = logging.Formatter('%(asctime)s %(message)s', '%y-%m-%d %H:%M:%S')
+ handler = logging.handlers.TimedRotatingFileHandler(path+'/pgnotices.log', when='midnight', backupCount=5)
+ handler.setFormatter(format)
+ handler.setLevel(logging.DEBUG)
+ logging.getLogger('pgmanager_notices').addHandler(handler)
+
+ def main(self):
+ """Execute the queries."""
+ for q in self.queries:
+ if self.parameters:
+ q = q.format(**self.parameters)
+ self.log.info('%s', q if len(q) < 100 else q[:100]+'...')
+ with self.pgm.cursor('target') as curs:
+ curs.execute(q)
+ self.log.info('Rows affected: %d', curs.rowcount)
+ try:
+ rows = curs.fetchall_dict()
+ self._write_output_file(rows)
+ except ProgrammingError:
+ pass
+ self.log.info('Done')
+
+ def _write_output_file(self, rows):
+ if not self.args.output_file:
+ return
+ with open(self.args.output_file, 'w', encoding='utf8') as f:
+ for row in rows:
+ print(self.args.format.format(row), file=f)
+
+ def _prepare_parameters(self, parameters):
+ self.parameters = {}
+ for parameter in parameters or ():
+ name, value = parameter.split('=', 1)
+ self.parameters[name] = value
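+
+    # Illustrative invocation (the 'pgtool' dispatcher name is taken from
+    # the tablesync docstring; query and parameters are examples only):
+    #
+    #   pgtool runquery target -q 'ANALYZE {schema}.{table};' -p schema=public table=users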
+
+
+cls = RunQueryTool
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/tools/schemadiff.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,51 @@
+from pydbkit.toolbase import SrcDstTool
+from pydbkit import pgmanager, pgbrowser, pgdiff, toolbase
+
+
+class SchemaDiffTool(SrcDstTool):
+
+ """
+ Print differences in database schema.
+
+ Prints changes from source to destination.
+ SQL patch updates source database schema to destination schema.
+
+ """
+
+ def __init__(self):
+ SrcDstTool.__init__(self, name='schemadiff', allow_reverse=True)
+
+ def specify_args(self):
+ SrcDstTool.specify_args(self)
+ self.parser.add_argument('-s', dest='schema', nargs='*', help='Schema filter')
+ self.parser.add_argument('-t', dest='table', nargs='*', help='Table filter')
+ self.parser.add_argument('-f', dest='function', type=str, help='Function filter (regex)')
+ self.parser.add_argument('--sql', action='store_true', help='Output is SQL script.')
+ self.parser.add_argument('--body', action='store_true', help='Output diff for function bodies.')
+
+ def main(self):
+ srcbrowser = pgbrowser.PgBrowser(self.pgm.get_conn('src'))
+ dstbrowser = pgbrowser.PgBrowser(self.pgm.get_conn('dst'))
+
+ pgd = pgdiff.PgDiff(srcbrowser, dstbrowser)
+
+ try:
+ if self.args.schema:
+ pgd.filter_schemas(include=self.args.schema)
+ if self.args.table:
+ pgd.filter_tables(include=self.args.table)
+ if self.args.function:
+ pgd.filter_functions(self.args.function)
+ if self.args.body:
+ pgd.function_body_diff = True
+
+ if self.args.sql:
+ pgd.print_patch()
+ else:
+ pgd.print_diff()
+ except pgdiff.PgDiffError as e:
+ print('PgDiff error:', str(e))
+
+
+cls = SchemaDiffTool
+
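(A minimal usage sketch of the pgdiff/pgbrowser API exercised by SchemaDiffTool.main() above. The named connections and DSN strings are assumptions; the tools normally set these up from pydbkit.conf.)

    from pydbkit import pgmanager, pgbrowser, pgdiff

    pgm = pgmanager.get_instance()
    pgm.create_conn(name='src', dsn='dbname=devdb')    # assumed DSN
    pgm.create_conn(name='dst', dsn='dbname=proddb')   # assumed DSN

    pgd = pgdiff.PgDiff(pgbrowser.PgBrowser(pgm.get_conn('src')),
                        pgbrowser.PgBrowser(pgm.get_conn('dst')))
    pgd.filter_schemas(include=['public'])  # same filter the -s option applies
    pgd.print_diff()                        # print_patch() for the SQL variant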
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/tools/tablediff.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,72 @@
+from pydbkit import toolbase, pgmanager, pgdatadiff
+from pydbkit.toolbase import SrcDstTablesTool
+from pycolib.ansicolor import highlight, BOLD, YELLOW
+
+import sys
+
+
+class TableDiffTool(SrcDstTablesTool):
+
+ """
+ Print differences between data in tables.
+
+ Requirements:
+ * Source table must have defined PRIMARY KEY.
+ * Destination table must contain all columns from source table.
+ Order is not important.
+
+ """
+
+ def __init__(self):
+ SrcDstTablesTool.__init__(self, name='tablediff', desc=self.__doc__, allow_reverse=True)
+
+ def specify_args(self):
+ SrcDstTablesTool.specify_args(self)
+ self.parser.add_argument('--sql', action='store_true', help='Output is SQL script.')
+ self.parser.add_argument('--rowcount', action='store_true', help='Compare number of rows.')
+ self.parser.add_argument('-o', '--output-file', help='Output file for sql queries.')
+
+ def main(self):
+ srcconn = self.pgm.get_conn('src')
+ dstconn = self.pgm.get_conn('dst')
+
+ if self.args.output_file:
+ output_file = open(self.args.output_file, 'w', encoding='utf8')
+ else:
+ output_file = sys.stdout
+
+ dd = pgdatadiff.PgDataDiff(srcconn, dstconn)
+
+ for srcschema, srctable, dstschema, dsttable in self.tables():
+ print('-- Diff from [%s] %s.%s to [%s] %s.%s' % (
+ self.args.src, srcschema, srctable,
+ self.args.dst, dstschema, dsttable),
+ file=output_file)
+
+ if self.args.rowcount:
+ with self.pgm.cursor('src') as curs:
+ curs.execute('''SELECT count(*) FROM "%s"."%s"''' % (srcschema, srctable))
+ srccount = curs.fetchone()[0]
+ with self.pgm.cursor('dst') as curs:
+ curs.execute('''SELECT count(*) FROM "%s"."%s"''' % (dstschema, dsttable))
+ dstcount = curs.fetchone()[0]
+ if srccount != dstcount:
+ print(highlight(1, BOLD | YELLOW),
+ "Row count differs: src=%s dst=%s" % (srccount, dstcount),
+ highlight(0), sep='', file=output_file)
+ continue
+
+ dd.settable1(srctable, srcschema)
+ dd.settable2(dsttable, dstschema)
+
+ if self.args.sql:
+ dd.print_patch(file=output_file)
+ else:
+ dd.print_diff(file=output_file)
+
+ if self.args.output_file:
+ output_file.close()  # flush the SQL script; stdout is left open
+
+
+cls = TableDiffTool
+
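(A sketch of the PgDataDiff calls from TableDiffTool.main() above, reduced to a single table pair. Connection setup is assumed, as in the schemadiff example earlier.)

    from pydbkit import pgmanager, pgdatadiff

    pgm = pgmanager.get_instance()
    pgm.create_conn(name='src', dsn='host=db1 dbname=app')  # assumed DSN
    pgm.create_conn(name='dst', dsn='host=db2 dbname=app')  # assumed DSN

    dd = pgdatadiff.PgDataDiff(pgm.get_conn('src'), pgm.get_conn('dst'))
    dd.settable1('users', 'public')  # source table (table first, schema second, as above)
    dd.settable2('users', 'public')  # destination table
    dd.print_diff()                  # or dd.print_patch() to emit SQL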
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pydbkit/tools/tablesync.py Wed Jul 09 18:03:54 2014 +0200
@@ -0,0 +1,60 @@
+from pydbkit.toolbase import SrcDstTool
+from pydbkit.tools.tablediff import TableDiffTool
+from pydbkit.tools.runquery import RunQueryTool
+
+
+class TableSyncTool(SrcDstTool):
+
+ """
+ Synchronize tables between two databases (tablediff + runquery).
+
+ This will essentially run the following commands for each table in the list:
+ * pgtool tablediff <source> <target> -r -s <schema> -t <table> --sql -o /tmp/diff.sql
+ * pgtool runquery <target> --one-query-per-line -f /tmp/diff.sql
+
+ """
+
+ def __init__(self):
+ SrcDstTool.__init__(self, name='tablesync', force_reverse=True)
+ self.tablediff = TableDiffTool()
+ self.tablediff.specify_args()
+ self.runquery = RunQueryTool()
+ self.runquery.specify_args()
+
+ def specify_args(self):
+ SrcDstTool.specify_args(self)
+ self.parser.add_argument('-t', dest='tables', metavar='table', nargs='*',
+ help="Tables to be synchronized.")
+ self.parser.add_argument('-s', '--schema', metavar='default_schema',
+ dest='schema', type=str, default='public', help='Default schema name.')
+
+ def init_logging(self):
+ SrcDstTool.init_logging(self)
+ self.runquery.log = self.log
+
+ def setup(self, args=None):
+ SrcDstTool.setup(self, args)
+ self.target_isolation_level = 'autocommit'
+ self.prepare_conns(target=self.args.src)
+
+ def main(self):
+ for table in self.args.tables:
+ self.sync(table)
+
+ def sync(self, table):
+ if '.' in table:
+ schema, table = table.split('.', 1)
+ else:
+ schema = self.args.schema
+ # Call tablediff
+ self.tablediff.load_args([self.args.src, self.args.dst,
+ '-r', '-s', schema, '-t', table, '--sql', '-o', '/tmp/diff.sql'])
+ self.tablediff.main()
+ # Call runquery
+ self.runquery.load_args([self.args.src, '--one-query-per-line',
+ '-f', '/tmp/diff.sql'])
+ self.runquery.main()
+
+
+cls = TableSyncTool
+
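(For illustration, TableSyncTool can also be driven programmatically, mirroring the setup()/main() flow above. The database names and table list are assumptions; 'billing.invoices' exercises the schema-qualified branch of sync().)

    from pydbkit.tools.tablesync import TableSyncTool

    tool = TableSyncTool()
    tool.setup(['devdb', 'proddb', '-t', 'users', 'billing.invoices'])
    tool.main()  # per table: tablediff --sql -o /tmp/diff.sql, then runquery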
--- a/setup.py Mon May 26 18:18:21 2014 +0200
+++ b/setup.py Wed Jul 09 18:03:54 2014 +0200
@@ -3,14 +3,14 @@
from distutils.core import setup
setup(
- name='pgtoolkit',
- version='0.4.1',
- description='Postgres utilities, build on top of psycopg2',
+ name='pydbkit',
+ version='0.4.2',
+ description='Database tools and libraries, focused on PostgreSQL.',
author='Radek Brich',
author_email='radek.brich@devl.cz',
- url='http://hg.devl.cz/pgtoolkit',
+ url='http://hg.devl.cz/pydbkit',
keywords=['postgresql', 'psycopg2', 'pool', 'keepalive'],
- packages=['pgtoolkit', 'pgtoolkit.tools', 'mytoolkit'],
+ packages=['pydbkit', 'pydbkit.tools'],
scripts=['pgtool'],
)
--- a/tablecopy.py Mon May 26 18:18:21 2014 +0200
+++ b/tablecopy.py Wed Jul 09 18:03:54 2014 +0200
@@ -11,8 +11,8 @@
import io
-from pgtoolkit import toolbase, pgmanager, pgbrowser, pgdatacopy
-from pgtoolkit.progresswrapper import ProgressWrapper
+from pydbkit import toolbase, pgmanager, pgbrowser, pgdatacopy
+from pydbkit.progresswrapper import ProgressWrapper
class TableCopyTool(toolbase.SrcDstTablesTool):
--- a/tests/delayedquery.py Mon May 26 18:18:21 2014 +0200
+++ b/tests/delayedquery.py Wed Jul 09 18:03:54 2014 +0200
@@ -4,8 +4,8 @@
import time
from tests.config import Config
-from pgtoolkit import pgmanager
-from pgtoolkit.delayedquery import DelayedQuery
+from pydbkit import pgmanager
+from pydbkit.delayedquery import DelayedQuery
if __name__ == '__main__':
--- a/tests/multiprocess.py Mon May 26 18:18:21 2014 +0200
+++ b/tests/multiprocess.py Wed Jul 09 18:03:54 2014 +0200
@@ -26,7 +26,7 @@
import multiprocessing
from tests.config import Config
-from pgtoolkit import pgmanager
+from pydbkit import pgmanager
def sub1(id):
--- a/tests/test_mymanager.py Mon May 26 18:18:21 2014 +0200
+++ b/tests/test_mymanager.py Wed Jul 09 18:03:54 2014 +0200
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
from tests.config import Config
-from mytoolkit import mymanager
+from pydbkit import mymanager
import unittest
import logging
@@ -10,7 +10,7 @@
class TestMyManager(unittest.TestCase):
def setUp(self):
- cfg = Config('pgtoolkit.conf')
+ cfg = Config('pydbkit.conf')
test_db_conn_params = cfg['databases']['test_mysql']
params = self.params_to_mapping(test_db_conn_params)
self.m = mymanager.get_instance()
--- a/tests/test_pgmanager.py Mon May 26 18:18:21 2014 +0200
+++ b/tests/test_pgmanager.py Wed Jul 09 18:03:54 2014 +0200
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
from tests.config import Config
-from pgtoolkit import pgmanager
+from pydbkit import pgmanager
import unittest
@@ -9,7 +9,7 @@
class TestSuite(unittest.TestCase):
def setUp(self):
- cfg = Config('pgtoolkit.conf')
+ cfg = Config('pydbkit.conf')
test_db_conn_params = cfg['databases']['test']
self.m = pgmanager.get_instance()
self.m.create_conn(dsn=test_db_conn_params)
--- a/tests/test_rowdict.py Mon May 26 18:18:21 2014 +0200
+++ b/tests/test_rowdict.py Wed Jul 09 18:03:54 2014 +0200
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
-from pgtoolkit.pgmanager import RowDict
+from pydbkit.pgmanager import RowDict
import unittest