pgtoolkit/pgmanager.py
changeset 43 a921669e913a
parent 42 9e3775460792
child 45 6d8e4ee4bdd2
equal deleted inserted replaced
42:9e3775460792 43:a921669e913a
   103         self.init_statement = init_statement
   103         self.init_statement = init_statement
   104         self.pool_size = pool_size
   104         self.pool_size = pool_size
   105 
   105 
   106 
   106 
   107 class RowDict(OrderedDict):
   107 class RowDict(OrderedDict):
       
   108     """Special read-only dictionary used for rows returned from queries.
       
   109 
       
   110     Initialization is the same as for dict:
       
   111         row = RowDict([('id', 123), ('name', 'hello')])
       
   112 
       
   113     Allows key and attribute access to contained items:
       
   114         row['id']
       
   115         row.id
       
   116 
       
   117     Items keep order in which columns were returned from database.
       
   118 
       
   119     Tuple style access is also supported:
       
   120         row[0]
       
   121         id, name = row
       
   122 
       
   123     """
       
   124 
       
   125     def __init__(self, data):
       
   126         self._dict = OrderedDict(data)
       
   127 
       
   128     def __getitem__(self, key):
       
   129         if isinstance(key, int):
       
   130             return tuple(self._dict.values())[key]
       
   131         return self._dict[key]
   108 
   132 
   109     def __getattr__(self, key):
   133     def __getattr__(self, key):
   110         try:
   134         try:
   111             return self[key]
   135             return self._dict[key]
   112         except KeyError:
   136         except KeyError:
   113             raise AttributeError(key)
   137             raise AttributeError(key)
       
   138 
       
   139     def __contains__(self, key):
       
   140         return key in self._dict
   114 
   141 
   115 
   142 
   116 class Cursor(psycopg2.extensions.cursor):
   143 class Cursor(psycopg2.extensions.cursor):
   117 
   144 
   118     def execute(self, query, args=None):
   145     def execute(self, query, args=None):
   158             adapted = [self.mogrify('%s', [x]).decode('utf8') for x in row]
   185             adapted = [self.mogrify('%s', [x]).decode('utf8') for x in row]
   159         return adapted
   186         return adapted
   160 
   187 
   161     def fetchone_adapted(self, lstrip=None):
   188     def fetchone_adapted(self, lstrip=None):
   162         '''Like fetchone_dict() but values are quoted for direct inclusion in SQL query.
   189         '''Like fetchone_dict() but values are quoted for direct inclusion in SQL query.
   163         
   190 
   164         This is useful when you need to generate SQL script from data returned
   191         This is useful when you need to generate SQL script from data returned
   165         by the query. Use mogrify() for simple cases.
   192         by the query. Use mogrify() for simple cases.
   166         
   193 
   167         '''
   194         '''
   168         row = super(Cursor, self).fetchone()
   195         row = super(Cursor, self).fetchone()
   169         if row is None:
   196         if row is None:
   170             return None
   197             return None
   171         return self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip)
   198         return self.row_dict([self.mogrify('%s', [x]).decode('utf8') for x in row], lstrip)
   221             self.destroy_conn(conn)
   248             self.destroy_conn(conn)
   222 
   249 
   223     def create_conn(self, name='default', isolation_level=None, keep_alive=True, init_statement=None,
   250     def create_conn(self, name='default', isolation_level=None, keep_alive=True, init_statement=None,
   224                     pool_size=1, dsn=None, **kwargs):
   251                     pool_size=1, dsn=None, **kwargs):
   225         '''Create named connection.
   252         '''Create named connection.
   226         
   253 
   227         name -- name for connection (default is "default")
   254         name -- name for connection (default is "default")
   228         pool_size -- how many connections will be kept open in pool
   255         pool_size -- how many connections will be kept open in pool
   229                      (more connections will still be created but they will be closed by put_conn)
   256                      (more connections will still be created but they will be closed by put_conn)
   230                      None - disable pool, always return same connection
   257                      None - disable pool, always return same connection
   231         isolation_level -- "autocommit", "read_committed", "serializable" or None for driver default
   258         isolation_level -- "autocommit", "read_committed", "serializable" or None for driver default
   232         keep_alive -- set socket to keepalive mode
   259         keep_alive -- set socket to keepalive mode
   233         dsn -- connection string (parameters or data source name)
   260         dsn -- connection string (parameters or data source name)
   234         
   261 
   235         Other keyword args are used as connection parameters.
   262         Other keyword args are used as connection parameters.
   236         
   263 
   237         '''
   264         '''
   238         if name in self.conn_known:
   265         if name in self.conn_known:
   239             raise PgManagerError('Connection name "%s" already registered.' % name)
   266             raise PgManagerError('Connection name "%s" already registered.' % name)
   240 
   267 
   241         if dsn is None:
   268         if dsn is None:
   244         isolation_level = self._normalize_isolation_level(isolation_level)
   271         isolation_level = self._normalize_isolation_level(isolation_level)
   245         ci = ConnectionInfo(name, dsn, isolation_level, keep_alive, init_statement, pool_size)
   272         ci = ConnectionInfo(name, dsn, isolation_level, keep_alive, init_statement, pool_size)
   246 
   273 
   247         self.conn_known[name] = ci
   274         self.conn_known[name] = ci
   248         self.conn_pool[name] = []
   275         self.conn_pool[name] = []
   249     
   276 
   250     def create_conn_listen(self, name, channel, dsn=None, copy_dsn=None, **kwargs):
   277     def create_conn_listen(self, name, channel, dsn=None, copy_dsn=None, **kwargs):
   251         '''Create connection listening for notifies.
   278         '''Create connection listening for notifies.
   252         
   279 
   253         Disables pool. If you want to use pool, create other connection for that.
   280         Disables pool. If you want to use pool, create other connection for that.
   254         This connection can be used as usual: conn.cursor() etc.
   281         This connection can be used as usual: conn.cursor() etc.
   255         Don't use PgManager's cursor() and put_conn(). 
   282         Don't use PgManager's cursor() and put_conn().
   256         
   283 
   257         name -- name for connection
   284         name -- name for connection
   258         channel -- listen on this channel
   285         channel -- listen on this channel
   259         copy_dsn -- specify name of other connection and its dsn will be used
   286         copy_dsn -- specify name of other connection and its dsn will be used
   260         
   287 
   261         Other parameters forwarded to create_conn().
   288         Other parameters forwarded to create_conn().
   262         
   289 
   263         '''
   290         '''
   264         if dsn is None and copy_dsn:
   291         if dsn is None and copy_dsn:
   265             try:
   292             try:
   266                 dsn = self.conn_known[copy_dsn].dsn
   293                 dsn = self.conn_known[copy_dsn].dsn
   267             except KeyError:
   294             except KeyError:
   270         self.create_conn(name=name, pool_size=None, isolation_level='autocommit', init_statement=listen_query,
   297         self.create_conn(name=name, pool_size=None, isolation_level='autocommit', init_statement=listen_query,
   271             dsn=dsn, **kwargs)
   298             dsn=dsn, **kwargs)
   272 
   299 
   273     def close_conn(self, name='default'):
   300     def close_conn(self, name='default'):
   274         '''Close all connections of given name.
   301         '''Close all connections of given name.
   275         
   302 
   276         Connection credentials are still saved.
   303         Connection credentials are still saved.
   277         
   304 
   278         '''
   305         '''
   279         while len(self.conn_pool[name]):
   306         while len(self.conn_pool[name]):
   280             conn = self.conn_pool[name].pop()
   307             conn = self.conn_pool[name].pop()
   281             conn.close()
   308             conn.close()
   282 
   309 
   283     def destroy_conn(self, name='default'):
   310     def destroy_conn(self, name='default'):
   284         '''Destroy connection.
   311         '''Destroy connection.
   285         
   312 
   286         Counterpart of create_conn.
   313         Counterpart of create_conn.
   287         
   314 
   288         '''
   315         '''
   289         if not name in self.conn_known:
   316         if not name in self.conn_known:
   290             raise PgManagerError('Connection name "%s" not registered.' % name)
   317             raise PgManagerError('Connection name "%s" not registered.' % name)
   291 
   318 
   292         self.close_conn(name)
   319         self.close_conn(name)
   301         try:
   328         try:
   302             try:
   329             try:
   303                 ci = self.conn_known[name]
   330                 ci = self.conn_known[name]
   304             except KeyError:
   331             except KeyError:
   305                 raise PgManagerError("Connection name '%s' not registered." % name)
   332                 raise PgManagerError("Connection name '%s' not registered." % name)
   306             
   333 
   307             # no pool, just one static connection
   334             # no pool, just one static connection
   308             if ci.pool_size is None:
   335             if ci.pool_size is None:
   309                 # check for existing connection
   336                 # check for existing connection
   310                 try:
   337                 try:
   311                     conn = self.conn_pool[name][0]
   338                     conn = self.conn_pool[name][0]
   316                     self.conn_pool[name].append(conn)
   343                     self.conn_pool[name].append(conn)
   317                 # if no existing connection is valid, connect new one and save it
   344                 # if no existing connection is valid, connect new one and save it
   318                 if conn is None:
   345                 if conn is None:
   319                     conn = self._connect(ci)
   346                     conn = self._connect(ci)
   320                     self.conn_pool[name][0] = conn
   347                     self.conn_pool[name][0] = conn
   321             
   348 
   322             # connection from pool
   349             # connection from pool
   323             else:
   350             else:
   324                 conn = None
   351                 conn = None
   325                 while len(self.conn_pool[name]) and conn is None:
   352                 while len(self.conn_pool[name]) and conn is None:
   326                     conn = self.conn_pool[name].pop()
   353                     conn = self.conn_pool[name].pop()
   327                     if conn.closed:
   354                     if conn.closed:
   328                         conn = None
   355                         conn = None
   329                 
   356 
   330                 if conn is None:
   357                 if conn is None:
   331                     conn = self._connect(ci)
   358                     conn = self._connect(ci)
   332         finally:
   359         finally:
   333             self.lock.release()
   360             self.lock.release()
   334         return conn
   361         return conn
   335 
   362 
   336     def put_conn(self, conn, name='default'):
   363     def put_conn(self, conn, name='default'):
   337         '''Put connection back to pool.
   364         '''Put connection back to pool.
   338         
   365 
   339         Name must be same as used for get_conn,
   366         Name must be same as used for get_conn,
   340         otherwise things become broken.
   367         otherwise things become broken.
   341         
   368 
   342         '''
   369         '''
   343         self.lock.acquire()
   370         self.lock.acquire()
   344         try:
   371         try:
   345             if not name in self.conn_known:
   372             if not name in self.conn_known:
   346                 raise PgManagerError("Connection name '%s' not registered." % name)
   373                 raise PgManagerError("Connection name '%s' not registered." % name)
   367             self.lock.release()
   394             self.lock.release()
   368 
   395 
   369     @contextmanager
   396     @contextmanager
   370     def cursor(self, name='default'):
   397     def cursor(self, name='default'):
   371         '''Cursor context.
   398         '''Cursor context.
   372         
   399 
   373         Uses any connection of name 'name' from pool
   400         Uses any connection of name 'name' from pool
   374         and returns cursor for that connection.
   401         and returns cursor for that connection.
   375         
   402 
   376         '''
   403         '''
   377         conn = self.get_conn(name)
   404         conn = self.get_conn(name)
   378 
   405 
   379         try:
   406         try:
   380             curs = conn.cursor()
   407             curs = conn.cursor()
   389             log_notices.info(notice.rstrip())
   416             log_notices.info(notice.rstrip())
   390         conn.notices[:] = []
   417         conn.notices[:] = []
   391 
   418 
   392     def wait_for_notify(self, name='default', timeout=5.0):
   419     def wait_for_notify(self, name='default', timeout=5.0):
   393         '''Wait for asynchronous notifies, return the last one.
   420         '''Wait for asynchronous notifies, return the last one.
   394         
   421 
   395         name -- name of connection, must be created using create_conn_listen()
   422         name -- name of connection, must be created using create_conn_listen()
   396         timeout -- in seconds, floating point (None - wait forever)
   423         timeout -- in seconds, floating point (None - wait forever)
   397         
   424 
   398         Returns None on timeout.
   425         Returns None on timeout.
   399         
   426 
   400         '''
   427         '''
   401         conn = self.get_conn(name)
   428         conn = self.get_conn(name)
   402         
   429 
   403         # return any notifies on stack
   430         # return any notifies on stack
   404         if conn.notifies:
   431         if conn.notifies:
   405             return conn.notifies.pop()
   432             return conn.notifies.pop()
   406 
   433 
   407         if select.select([conn], [], [], timeout) == ([], [], []):
   434         if select.select([conn], [], [], timeout) == ([], [], []):
   436                 return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
   463                 return psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
   437             if level.lower() == 'serializable':
   464             if level.lower() == 'serializable':
   438                 return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
   465                 return psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
   439             raise PgManagerError('Unknown isolation level name: "%s"', level)
   466             raise PgManagerError('Unknown isolation level name: "%s"', level)
   440         return level
   467         return level
   441     
   468 
   442     def _check_fork(self):
   469     def _check_fork(self):
   443         '''Check if process was forked (PID has changed).
   470         '''Check if process was forked (PID has changed).
   444         
   471 
   445         If it was, clean parent's connections.
   472         If it was, clean parent's connections.
   446         New connections are created for children.
   473         New connections are created for children.
   447         Known connection credentials are inherited, but not shared.
   474         Known connection credentials are inherited, but not shared.
   448         
   475 
   449         '''
   476         '''
   450         if self.pid == multiprocessing.current_process().pid:
   477         if self.pid == multiprocessing.current_process().pid:
   451             # PID has not changed
   478             # PID has not changed
   452             return
   479             return
   453         
   480 
   454         # update saved PID
   481         # update saved PID
   455         self.pid = multiprocessing.current_process().pid
   482         self.pid = multiprocessing.current_process().pid
   456         # reinitialize lock
   483         # reinitialize lock
   457         self.lock = threading.Lock()
   484         self.lock = threading.Lock()
   458         # clean parent's connections
   485         # clean parent's connections