MyManager: Add Cursor.mogrify(). Fix query logging. Update tests.
authorRadek Brich <radek.brich@devl.cz>
Thu, 07 Mar 2013 18:26:52 +0100
changeset 75 39f777341db4
parent 74 d4306261ddfb
child 76 3a41b351b122
MyManager: Add Cursor.mogrify(). Fix query logging. Update tests.
TESTING
mytoolkit/mymanager.py
pgtoolkit/pgmanager.py
sql/tests.sql
tests.py
tests/test_mymanager.py
--- a/TESTING	Thu Mar 07 15:42:47 2013 +0100
+++ b/TESTING	Thu Mar 07 18:26:52 2013 +0100
@@ -13,3 +13,31 @@
 
 3. run tests.py
 
+
+PostgreSQL
+----------
+
+CREATE ROLE test LOGIN PASSWORD 'test';
+
+CREATE DATABASE test;
+\c test
+\i sql/tests.sql
+
+
+MySQL
+-----
+
+CREATE USER 'test'@'%' IDENTIFIED BY 'test';
+GRANT all ON test.* TO 'test'@'%';
+
+CREATE DATABASE test;
+USE test;
+
+CREATE TABLE test
+(
+  id INT NOT NULL AUTO_INCREMENT,
+  name TEXT,
+  PRIMARY KEY (id)
+)
+ENGINE=InnoDB CHARSET=utf8;
+
--- a/mytoolkit/mymanager.py	Thu Mar 07 15:42:47 2013 +0100
+++ b/mytoolkit/mymanager.py	Thu Mar 07 18:26:52 2013 +0100
@@ -73,6 +73,7 @@
 
 
 log_sql = logging.getLogger("mymanager_sql")
+log_sql.addHandler(logging.NullHandler())
 
 
 class MyManagerError(Exception):
@@ -108,13 +109,13 @@
         try:
             return super(Cursor, self).execute(query, args)
         finally:
-            log_sql.debug(self._executed.decode('utf8'))
+            self._log_query(query, args)
 
     def callproc(self, procname, args=None):
         try:
             return super(Cursor, self).callproc(procname, args)
         finally:
-            log_sql.debug(self._executed.decode('utf8'))
+            self._log_query(query, args)
 
     def row_dict(self, row, lstrip=None):
         adjustname = lambda a: a
@@ -132,6 +133,24 @@
         rows = super(Cursor, self).fetchall()
         return [self.row_dict(row, lstrip) for row in rows]
 
+    def mogrify(self, query, args):
+        """Get query with substituted args as it will be send to server."""
+        if isinstance(query, bytes):
+            query = query.decode()
+        if args is not None:
+            db = self._get_db()
+            query = query % db.literal(args)
+        return query
+
+    def _log_query(self, query, args):
+        name = self.connection.name if hasattr(self.connection, 'name') else '-'
+        query = self._executed or self.mogrify(query, args)
+        if isinstance(query, bytes):
+            db = self._get_db()
+            charset = db.character_set_name()
+            query = query.decode(charset)
+        log_sql.info('[%s] %s' % (name, query))
+
 
 class MyManager:
 
--- a/pgtoolkit/pgmanager.py	Thu Mar 07 15:42:47 2013 +0100
+++ b/pgtoolkit/pgmanager.py	Thu Mar 07 18:26:52 2013 +0100
@@ -83,6 +83,8 @@
 
 log_sql = logging.getLogger("pgmanager_sql")
 log_notices = logging.getLogger("pgmanager_notices")
+log_sql.addHandler(logging.NullHandler())
+# NullHandler not needed for notices which are INFO level only
 
 
 class PgManagerError(Exception):
--- a/sql/tests.sql	Thu Mar 07 15:42:47 2013 +0100
+++ b/sql/tests.sql	Thu Mar 07 18:26:52 2013 +0100
@@ -4,3 +4,5 @@
   name varchar,
   PRIMARY KEY (id)
 );
+
+GRANT ALL ON TABLE test TO test;
--- a/tests.py	Thu Mar 07 15:42:47 2013 +0100
+++ b/tests.py	Thu Mar 07 18:26:52 2013 +0100
@@ -9,7 +9,8 @@
 enable_mysql = True
 try:
     from tests import test_mymanager
-except ImportError:
+except ImportError as e:
+    print('Disabling MySQL tests due to import error:\n ', str(e))
     enable_mysql = False
 
 loader = unittest.TestLoader()
--- a/tests/test_mymanager.py	Thu Mar 07 15:42:47 2013 +0100
+++ b/tests/test_mymanager.py	Thu Mar 07 18:26:52 2013 +0100
@@ -4,6 +4,7 @@
 from mytoolkit import mymanager
 
 import unittest
+import logging
 
 
 class TestMyManager(unittest.TestCase):
@@ -14,10 +15,18 @@
         params = self.params_to_mapping(test_db_conn_params)
         self.m = mymanager.get_instance()
         self.m.create_conn(**params)
+        #self.setup_logging()
 
     def tearDown(self):
         self.m.destroy_conn()
 
+    def setup_logging(self):
+        #FIXME: write test handler, check SQL log
+        log = logging.getLogger('mymanager_sql')
+        handler = logging.StreamHandler()
+        log.addHandler(handler)
+        log.setLevel(logging.DEBUG)
+
     def params_to_mapping(self, params):
         return dict([param.split('=') for param in params.split(' ')])
 
@@ -28,6 +37,11 @@
             row = curs.fetchone_dict()
             self.assertEqual(row.ajaj, ajaj)
 
+    def test_mysql_error(self):
+        with self.m.cursor() as curs:
+            self.assertRaises(mymanager.OperationalError,
+                curs.execute, 'SELECT give_me_some_error;')
+
 
 if __name__ == '__main__':
     unittest.main()