5 # Requires: Python 3.2, psycopg2 |
5 # Requires: Python 3.2, psycopg2 |
6 # |
6 # |
7 # Part of pgtoolkit |
7 # Part of pgtoolkit |
8 # http://hg.devl.cz/pgtoolkit |
8 # http://hg.devl.cz/pgtoolkit |
9 # |
9 # |
10 # Copyright (c) 2010, 2011 Radek Brich <radek.brich@devl.cz> |
10 # Copyright (c) 2010, 2011, 2012 Radek Brich <radek.brich@devl.cz> |
11 # |
11 # |
12 # Permission is hereby granted, free of charge, to any person obtaining a copy |
12 # Permission is hereby granted, free of charge, to any person obtaining a copy |
13 # of this software and associated documentation files (the "Software"), to deal |
13 # of this software and associated documentation files (the "Software"), to deal |
14 # in the Software without restriction, including without limitation the rights |
14 # in the Software without restriction, including without limitation the rights |
15 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
15 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
61 |
61 |
62 On second line we have created connection named 'default' (this name can be left out). |
62 On second line we have created connection named 'default' (this name can be left out). |
63 The with statement obtains connection (actually connects to database when needed), |
63 The with statement obtains connection (actually connects to database when needed), |
64 then returns cursor for this connection. At the end of with statement, |
64 then returns cursor for this connection. At the end of with statement, |
65 the connection is returned to the pool or closed (depending on number of connections |
65 the connection is returned to the pool or closed (depending on number of connections |
66 in pool and on setting of keep_open parameter). |
66 in pool and on setting of pool_size parameter). |
67 |
67 |
68 The row returned by fetchone_dict() is special dict object, which can be accessed |
68 The row returned by fetchone_dict() is special dict object, which can be accessed |
69 using item or attribute access, that is row['now'] or row.now. |
69 using item or attribute access, that is row['now'] or row.now. |
70 """ |
70 """ |
71 |
71 |
92 pass |
92 pass |
93 |
93 |
94 |
94 |
95 class ConnectionInfo: |
95 class ConnectionInfo: |
96 |
96 |
97 def __init__(self, dsn, isolation_level=None, keep_alive=True, |
97 def __init__(self, name, dsn, isolation_level=None, keep_alive=True, |
98 init_statement=None, keep_open=1): |
98 init_statement=None, pool_size=1): |
99 |
99 self.name = name # connection name is logged with SQL queries |
100 self.dsn = dsn |
100 self.dsn = dsn |
101 self.isolation_level = isolation_level |
101 self.isolation_level = isolation_level |
102 self.keep_alive = keep_alive |
102 self.keep_alive = keep_alive |
103 self.init_statement = init_statement |
103 self.init_statement = init_statement |
104 self.keep_open = keep_open |
104 self.pool_size = pool_size |
105 |
105 |
106 |
106 |
107 class RowDict(OrderedDict): |
107 class RowDict(OrderedDict): |
108 |
108 |
109 def __getattr__(self, key): |
109 def __getattr__(self, key): |
114 |
114 |
115 |
115 |
116 class Cursor(psycopg2.extensions.cursor): |
116 class Cursor(psycopg2.extensions.cursor): |
117 |
117 |
118 def execute(self, query, args=None): |
118 def execute(self, query, args=None): |
|
119 self._log_query(query, args) |
119 try: |
120 try: |
120 return super(Cursor, self).execute(query, args) |
121 return super(Cursor, self).execute(query, args) |
121 finally: |
122 except DatabaseError: |
122 self._log_query() |
123 self._log_exception() |
123 |
124 |
124 def callproc(self, procname, args=None): |
125 def callproc(self, procname, args=None): |
|
126 self._log_query('CALL %s(%s)' % (procname, ','.join(args))) |
125 try: |
127 try: |
126 return super(Cursor, self).callproc(procname, args) |
128 return super(Cursor, self).callproc(procname, args) |
127 finally: |
129 except DatabaseError: |
128 self._log_query() |
130 self._log_exception() |
129 |
131 |
130 def row_dict(self, row, lstrip=None): |
132 def row_dict(self, row, lstrip=None): |
131 adjustname = lambda a: a |
133 adjustname = lambda a: a |
132 if lstrip: |
134 if lstrip: |
133 adjustname = lambda a: a.lstrip(lstrip) |
135 adjustname = lambda a: a.lstrip(lstrip) |
160 def fetchall_adapted(self, lstrip=None): |
162 def fetchall_adapted(self, lstrip=None): |
161 '''Like fetchall_dict() but values are quoted for direct inclusion in SQL query.''' |
163 '''Like fetchall_dict() but values are quoted for direct inclusion in SQL query.''' |
162 rows = super(Cursor, self).fetchall() |
164 rows = super(Cursor, self).fetchall() |
163 return [self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip) for row in rows] |
165 return [self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip) for row in rows] |
164 |
166 |
165 def _log_query(self): |
167 def _log_query(self, query, args=None): |
166 if self.query: |
168 name = self.connection.name if hasattr(self.connection, 'name') else '-' |
167 name = self.connection.name if hasattr(self.connection, 'name') else '-' |
169 log_sql.info('[%s] %s' % (name, self.mogrify(query, args).decode('utf8'))) |
168 log_sql.info('[%s] %s' % (name, self.query.decode('utf8'))) |
170 |
|
171 def _log_exception(self): |
|
172 name = self.connection.name if hasattr(self.connection, 'name') else '-' |
|
173 log_sql.exception('[%s] exception:' % (name,)) |
169 |
174 |
170 class Connection(psycopg2.extensions.connection): |
175 class Connection(psycopg2.extensions.connection): |
171 |
176 |
172 def cursor(self, name=None): |
177 def cursor(self, name=None): |
173 if name is None: |
178 if name is None: |
200 |
205 |
201 def __del__(self): |
206 def __del__(self): |
202 for conn in tuple(self.conn_known.keys()): |
207 for conn in tuple(self.conn_known.keys()): |
203 self.destroy_conn(conn) |
208 self.destroy_conn(conn) |
204 |
209 |
205 def create_conn(self, name='default', keep_open=1, isolation_level=None, keep_alive=True, dsn=None, **kw): |
210 def create_conn(self, name='default', isolation_level=None, keep_alive=True, init_statement=None, |
|
211 pool_size=1, dsn=None, **kwargs): |
206 '''Create named connection. |
212 '''Create named connection. |
207 |
213 |
208 name -- name for connection (default is "default") |
214 name -- name for connection (default is "default") |
209 keep_open -- how many connections will be kept open in pool (more connections will still be created, |
215 pool_size -- how many connections will be kept open in pool |
210 but they will be closed by put_conn) |
216 (more connections will still be created but they will be closed by put_conn) |
|
217 None - disable pool, always return same connection |
211 isolation_level -- "autocommit", "read_committed", "serializable" or None for driver default |
218 isolation_level -- "autocommit", "read_committed", "serializable" or None for driver default |
212 keep_alive -- set socket to keepalive mode |
219 keep_alive -- set socket to keepalive mode |
213 dsn -- string with connection parameters (dsn means Data Source Name) |
220 dsn -- string with connection parameters (dsn means Data Source Name) |
214 |
221 |
215 Alternative for dsn is keyword args (same names as in dsn). |
222 Alternative for dsn is keyword args (same names as in dsn). |
217 ''' |
224 ''' |
218 if name in self.conn_known: |
225 if name in self.conn_known: |
219 raise PgManagerError('Connection name "%s" already registered.' % name) |
226 raise PgManagerError('Connection name "%s" already registered.' % name) |
220 |
227 |
221 if dsn is None: |
228 if dsn is None: |
222 dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items() if x[1] is not None]) |
229 dsn = ' '.join([x[0]+'='+str(x[1]) for x in kwargs.items() if x[1] is not None]) |
223 |
230 |
224 isolation_level = self._normalize_isolation_level(isolation_level) |
231 isolation_level = self._normalize_isolation_level(isolation_level) |
225 ci = ConnectionInfo(dsn, isolation_level, keep_alive, keep_open=keep_open) |
232 ci = ConnectionInfo(name, dsn, isolation_level, keep_alive, init_statement, pool_size) |
226 |
233 |
227 self.conn_known[name] = ci |
234 self.conn_known[name] = ci |
228 self.conn_pool[name] = [] |
235 self.conn_pool[name] = [] |
|
236 |
|
237 def create_conn_listen(self, name, channel, dsn=None, copy_dsn=None, **kwargs): |
|
238 '''Create connection listening for notifies. |
|
239 |
|
240 Disables pool. If you want to use pool, create other connection for that. |
|
241 This connection can be used as usual: conn.cursor() etc. |
|
242 Don't use PgManager's cursor() and put_conn(). |
|
243 |
|
244 name -- name for connection |
|
245 channel -- listen on this channel |
|
246 copy_dsn -- specify name of other connection and its dsn will be used |
|
247 |
|
248 Other parameters forwarded to create_conn(). |
|
249 |
|
250 ''' |
|
251 if dsn is None and copy_dsn: |
|
252 try: |
|
253 dsn = self.conn_known[copy_dsn].dsn |
|
254 except KeyError: |
|
255 raise PgManagerError("Connection name '%s' not registered." % copy_dsn) |
|
256 listen_query = "LISTEN " + channel |
|
257 self.create_conn(name=name, pool_size=None, isolation_level='autocommit', init_statement=listen_query, |
|
258 dsn=dsn, **kwargs) |
229 |
259 |
230 def close_conn(self, name='default'): |
260 def close_conn(self, name='default'): |
231 '''Close all connections of given name. |
261 '''Close all connections of given name. |
232 |
262 |
233 Connection credentials are still saved. |
263 Connection credentials are still saved. |
254 def get_conn(self, name='default'): |
284 def get_conn(self, name='default'): |
255 '''Get connection of name 'name' from pool.''' |
285 '''Get connection of name 'name' from pool.''' |
256 self._check_fork() |
286 self._check_fork() |
257 self.lock.acquire() |
287 self.lock.acquire() |
258 try: |
288 try: |
259 if not name in self.conn_known: |
289 try: |
|
290 ci = self.conn_known[name] |
|
291 except KeyError: |
260 raise PgManagerError("Connection name '%s' not registered." % name) |
292 raise PgManagerError("Connection name '%s' not registered." % name) |
261 |
293 |
262 conn = None |
294 # no pool, just one static connection |
263 while len(self.conn_pool[name]) and conn is None: |
295 if ci.pool_size is None: |
264 conn = self.conn_pool[name].pop() |
296 # check for existing connection |
265 if conn.closed: |
297 try: |
|
298 conn = self.conn_pool[name][0] |
|
299 if conn.closed: |
|
300 conn = None |
|
301 except IndexError: |
266 conn = None |
302 conn = None |
267 |
303 self.conn_pool[name].append(conn) |
268 if conn is None: |
304 # if no existing connection is valid, connect new one and save it |
269 ci = self.conn_known[name] |
305 if conn is None: |
270 conn = self._connect(ci) |
306 conn = self._connect(ci) |
271 # add our name to connection instance (this is then logged with SQL queries) |
307 self.conn_pool[name][0] = conn |
272 conn.name = name |
308 |
|
309 # connection from pool |
|
310 else: |
|
311 conn = None |
|
312 while len(self.conn_pool[name]) and conn is None: |
|
313 conn = self.conn_pool[name].pop() |
|
314 if conn.closed: |
|
315 conn = None |
|
316 |
|
317 if conn is None: |
|
318 conn = self._connect(ci) |
273 finally: |
319 finally: |
274 self.lock.release() |
320 self.lock.release() |
275 return conn |
321 return conn |
276 |
322 |
277 def put_conn(self, conn, name='default'): |
323 def put_conn(self, conn, name='default'): |
328 def log_notices(self, conn): |
374 def log_notices(self, conn): |
329 for notice in conn.notices: |
375 for notice in conn.notices: |
330 log_notices.info(notice.rstrip()) |
376 log_notices.info(notice.rstrip()) |
331 conn.notices[:] = [] |
377 conn.notices[:] = [] |
332 |
378 |
333 def wait_for_notify(self, name='default', timeout=5): |
379 def wait_for_notify(self, name='default', timeout=5.0): |
334 '''Wait for asynchronous notifies, return the last one. |
380 '''Wait for asynchronous notifies, return the last one. |
335 |
381 |
|
382 name -- name of connection, must be created using create_conn_listen() |
|
383 timeout -- in seconds, floating point (None - wait forever) |
|
384 |
336 Returns None on timeout. |
385 Returns None on timeout. |
337 |
386 |
338 ''' |
387 ''' |
339 conn = self.get_conn(name) |
388 conn = self.get_conn(name) |
340 |
389 |
341 try: |
390 # return any notifies on stack |
342 # any residual notify? |
391 if conn.notifies: |
343 # then return it, that should not break anything |
392 return conn.notifies.pop() |
|
393 |
|
394 if select.select([conn], [], [], timeout) == ([], [], []): |
|
395 # timeout |
|
396 return None |
|
397 else: |
|
398 conn.poll() |
|
399 |
|
400 # return just the last notify (we do not care for older ones) |
344 if conn.notifies: |
401 if conn.notifies: |
345 return conn.notifies.pop() |
402 return conn.notifies.pop() |
346 |
403 return None |
347 if select.select([conn], [], [], timeout) == ([], [], []): |
|
348 # timeout |
|
349 return None |
|
350 else: |
|
351 conn.poll() |
|
352 |
|
353 # return just the last notify (we do not care for older ones) |
|
354 if conn.notifies: |
|
355 return conn.notifies.pop() |
|
356 return None |
|
357 finally: |
|
358 # clean notifies |
|
359 while conn.notifies: |
|
360 conn.notifies.pop() |
|
361 self.put_conn(conn, name) |
|
362 |
404 |
363 def _connect(self, ci): |
405 def _connect(self, ci): |
364 conn = psycopg2.connect(ci.dsn, connection_factory=Connection) |
406 conn = psycopg2.connect(ci.dsn, connection_factory=Connection) |
|
407 conn.name = ci.name |
365 if ci.keep_alive: |
408 if ci.keep_alive: |
366 conn.keep_alive() |
409 conn.keep_alive() |
367 if not ci.isolation_level is None: |
410 if not ci.isolation_level is None: |
368 conn.set_isolation_level(ci.isolation_level) |
411 conn.set_isolation_level(ci.isolation_level) |
369 if ci.init_statement: |
412 if ci.init_statement: |