PgManager: Add partial support for multiprocessing.
authorRadek Brich <radek.brich@devl.cz>
Thu, 15 Dec 2011 18:21:41 +0100
changeset 24 5664afa530e5
parent 23 dc2dbe872fc8
child 25 20a72a9a2d09
PgManager: Add partial support for multiprocessing.
pgtoolkit/pgmanager.py
tests/multiprocessing.py
--- a/pgtoolkit/pgmanager.py	Wed Dec 14 16:29:33 2011 +0100
+++ b/pgtoolkit/pgmanager.py	Thu Dec 15 18:21:41 2011 +0100
@@ -72,6 +72,7 @@
 from contextlib import contextmanager
 import logging
 import threading
+import multiprocessing
 import select
 import socket
 
@@ -113,13 +114,15 @@
         try:
             return super(Cursor, self).execute(query, args)
         finally:
-            log.debug(self.query.decode('utf8'))
+            if self.query:
+                log.debug(self.query.decode('utf8'))
 
     def callproc(self, procname, args=None):
         try:
             return super(Cursor, self).callproc(procname, args)
         finally:
-            log.debug(self.query.decode('utf8'))
+            if self.query:
+                log.debug(self.query.decode('utf8'))
 
     def row_dict(self, row, lstrip=None):
         adjustname = lambda a: a
@@ -182,8 +185,9 @@
 
     def __init__(self):
         self.conn_known = {}  # available connections
-        self.conn_pool = {}
-        self.lock = threading.Lock()
+        self.conn_pool = {}  # active connetions
+        self.lock = threading.Lock()  # mutual exclusion for threads
+        self.pid = multiprocessing.current_process().pid  # forking check
 
     def __del__(self):
         for conn in tuple(self.conn_known.keys()):
@@ -240,6 +244,7 @@
 
     def get_conn(self, name='default'):
         '''Get connection of name 'name' from pool.'''
+        self._check_fork()
         self.lock.acquire()
         try:
             if not name in self.conn_known:
@@ -355,6 +360,26 @@
                 return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
             raise PgManagerError('Unknown isolation level name: "%s"', level)
         return level
+    
+    def _check_fork(self):
+        '''Check if process was forked (PID has changed).
+        
+        If it was, clean parent's connections.
+        New connections are created for children.
+        Known connection credentials are inherited, but not shared.
+        
+        '''
+        if self.pid == multiprocessing.current_process().pid:
+            # PID has not changed
+            return
+        
+        # update saved PID
+        self.pid = multiprocessing.current_process().pid
+        # reinitialize lock
+        self.lock = threading.Lock()
+        # clean parent's connections
+        for name in self.conn_pool:
+            self.conn_pool[name] = []
 
     @classmethod
     def get_instance(cls):
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/multiprocessing.py	Thu Dec 15 18:21:41 2011 +0100
@@ -0,0 +1,78 @@
+#!/usr/bin/env python3
+'''Multiprocessing test.
+
+PgManager is partially safe against process forking.
+
+You can create connection credentials (create_conn) in main process, then fork
+and continue working with pgmanager in children processes. You can call create_conn again
+in children, but new connection credentials will be accessible only from process
+where it was created -- no sharing between processes.
+
+get_conn and cursor will still work after fork. Connections from parent process will
+be forgotten, that cannot be used un child process. 
+
+Connections cannot be shared between processes, DO NOT:
+ * get_conn, then fork, then put_conn
+ * get_conn, then fork, then use the connection, then .close()
+
+Good usage:
+ * create_conn, fork, get_conn, cursor, etc.
+
+Basically, you can transfer only information from create_conn(), nothing else.
+
+'''
+
+
+import multiprocessing
+
+from config import Config
+from pgtoolkit import pgmanager
+
+
+def sub1(id):
+    with pgm.cursor() as curs:
+        print('[%d] pgconn: %d' % (multiprocessing.current_process().pid, curs.connection.get_backend_pid()))
+        print('[%d] update' % multiprocessing.current_process().pid)
+        curs.execute('''UPDATE test SET name = 'multi-1' WHERE id=%s''', [id])
+        print('[%d] commit' % multiprocessing.current_process().pid)
+        curs.connection.commit()
+
+def sub2(id):
+    pgm = pgmanager.get_instance()
+    with pgm.cursor() as curs:
+        print('[%d] pgconn: %d' % (multiprocessing.current_process().pid, curs.connection.get_backend_pid()))
+        print('[%d] update' % multiprocessing.current_process().pid)
+        curs.execute('''UPDATE test SET name = 'multi-2' WHERE id=%s''', [id])
+        print('[%d] commit' % multiprocessing.current_process().pid)
+        curs.connection.commit()
+
+
+cfg = Config('tests.conf')
+pgm = pgmanager.get_instance()
+pgm.create_conn(**cfg)
+
+with pgm.cursor() as curs:
+    print('[%d] pgconn: %d' % (multiprocessing.current_process().pid, curs.connection.get_backend_pid()))    
+    print('[%d] insert' % multiprocessing.current_process().pid)
+    curs.execute('''INSERT INTO test (name) VALUES ('multi') RETURNING id''')
+    id = curs.fetchone()[0]
+    curs.connection.commit()
+    
+    print('[%d] update' % multiprocessing.current_process().pid)
+    curs.execute('''UPDATE test SET name = 'multi-main' WHERE id=%s''', [id])
+    # not committed
+
+p1 = multiprocessing.Process(target=sub1, args=[id])
+p1.start()
+p2 = multiprocessing.Process(target=sub2, args=[id])
+p2.start()
+
+with pgm.cursor() as curs:
+    print('[%d] commit' % multiprocessing.current_process().pid)
+    curs.connection.commit()
+
+print('join')
+p1.join()
+p2.join()
+
+print('done')