# HG changeset patch # User Radek Brich # Date 1323969701 -3600 # Node ID 5664afa530e5b09794d76db183cbd4107d66c040 # Parent dc2dbe872fc8a445c833f7cb14e5a7db358df90e PgManager: Add partial support for multiprocessing. diff -r dc2dbe872fc8 -r 5664afa530e5 pgtoolkit/pgmanager.py --- a/pgtoolkit/pgmanager.py Wed Dec 14 16:29:33 2011 +0100 +++ b/pgtoolkit/pgmanager.py Thu Dec 15 18:21:41 2011 +0100 @@ -72,6 +72,7 @@ from contextlib import contextmanager import logging import threading +import multiprocessing import select import socket @@ -113,13 +114,15 @@ try: return super(Cursor, self).execute(query, args) finally: - log.debug(self.query.decode('utf8')) + if self.query: + log.debug(self.query.decode('utf8')) def callproc(self, procname, args=None): try: return super(Cursor, self).callproc(procname, args) finally: - log.debug(self.query.decode('utf8')) + if self.query: + log.debug(self.query.decode('utf8')) def row_dict(self, row, lstrip=None): adjustname = lambda a: a @@ -182,8 +185,9 @@ def __init__(self): self.conn_known = {} # available connections - self.conn_pool = {} - self.lock = threading.Lock() + self.conn_pool = {} # active connetions + self.lock = threading.Lock() # mutual exclusion for threads + self.pid = multiprocessing.current_process().pid # forking check def __del__(self): for conn in tuple(self.conn_known.keys()): @@ -240,6 +244,7 @@ def get_conn(self, name='default'): '''Get connection of name 'name' from pool.''' + self._check_fork() self.lock.acquire() try: if not name in self.conn_known: @@ -355,6 +360,26 @@ return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE raise PgManagerError('Unknown isolation level name: "%s"', level) return level + + def _check_fork(self): + '''Check if process was forked (PID has changed). + + If it was, clean parent's connections. + New connections are created for children. + Known connection credentials are inherited, but not shared. + + ''' + if self.pid == multiprocessing.current_process().pid: + # PID has not changed + return + + # update saved PID + self.pid = multiprocessing.current_process().pid + # reinitialize lock + self.lock = threading.Lock() + # clean parent's connections + for name in self.conn_pool: + self.conn_pool[name] = [] @classmethod def get_instance(cls): diff -r dc2dbe872fc8 -r 5664afa530e5 tests/multiprocessing.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/multiprocessing.py Thu Dec 15 18:21:41 2011 +0100 @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +'''Multiprocessing test. + +PgManager is partially safe against process forking. + +You can create connection credentials (create_conn) in main process, then fork +and continue working with pgmanager in children processes. You can call create_conn again +in children, but new connection credentials will be accessible only from process +where it was created -- no sharing between processes. + +get_conn and cursor will still work after fork. Connections from parent process will +be forgotten, that cannot be used un child process. + +Connections cannot be shared between processes, DO NOT: + * get_conn, then fork, then put_conn + * get_conn, then fork, then use the connection, then .close() + +Good usage: + * create_conn, fork, get_conn, cursor, etc. + +Basically, you can transfer only information from create_conn(), nothing else. + +''' + + +import multiprocessing + +from config import Config +from pgtoolkit import pgmanager + + +def sub1(id): + with pgm.cursor() as curs: + print('[%d] pgconn: %d' % (multiprocessing.current_process().pid, curs.connection.get_backend_pid())) + print('[%d] update' % multiprocessing.current_process().pid) + curs.execute('''UPDATE test SET name = 'multi-1' WHERE id=%s''', [id]) + print('[%d] commit' % multiprocessing.current_process().pid) + curs.connection.commit() + +def sub2(id): + pgm = pgmanager.get_instance() + with pgm.cursor() as curs: + print('[%d] pgconn: %d' % (multiprocessing.current_process().pid, curs.connection.get_backend_pid())) + print('[%d] update' % multiprocessing.current_process().pid) + curs.execute('''UPDATE test SET name = 'multi-2' WHERE id=%s''', [id]) + print('[%d] commit' % multiprocessing.current_process().pid) + curs.connection.commit() + + +cfg = Config('tests.conf') +pgm = pgmanager.get_instance() +pgm.create_conn(**cfg) + +with pgm.cursor() as curs: + print('[%d] pgconn: %d' % (multiprocessing.current_process().pid, curs.connection.get_backend_pid())) + print('[%d] insert' % multiprocessing.current_process().pid) + curs.execute('''INSERT INTO test (name) VALUES ('multi') RETURNING id''') + id = curs.fetchone()[0] + curs.connection.commit() + + print('[%d] update' % multiprocessing.current_process().pid) + curs.execute('''UPDATE test SET name = 'multi-main' WHERE id=%s''', [id]) + # not committed + +p1 = multiprocessing.Process(target=sub1, args=[id]) +p1.start() +p2 = multiprocessing.Process(target=sub2, args=[id]) +p2.start() + +with pgm.cursor() as curs: + print('[%d] commit' % multiprocessing.current_process().pid) + curs.connection.commit() + +print('join') +p1.join() +p2.join() + +print('done')