PgManager: Add partial support for multiprocessing.
--- a/pgtoolkit/pgmanager.py Wed Dec 14 16:29:33 2011 +0100
+++ b/pgtoolkit/pgmanager.py Thu Dec 15 18:21:41 2011 +0100
@@ -72,6 +72,7 @@
from contextlib import contextmanager
import logging
import threading
+import multiprocessing
import select
import socket
@@ -113,13 +114,15 @@
try:
return super(Cursor, self).execute(query, args)
finally:
- log.debug(self.query.decode('utf8'))
+ if self.query:
+ log.debug(self.query.decode('utf8'))
def callproc(self, procname, args=None):
try:
return super(Cursor, self).callproc(procname, args)
finally:
- log.debug(self.query.decode('utf8'))
+ if self.query:
+ log.debug(self.query.decode('utf8'))
def row_dict(self, row, lstrip=None):
adjustname = lambda a: a
@@ -182,8 +185,9 @@
def __init__(self):
self.conn_known = {} # available connections
- self.conn_pool = {}
- self.lock = threading.Lock()
+ self.conn_pool = {} # active connetions
+ self.lock = threading.Lock() # mutual exclusion for threads
+ self.pid = multiprocessing.current_process().pid # forking check
def __del__(self):
for conn in tuple(self.conn_known.keys()):
@@ -240,6 +244,7 @@
def get_conn(self, name='default'):
'''Get connection of name 'name' from pool.'''
+ self._check_fork()
self.lock.acquire()
try:
if not name in self.conn_known:
@@ -355,6 +360,26 @@
return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
raise PgManagerError('Unknown isolation level name: "%s"', level)
return level
+
+ def _check_fork(self):
+ '''Check if process was forked (PID has changed).
+
+ If it was, clean parent's connections.
+ New connections are created for children.
+ Known connection credentials are inherited, but not shared.
+
+ '''
+ if self.pid == multiprocessing.current_process().pid:
+ # PID has not changed
+ return
+
+ # update saved PID
+ self.pid = multiprocessing.current_process().pid
+ # reinitialize lock
+ self.lock = threading.Lock()
+ # clean parent's connections
+ for name in self.conn_pool:
+ self.conn_pool[name] = []
@classmethod
def get_instance(cls):
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/multiprocessing.py Thu Dec 15 18:21:41 2011 +0100
@@ -0,0 +1,78 @@
+#!/usr/bin/env python3
+'''Multiprocessing test.
+
+PgManager is partially safe against process forking.
+
+You can create connection credentials (create_conn) in main process, then fork
+and continue working with pgmanager in children processes. You can call create_conn again
+in children, but new connection credentials will be accessible only from process
+where it was created -- no sharing between processes.
+
+get_conn and cursor will still work after fork. Connections from parent process will
+be forgotten, that cannot be used un child process.
+
+Connections cannot be shared between processes, DO NOT:
+ * get_conn, then fork, then put_conn
+ * get_conn, then fork, then use the connection, then .close()
+
+Good usage:
+ * create_conn, fork, get_conn, cursor, etc.
+
+Basically, you can transfer only information from create_conn(), nothing else.
+
+'''
+
+
+import multiprocessing
+
+from config import Config
+from pgtoolkit import pgmanager
+
+
+def sub1(id):
+ with pgm.cursor() as curs:
+ print('[%d] pgconn: %d' % (multiprocessing.current_process().pid, curs.connection.get_backend_pid()))
+ print('[%d] update' % multiprocessing.current_process().pid)
+ curs.execute('''UPDATE test SET name = 'multi-1' WHERE id=%s''', [id])
+ print('[%d] commit' % multiprocessing.current_process().pid)
+ curs.connection.commit()
+
+def sub2(id):
+ pgm = pgmanager.get_instance()
+ with pgm.cursor() as curs:
+ print('[%d] pgconn: %d' % (multiprocessing.current_process().pid, curs.connection.get_backend_pid()))
+ print('[%d] update' % multiprocessing.current_process().pid)
+ curs.execute('''UPDATE test SET name = 'multi-2' WHERE id=%s''', [id])
+ print('[%d] commit' % multiprocessing.current_process().pid)
+ curs.connection.commit()
+
+
+cfg = Config('tests.conf')
+pgm = pgmanager.get_instance()
+pgm.create_conn(**cfg)
+
+with pgm.cursor() as curs:
+ print('[%d] pgconn: %d' % (multiprocessing.current_process().pid, curs.connection.get_backend_pid()))
+ print('[%d] insert' % multiprocessing.current_process().pid)
+ curs.execute('''INSERT INTO test (name) VALUES ('multi') RETURNING id''')
+ id = curs.fetchone()[0]
+ curs.connection.commit()
+
+ print('[%d] update' % multiprocessing.current_process().pid)
+ curs.execute('''UPDATE test SET name = 'multi-main' WHERE id=%s''', [id])
+ # not committed
+
+p1 = multiprocessing.Process(target=sub1, args=[id])
+p1.start()
+p2 = multiprocessing.Process(target=sub2, args=[id])
+p2.start()
+
+with pgm.cursor() as curs:
+ print('[%d] commit' % multiprocessing.current_process().pid)
+ curs.connection.commit()
+
+print('join')
+p1.join()
+p2.join()
+
+print('done')