pgtoolkit/pgmanager.py
changeset 19 e526ca146fa9
parent 9 2fcc8ef0b97d
child 20 73f0d53fef6b
equal deleted inserted replaced
18:a9e12b7cc207 19:e526ca146fa9
    76 import socket
    76 import socket
    77 
    77 
    78 import psycopg2
    78 import psycopg2
    79 import psycopg2.extensions
    79 import psycopg2.extensions
    80 
    80 
    81 from psycopg2 import DatabaseError, IntegrityError
    81 from psycopg2 import DatabaseError, IntegrityError, OperationalError
    82 
    82 
    83 
    83 
    84 class PgManagerError(Exception):
    84 class PgManagerError(Exception):
    85 
    85 
    86     pass
    86     pass
    87 
    87 
    88 
    88 
    89 class ConnectionInfo:
    89 class ConnectionInfo:
    90 
    90 
    91     def __init__(self, dsn, isolation_level=None, init_statement=None, keep_open=1):
    91     def __init__(self, dsn, isolation_level=None, keep_alive=True,
       
    92         init_statement=None, keep_open=1):
       
    93         
    92         self.dsn = dsn
    94         self.dsn = dsn
    93         self.isolation_level = isolation_level
    95         self.isolation_level = isolation_level
       
    96         self.keep_alive = keep_alive
    94         self.init_statement = init_statement
    97         self.init_statement = init_statement
    95         self.keep_open = keep_open
    98         self.keep_open = keep_open
    96 
    99 
    97 
   100 
    98 class RowDict(dict):
   101 class RowDict(dict):
   159 
   162 
   160     def keep_alive(self):
   163     def keep_alive(self):
   161         '''Set socket to keepalive mode. Must be called before any query.'''
   164         '''Set socket to keepalive mode. Must be called before any query.'''
   162         sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM) 
   165         sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM) 
   163         sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
   166         sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
   164         # Maximum keep-alive probes before asuming the connection is lost
   167         try:
   165         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
   168             # Maximum keep-alive probes before asuming the connection is lost
   166         # Interval (in seconds) between keep-alive probes
   169             sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
   167         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 2)
   170             # Interval (in seconds) between keep-alive probes
   168         # Maximum idle time (in seconds) before start sending keep-alive probes
   171             sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 2)
   169         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
   172             # Maximum idle time (in seconds) before start sending keep-alive probes
       
   173             sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
       
   174         except socket.error:
       
   175             pass
   170 
   176 
   171 
   177 
   172 class PgManager:
   178 class PgManager:
   173 
   179 
   174     def __init__(self):
   180     def __init__(self):
   178 
   184 
   179     def __del__(self):
   185     def __del__(self):
   180         for conn in tuple(self.conn_known.keys()):
   186         for conn in tuple(self.conn_known.keys()):
   181             self.destroy_conn(conn)
   187             self.destroy_conn(conn)
   182 
   188 
   183     def create_conn(self, name='default', isolation_level=None, dsn=None, **kw):
   189     def create_conn(self, name='default', isolation_level=None, keep_alive=True, dsn=None, **kw):
   184         '''Create named connection.'''
   190         '''Create named connection.
       
   191         
       
   192         name -- name for connection (default is "default")
       
   193         isolation_level -- "autocommit", "read_committed", "serializable" or None for driver default
       
   194         keep_alive -- set socket to keepalive mode
       
   195         dsn -- string with connection parameters (dsn means Data Source Name)
       
   196         
       
   197         Alternative for dsn is keyword args (same names as in dsn).
       
   198         
       
   199         '''
   185         if name in self.conn_known:
   200         if name in self.conn_known:
   186             raise PgManagerError('Connection name "%s" already registered.' % name)
   201             raise PgManagerError('Connection name "%s" already registered.' % name)
   187 
   202 
   188         if dsn is None:
   203         if dsn is None:
   189             dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items()])
   204             dsn = ' '.join([x[0]+'='+str(x[1]) for x in kw.items() if x[1] is not None])
   190 
   205 
   191         isolation_level = self._normalize_isolation_level(isolation_level)
   206         isolation_level = self._normalize_isolation_level(isolation_level)
   192         ci = ConnectionInfo(dsn, isolation_level)
   207         ci = ConnectionInfo(dsn, isolation_level, keep_alive)
   193 
   208 
   194         self.conn_known[name] = ci
   209         self.conn_known[name] = ci
   195         self.conn_pool[name] = []
   210         self.conn_pool[name] = []
   196 
   211 
   197     def close_conn(self, name='default'):
   212     def close_conn(self, name='default'):
   313                 conn.notifies.pop()
   328                 conn.notifies.pop()
   314             self.put_conn(conn, name)
   329             self.put_conn(conn, name)
   315 
   330 
   316     def _connect(self, ci):
   331     def _connect(self, ci):
   317         conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
   332         conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
   318         conn.keep_alive()
   333         if ci.keep_alive:
       
   334             conn.keep_alive()
   319         if not ci.isolation_level is None:
   335         if not ci.isolation_level is None:
   320             conn.set_isolation_level(ci.isolation_level)
   336             conn.set_isolation_level(ci.isolation_level)
   321         if ci.init_statement:
   337         if ci.init_statement:
   322             curs = conn.cursor()
   338             curs = conn.cursor()
   323             curs.execute(ci.init_statement)
   339             curs.execute(ci.init_statement)