pgtoolkit/pgmanager.py
changeset 24 5664afa530e5
parent 23 dc2dbe872fc8
child 26 7f219da7ab71
equal deleted inserted replaced
23:dc2dbe872fc8 24:5664afa530e5
    70 """
    70 """
    71 
    71 
    72 from contextlib import contextmanager
    72 from contextlib import contextmanager
    73 import logging
    73 import logging
    74 import threading
    74 import threading
       
    75 import multiprocessing
    75 import select
    76 import select
    76 import socket
    77 import socket
    77 
    78 
    78 import psycopg2
    79 import psycopg2
    79 import psycopg2.extensions
    80 import psycopg2.extensions
   111 
   112 
   112     def execute(self, query, args=None):
   113     def execute(self, query, args=None):
   113         try:
   114         try:
   114             return super(Cursor, self).execute(query, args)
   115             return super(Cursor, self).execute(query, args)
   115         finally:
   116         finally:
   116             log.debug(self.query.decode('utf8'))
   117             if self.query:
       
   118                 log.debug(self.query.decode('utf8'))
   117 
   119 
   118     def callproc(self, procname, args=None):
   120     def callproc(self, procname, args=None):
   119         try:
   121         try:
   120             return super(Cursor, self).callproc(procname, args)
   122             return super(Cursor, self).callproc(procname, args)
   121         finally:
   123         finally:
   122             log.debug(self.query.decode('utf8'))
   124             if self.query:
       
   125                 log.debug(self.query.decode('utf8'))
   123 
   126 
   124     def row_dict(self, row, lstrip=None):
   127     def row_dict(self, row, lstrip=None):
   125         adjustname = lambda a: a
   128         adjustname = lambda a: a
   126         if lstrip:
   129         if lstrip:
   127             adjustname = lambda a: a.lstrip(lstrip)
   130             adjustname = lambda a: a.lstrip(lstrip)
   180 
   183 
   181 class PgManager:
   184 class PgManager:
   182 
   185 
   183     def __init__(self):
   186     def __init__(self):
   184         self.conn_known = {}  # available connections
   187         self.conn_known = {}  # available connections
   185         self.conn_pool = {}
   188         self.conn_pool = {}  # active connetions
   186         self.lock = threading.Lock()
   189         self.lock = threading.Lock()  # mutual exclusion for threads
       
   190         self.pid = multiprocessing.current_process().pid  # forking check
   187 
   191 
   188     def __del__(self):
   192     def __del__(self):
   189         for conn in tuple(self.conn_known.keys()):
   193         for conn in tuple(self.conn_known.keys()):
   190             self.destroy_conn(conn)
   194             self.destroy_conn(conn)
   191 
   195 
   238         del self.conn_known[name]
   242         del self.conn_known[name]
   239         del self.conn_pool[name]
   243         del self.conn_pool[name]
   240 
   244 
   241     def get_conn(self, name='default'):
   245     def get_conn(self, name='default'):
   242         '''Get connection of name 'name' from pool.'''
   246         '''Get connection of name 'name' from pool.'''
       
   247         self._check_fork()
   243         self.lock.acquire()
   248         self.lock.acquire()
   244         try:
   249         try:
   245             if not name in self.conn_known:
   250             if not name in self.conn_known:
   246                 raise PgManagerError("Connection name '%s' not registered." % name)
   251                 raise PgManagerError("Connection name '%s' not registered." % name)
   247 
   252 
   353                 return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
   358                 return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
   354             if level.lower() == 'serializable':
   359             if level.lower() == 'serializable':
   355                 return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
   360                 return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
   356             raise PgManagerError('Unknown isolation level name: "%s"', level)
   361             raise PgManagerError('Unknown isolation level name: "%s"', level)
   357         return level
   362         return level
       
   363     
       
   364     def _check_fork(self):
       
   365         '''Check if process was forked (PID has changed).
       
   366         
       
   367         If it was, clean parent's connections.
       
   368         New connections are created for children.
       
   369         Known connection credentials are inherited, but not shared.
       
   370         
       
   371         '''
       
   372         if self.pid == multiprocessing.current_process().pid:
       
   373             # PID has not changed
       
   374             return
       
   375         
       
   376         # update saved PID
       
   377         self.pid = multiprocessing.current_process().pid
       
   378         # reinitialize lock
       
   379         self.lock = threading.Lock()
       
   380         # clean parent's connections
       
   381         for name in self.conn_pool:
       
   382             self.conn_pool[name] = []
   358 
   383 
   359     @classmethod
   384     @classmethod
   360     def get_instance(cls):
   385     def get_instance(cls):
   361         if not hasattr(cls, '_instance'):
   386         if not hasattr(cls, '_instance'):
   362             cls._instance = cls()
   387             cls._instance = cls()