tools/pgmanager.py
changeset 8 2911935c524d
parent 7 685b20d2d3ab
equal deleted inserted replaced
7:685b20d2d3ab 8:2911935c524d
    68 
    68 
    69 from contextlib import contextmanager
    69 from contextlib import contextmanager
    70 import logging
    70 import logging
    71 import threading
    71 import threading
    72 import select
    72 import select
       
    73 import socket
    73 
    74 
    74 import psycopg2
    75 import psycopg2
    75 import psycopg2.extensions
    76 import psycopg2.extensions
    76 
    77 
    77 from psycopg2 import DatabaseError, IntegrityError
    78 from psycopg2 import DatabaseError, IntegrityError
   151         if name is None:
   152         if name is None:
   152             return super(Connection, self).cursor(cursor_factory=Cursor)
   153             return super(Connection, self).cursor(cursor_factory=Cursor)
   153         else:
   154         else:
   154             return super(Connection, self).cursor(name, cursor_factory=Cursor)
   155             return super(Connection, self).cursor(name, cursor_factory=Cursor)
   155 
   156 
       
   157     def keep_alive(self):
       
   158         '''Set socket to keepalive mode. Must be called before any query.'''
       
   159         sock = socket.fromfd(self.fileno(), socket.AF_INET, socket.SOCK_STREAM) 
       
   160         sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
       
   161         # Maximum keep-alive probes before asuming the connection is lost
       
   162         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
       
   163         # Interval (in seconds) between keep-alive probes
       
   164         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 2)
       
   165         # Maximum idle time (in seconds) before start sending keep-alive probes
       
   166         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
       
   167 
   156 
   168 
   157 class PgManager:
   169 class PgManager:
   158 
   170 
   159     def __init__(self):
   171     def __init__(self):
   160         self.conn_known = {}  # available connections
   172         self.conn_known = {}  # available connections
   216                 if conn.closed:
   228                 if conn.closed:
   217                     conn = None
   229                     conn = None
   218 
   230 
   219             if conn is None:
   231             if conn is None:
   220                 ci = self.conn_known[name]
   232                 ci = self.conn_known[name]
   221                 conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
   233                 conn = self._connect(ci)
   222                 if not ci.isolation_level is None:
       
   223                     conn.set_isolation_level(ci.isolation_level)
       
   224                 if ci.init_statement:
       
   225                     curs = conn.cursor()
       
   226                     curs.execute(ci.init_statement)
       
   227                     curs.close()
       
   228         finally:
   234         finally:
   229             self.lock.release()
   235             self.lock.release()
   230         return conn
   236         return conn
   231 
   237 
   232     def put_conn(self, conn, name='default'):
   238     def put_conn(self, conn, name='default'):
   301         finally:
   307         finally:
   302             # clean notifies
   308             # clean notifies
   303             while conn.notifies:
   309             while conn.notifies:
   304                 conn.notifies.pop()
   310                 conn.notifies.pop()
   305             self.put_conn(conn, name)
   311             self.put_conn(conn, name)
       
   312 
       
   313     def _connect(self, ci):
       
   314         conn = psycopg2.connect(ci.dsn, connection_factory=Connection)
       
   315         conn.keep_alive()
       
   316         if not ci.isolation_level is None:
       
   317             conn.set_isolation_level(ci.isolation_level)
       
   318         if ci.init_statement:
       
   319             curs = conn.cursor()
       
   320             curs.execute(ci.init_statement)
       
   321             curs.close()
       
   322         return conn
   306 
   323 
   307     def _normalize_isolation_level(self, level):
   324     def _normalize_isolation_level(self, level):
   308         if type(level) == str:
   325         if type(level) == str:
   309             if level.lower() == 'autocommit':
   326             if level.lower() == 'autocommit':
   310                 return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
   327                 return psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT