equal
deleted
inserted
replaced
68 |
68 |
69 from contextlib import contextmanager |
69 from contextlib import contextmanager |
70 import logging |
70 import logging |
71 import threading |
71 import threading |
72 import select |
72 import select |
|
73 import socket |
73 |
74 |
74 import psycopg2 |
75 import psycopg2 |
75 import psycopg2.extensions |
76 import psycopg2.extensions |
76 |
77 |
77 from psycopg2 import DatabaseError, IntegrityError |
78 from psycopg2 import DatabaseError, IntegrityError |
151 if name is None: |
152 if name is None: |
152 return super(Connection, self).cursor(cursor_factory=Cursor) |
153 return super(Connection, self).cursor(cursor_factory=Cursor) |
153 else: |
154 else: |
154 return super(Connection, self).cursor(name, cursor_factory=Cursor) |
155 return super(Connection, self).cursor(name, cursor_factory=Cursor) |
155 |
156 |
|
157 def keep_alive(self): |
|
158 '''Set socket to keepalive mode. Must be called before any query.''' |
|
159 sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM) |
|
160 sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) |
|
161 # Maximum keep-alive probes before assuming the connection is lost |
|
162 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5) |
|
163 # Interval (in seconds) between keep-alive probes |
|
164 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 2) |
|
165 # Maximum idle time (in seconds) before starting to send keep-alive probes |
|
166 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10) |
|
167 |
156 |
168 |
157 class PgManager: |
169 class PgManager: |
158 |
170 |
159 def __init__(self): |
171 def __init__(self): |
160 self.conn_known = {} # available connections |
172 self.conn_known = {} # available connections |
216 if conn.closed: |
228 if conn.closed: |
217 conn = None |
229 conn = None |
218 |
230 |
219 if conn is None: |
231 if conn is None: |
220 ci = self.conn_known[name] |
232 ci = self.conn_known[name] |
221 conn = psycopg2.connect(ci.dsn, connection_factory=Connection) |
233 conn = self._connect(ci) |
222 if not ci.isolation_level is None: |
|
223 conn.set_isolation_level(ci.isolation_level) |
|
224 if ci.init_statement: |
|
225 curs = conn.cursor() |
|
226 curs.execute(ci.init_statement) |
|
227 curs.close() |
|
228 finally: |
234 finally: |
229 self.lock.release() |
235 self.lock.release() |
230 return conn |
236 return conn |
231 |
237 |
232 def put_conn(self, conn, name='default'): |
238 def put_conn(self, conn, name='default'): |
301 finally: |
307 finally: |
302 # clean notifies |
308 # clean notifies |
303 while conn.notifies: |
309 while conn.notifies: |
304 conn.notifies.pop() |
310 conn.notifies.pop() |
305 self.put_conn(conn, name) |
311 self.put_conn(conn, name) |
|
312 |
|
313 def _connect(self, ci): |
|
314 conn = psycopg2.connect(ci.dsn, connection_factory=Connection) |
|
315 conn.keep_alive() |
|
316 if not ci.isolation_level is None: |
|
317 conn.set_isolation_level(ci.isolation_level) |
|
318 if ci.init_statement: |
|
319 curs = conn.cursor() |
|
320 curs.execute(ci.init_statement) |
|
321 curs.close() |
|
322 return conn |
306 |
323 |
307 def _normalize_isolation_level(self, level): |
324 def _normalize_isolation_level(self, level): |
308 if type(level) == str: |
325 if type(level) == str: |
309 if level.lower() == 'autocommit': |
326 if level.lower() == 'autocommit': |
310 return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT |
327 return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT |