        self.init_statement = init_statement
        self.pool_size = pool_size


class RowDict(OrderedDict):
108 """Special read-only dictionary used for rows returned from queries. |
|
109 |
|
110 Initialization is same as for dict: |
|
111 row = RowDict([('id', 123), ('name', 'hello')]) |
|
112 |
|
113 Allows key and attribute access to contained items: |
|
114 row['id'] |
|
115 row.id |
|
116 |
|
117 Items keep order in which columns where returned from database. |
|
118 |
|
119 Tuple style access is also supported: |
|
120 row[0] |
|
121 id, name = row |
|
122 |
|
123 """ |
|
    def __init__(self, data):
        self._dict = OrderedDict(data)

    def __getitem__(self, key):
        # an int key gives tuple-style positional access to the column values
        if isinstance(key, int):
            return tuple(self._dict.values())[key]
        return self._dict[key]

    def __getattr__(self, key):
        try:
            return self._dict[key]
        except KeyError:
            raise AttributeError(key)

    def __contains__(self, key):
        return key in self._dict


class Cursor(psycopg2.extensions.cursor):

    def execute(self, query, args=None):
        adapted = [self.mogrify('%s', [x]).decode('utf8') for x in row]
        return adapted

    def fetchone_adapted(self, lstrip=None):
        '''Like fetchone_dict(), but values are quoted for direct inclusion in an SQL query.

        This is useful when you need to generate an SQL script from data returned
        by the query. Use mogrify() for simple cases.

        '''
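        # Usage sketch (illustrative; `curs` is an instance of this Cursor class,
        # `users` is a hypothetical table):
        #     curs.execute('SELECT id, name FROM users')
        #     row = curs.fetchone_adapted()
        #     sql = 'INSERT INTO users (id, name) VALUES (%s, %s)' % (row['id'], row['name'])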
        row = super(Cursor, self).fetchone()
        if row is None:
            return None
        return self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip)

            self.destroy_conn(conn)

    def create_conn(self, name='default', isolation_level=None, keep_alive=True, init_statement=None,
            pool_size=1, dsn=None, **kwargs):
        '''Create named connection.

        name -- name for the connection (default is "default")
        pool_size -- how many connections will be kept open in the pool
            (more connections will still be created, but they will be closed by put_conn)
            None - disable the pool, always return the same connection
        isolation_level -- "autocommit", "read_committed", "serializable" or None for the driver default
        keep_alive -- set the socket to keepalive mode
        dsn -- connection string (parameters or data source name)

        Other keyword args are used as connection parameters.

        '''
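        # Usage sketch (assumes a PgManager instance `pgm` and example credentials):
        #     pgm.create_conn(pool_size=5, isolation_level='read_committed',
        #                     host='localhost', dbname='test', user='test')
        # or with an explicit dsn:
        #     pgm.create_conn(name='reports', dsn='host=localhost dbname=reports user=test')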
        if name in self.conn_known:
            raise PgManagerError('Connection name "%s" already registered.' % name)

        if dsn is None:
            # build dsn string from the remaining keyword args (connection parameters)
            dsn = ' '.join('%s=%s' % (k, v) for k, v in kwargs.items())

        isolation_level = self._normalize_isolation_level(isolation_level)
        ci = ConnectionInfo(name, dsn, isolation_level, keep_alive, init_statement, pool_size)

        self.conn_known[name] = ci
        self.conn_pool[name] = []

    def create_conn_listen(self, name, channel, dsn=None, copy_dsn=None, **kwargs):
        '''Create a connection listening for notifies.

        Disables the pool. If you want to use a pool, create another connection for that.
        This connection can be used as usual: conn.cursor() etc.
        Don't use PgManager's cursor() and put_conn().

        name -- name for the connection
        channel -- listen on this channel
        copy_dsn -- name of another connection whose dsn will be used

        Other parameters are forwarded to create_conn().

        '''
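        # Usage sketch (assumes a PgManager instance `pgm` with a 'default' connection registered):
        #     pgm.create_conn_listen('events', channel='my_channel', copy_dsn='default')
        #     notify = pgm.wait_for_notify('events', timeout=10.0)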
        if dsn is None and copy_dsn:
            try:
                dsn = self.conn_known[copy_dsn].dsn
            except KeyError:
                raise PgManagerError("Connection name '%s' not registered." % copy_dsn)

        listen_query = 'LISTEN "%s"' % channel
        self.create_conn(name=name, pool_size=None, isolation_level='autocommit', init_statement=listen_query,
            dsn=dsn, **kwargs)

    def close_conn(self, name='default'):
        '''Close all connections of the given name.

        Connection credentials are still saved.

        '''
        while self.conn_pool[name]:
            conn = self.conn_pool[name].pop()
            conn.close()

    def destroy_conn(self, name='default'):
        '''Destroy a connection.

        Counterpart of create_conn.

        '''
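        # Sketch of the difference (assumes a PgManager instance `pgm`):
        #     pgm.close_conn('default')    # close pooled connections, keep credentials registered
        #     pgm.destroy_conn('default')  # close and unregister the name entirely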
        if name not in self.conn_known:
            raise PgManagerError('Connection name "%s" not registered.' % name)

        self.close_conn(name)
        del self.conn_known[name]
        del self.conn_pool[name]

    def get_conn(self, name='default'):
        '''Get a connection of the given name (pooled, or static if the pool is disabled).'''
        self._check_fork()
        self.lock.acquire()
        try:
            try:
                ci = self.conn_known[name]
            except KeyError:
                raise PgManagerError("Connection name '%s' not registered." % name)

            # no pool, just one static connection
            if ci.pool_size is None:
                # check for existing connection
                try:
                    conn = self.conn_pool[name][0]
                    if conn.closed:
                        conn = None
                except IndexError:
                    conn = None
                    self.conn_pool[name].append(conn)
                # if no existing connection is valid, connect new one and save it
                if conn is None:
                    conn = self._connect(ci)
                    self.conn_pool[name][0] = conn

            # connection from pool
            else:
                conn = None
                while self.conn_pool[name] and conn is None:
                    conn = self.conn_pool[name].pop()
                    if conn.closed:
                        conn = None

                if conn is None:
                    conn = self._connect(ci)
        finally:
            self.lock.release()
        return conn

    def put_conn(self, conn, name='default'):
        '''Put a connection back to the pool.

        The name must be the same as the one used for get_conn(),
        otherwise things will break.

        '''
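        # Typical get/put pattern (illustrative sketch; assumes a PgManager instance `pgm`):
        #     conn = pgm.get_conn('default')
        #     try:
        #         curs = conn.cursor()
        #         curs.execute('SELECT 1')
        #     finally:
        #         pgm.put_conn(conn, 'default')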
        self.lock.acquire()
        try:
            if name not in self.conn_known:
                raise PgManagerError("Connection name '%s' not registered." % name)
            log_notices.info(notice.rstrip())
        conn.notices[:] = []

    def wait_for_notify(self, name='default', timeout=5.0):
        '''Wait for asynchronous notifies, return the last one.

        name -- name of the connection; must have been created using create_conn_listen()
        timeout -- in seconds, floating point (None - wait forever)

        Returns None on timeout.

        '''
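        # Usage sketch (assumes a PgManager instance `pgm` with a listening connection
        # registered as 'events' via create_conn_listen):
        #     notify = pgm.wait_for_notify('events', timeout=5.0)
        #     if notify is not None:
        #         print(notify.channel, notify.payload)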
        conn = self.get_conn(name)

        # return any notifies on stack
        if conn.notifies:
            return conn.notifies.pop()

        if select.select([conn], [], [], timeout) == ([], [], []):
            # nothing became readable before the timeout
            return None
                return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
            if level.lower() == 'serializable':
                return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
            raise PgManagerError('Unknown isolation level name: "%s"' % level)
        return level

    def _check_fork(self):
        '''Check if the process was forked (PID has changed).

        If it was, clean the parent's connections.
        New connections are created for children.
        Known connection credentials are inherited, but not shared.

        '''
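        # Illustrative sketch (assumes a module-level PgManager instance `pgm`):
        #     def worker():
        #         conn = pgm.get_conn()   # the forked child gets its own fresh connection
        #         pgm.put_conn(conn)
        #     multiprocessing.Process(target=worker).start()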
        if self.pid == multiprocessing.current_process().pid:
            # PID has not changed
            return

        # update saved PID
        self.pid = multiprocessing.current_process().pid
        # reinitialize lock
        self.lock = threading.Lock()
        # clean parent's connections