76 import socket |
76 import socket |
77 |
77 |
78 import psycopg2 |
78 import psycopg2 |
79 import psycopg2.extensions |
79 import psycopg2.extensions |
80 |
80 |
81 from psycopg2 import DatabaseError, IntegrityError |
81 from psycopg2 import DatabaseError, IntegrityError, OperationalError |
82 |
82 |
83 |
83 |
84 class PgManagerError(Exception): |
84 class PgManagerError(Exception): |
85 |
85 |
86 pass |
86 pass |
87 |
87 |
88 |
88 |
89 class ConnectionInfo: |
89 class ConnectionInfo: |
90 |
90 |
91 def __init__(self, dsn, isolation_level=None, init_statement=None, keep_open=1): |
91 def __init__(self, dsn, isolation_level=None, keep_alive=True, |
|
92 init_statement=None, keep_open=1): |
|
93 |
92 self.dsn = dsn |
94 self.dsn = dsn |
93 self.isolation_level = isolation_level |
95 self.isolation_level = isolation_level |
|
96 self.keep_alive = keep_alive |
94 self.init_statement = init_statement |
97 self.init_statement = init_statement |
95 self.keep_open = keep_open |
98 self.keep_open = keep_open |
96 |
99 |
97 |
100 |
98 class RowDict(dict): |
101 class RowDict(dict): |
159 |
162 |
160 def keep_alive(self): |
163 def keep_alive(self): |
161 '''Set socket to keepalive mode. Must be called before any query.''' |
164 '''Set socket to keepalive mode. Must be called before any query.''' |
162 sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM) |
165 sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM) |
163 sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) |
166 sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) |
164 # Maximum keep-alive probes before asuming the connection is lost |
167 try: |
165 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5) |
168 # Maximum keep-alive probes before asuming the connection is lost |
166 # Interval (in seconds) between keep-alive probes |
169 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5) |
167 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 2) |
170 # Interval (in seconds) between keep-alive probes |
168 # Maximum idle time (in seconds) before start sending keep-alive probes |
171 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 2) |
169 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10) |
172 # Maximum idle time (in seconds) before start sending keep-alive probes |
|
173 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10) |
|
174 except socket.error: |
|
175 pass |
170 |
176 |
171 |
177 |
172 class PgManager: |
178 class PgManager: |
173 |
179 |
174 def __init__(self): |
180 def __init__(self): |
178 |
184 |
179 def __del__(self): |
185 def __del__(self): |
180 for conn in tuple(self.conn_known.keys()): |
186 for conn in tuple(self.conn_known.keys()): |
181 self.destroy_conn(conn) |
187 self.destroy_conn(conn) |
182 |
188 |
183 def create_conn(self, name='default', isolation_level=None, dsn=None, **kw): |
189 def create_conn(self, name='default', isolation_level=None, keep_alive=True, dsn=None, **kw): |
184 '''Create named connection.''' |
190 '''Create named connection. |
|
191 |
|
192 name -- name for connection (default is "default") |
|
193 isolation_level -- "autocommit", "read_committed", "serializable" or None for driver default |
|
194 keep_alive -- set socket to keepalive mode |
|
195 dsn -- string with connection parameters (dsn means Data Source Name) |
|
196 |
|
197 Alternative for dsn is keyword args (same names as in dsn). |
|
198 |
|
199 ''' |
185 if name in self.conn_known: |
200 if name in self.conn_known: |
186 raise PgManagerError('Connection name "%s" already registered.' % name) |
201 raise PgManagerError('Connection name "%s" already registered.' % name) |
187 |
202 |
188 if dsn is None: |
203 if dsn is None: |
189 dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items()]) |
204 dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items() if x[1] is not None]) |
190 |
205 |
191 isolation_level = self._normalize_isolation_level(isolation_level) |
206 isolation_level = self._normalize_isolation_level(isolation_level) |
192 ci = ConnectionInfo(dsn, isolation_level) |
207 ci = ConnectionInfo(dsn, isolation_level, keep_alive) |
193 |
208 |
194 self.conn_known[name] = ci |
209 self.conn_known[name] = ci |
195 self.conn_pool[name] = [] |
210 self.conn_pool[name] = [] |
196 |
211 |
197 def close_conn(self, name='default'): |
212 def close_conn(self, name='default'): |
313 conn.notifies.pop() |
328 conn.notifies.pop() |
314 self.put_conn(conn, name) |
329 self.put_conn(conn, name) |
315 |
330 |
316 def _connect(self, ci): |
331 def _connect(self, ci): |
317 conn = psycopg2.connect(ci.dsn, connection_factory=Connection) |
332 conn = psycopg2.connect(ci.dsn, connection_factory=Connection) |
318 conn.keep_alive() |
333 if ci.keep_alive: |
|
334 conn.keep_alive() |
319 if not ci.isolation_level is None: |
335 if not ci.isolation_level is None: |
320 conn.set_isolation_level(ci.isolation_level) |
336 conn.set_isolation_level(ci.isolation_level) |
321 if ci.init_statement: |
337 if ci.init_statement: |
322 curs = conn.cursor() |
338 curs = conn.cursor() |
323 curs.execute(ci.init_statement) |
339 curs.execute(ci.init_statement) |