111 |
112 |
112 def execute(self, query, args=None): |
113 def execute(self, query, args=None): |
113 try: |
114 try: |
114 return super(Cursor, self).execute(query, args) |
115 return super(Cursor, self).execute(query, args) |
115 finally: |
116 finally: |
116 log.debug(self.query.decode('utf8')) |
117 if self.query: |
|
118 log.debug(self.query.decode('utf8')) |
117 |
119 |
118 def callproc(self, procname, args=None): |
120 def callproc(self, procname, args=None): |
119 try: |
121 try: |
120 return super(Cursor, self).callproc(procname, args) |
122 return super(Cursor, self).callproc(procname, args) |
121 finally: |
123 finally: |
122 log.debug(self.query.decode('utf8')) |
124 if self.query: |
|
125 log.debug(self.query.decode('utf8')) |
123 |
126 |
124 def row_dict(self, row, lstrip=None): |
127 def row_dict(self, row, lstrip=None): |
125 adjustname = lambda a: a |
128 adjustname = lambda a: a |
126 if lstrip: |
129 if lstrip: |
127 adjustname = lambda a: a.lstrip(lstrip) |
130 adjustname = lambda a: a.lstrip(lstrip) |
180 |
183 |
181 class PgManager: |
184 class PgManager: |
182 |
185 |
183 def __init__(self): |
186 def __init__(self): |
184 self.conn_known = {} # available connections |
187 self.conn_known = {} # available connections |
185 self.conn_pool = {} |
188 self.conn_pool = {} # active connetions |
186 self.lock = threading.Lock() |
189 self.lock = threading.Lock() # mutual exclusion for threads |
|
190 self.pid = multiprocessing.current_process().pid # forking check |
187 |
191 |
188 def __del__(self): |
192 def __del__(self): |
189 for conn in tuple(self.conn_known.keys()): |
193 for conn in tuple(self.conn_known.keys()): |
190 self.destroy_conn(conn) |
194 self.destroy_conn(conn) |
191 |
195 |
238 del self.conn_known[name] |
242 del self.conn_known[name] |
239 del self.conn_pool[name] |
243 del self.conn_pool[name] |
240 |
244 |
241 def get_conn(self, name='default'): |
245 def get_conn(self, name='default'): |
242 '''Get connection of name 'name' from pool.''' |
246 '''Get connection of name 'name' from pool.''' |
|
247 self._check_fork() |
243 self.lock.acquire() |
248 self.lock.acquire() |
244 try: |
249 try: |
245 if not name in self.conn_known: |
250 if not name in self.conn_known: |
246 raise PgManagerError("Connection name '%s' not registered." % name) |
251 raise PgManagerError("Connection name '%s' not registered." % name) |
247 |
252 |
353 return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED |
358 return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED |
354 if level.lower() == 'serializable': |
359 if level.lower() == 'serializable': |
355 return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE |
360 return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE |
356 raise PgManagerError('Unknown isolation level name: "%s"', level) |
361 raise PgManagerError('Unknown isolation level name: "%s"', level) |
357 return level |
362 return level |
|
363 |
|
364 def _check_fork(self): |
|
365 '''Check if process was forked (PID has changed). |
|
366 |
|
367 If it was, clean parent's connections. |
|
368 New connections are created for children. |
|
369 Known connection credentials are inherited, but not shared. |
|
370 |
|
371 ''' |
|
372 if self.pid == multiprocessing.current_process().pid: |
|
373 # PID has not changed |
|
374 return |
|
375 |
|
376 # update saved PID |
|
377 self.pid = multiprocessing.current_process().pid |
|
378 # reinitialize lock |
|
379 self.lock = threading.Lock() |
|
380 # clean parent's connections |
|
381 for name in self.conn_pool: |
|
382 self.conn_pool[name] = [] |
358 |
383 |
359 @classmethod |
384 @classmethod |
360 def get_instance(cls): |
385 def get_instance(cls): |
361 if not hasattr(cls, '_instance'): |
386 if not hasattr(cls, '_instance'): |
362 cls._instance = cls() |
387 cls._instance = cls() |