# HG changeset patch # User Radek Brich # Date 1362677212 -3600 # Node ID 39f777341db41eb8e02a4ac2d32ec1f1d3886b93 # Parent d4306261ddfb2ced492884a86aaaae80d149401e MyManager: Add Cursor.mogrify(). Fix query logging. Update tests. diff -r d4306261ddfb -r 39f777341db4 TESTING --- a/TESTING Thu Mar 07 15:42:47 2013 +0100 +++ b/TESTING Thu Mar 07 18:26:52 2013 +0100 @@ -13,3 +13,31 @@ 3. run tests.py + +PostgreSQL +---------- + +CREATE ROLE test LOGIN PASSWORD 'test'; + +CREATE DATABASE test; +\c test +\i sql/tests.sql + + +MySQL +----- + +CREATE USER 'test'@'%' IDENTIFIED BY 'test'; +GRANT all ON test.* TO 'test'@'%'; + +CREATE DATABASE test; +USE test; + +CREATE TABLE test +( + id INT NOT NULL AUTO_INCREMENT, + name TEXT, + PRIMARY KEY (id) +) +ENGINE=InnoDB CHARSET=utf8; + diff -r d4306261ddfb -r 39f777341db4 mytoolkit/mymanager.py --- a/mytoolkit/mymanager.py Thu Mar 07 15:42:47 2013 +0100 +++ b/mytoolkit/mymanager.py Thu Mar 07 18:26:52 2013 +0100 @@ -73,6 +73,7 @@ log_sql = logging.getLogger("mymanager_sql") +log_sql.addHandler(logging.NullHandler()) class MyManagerError(Exception): @@ -108,13 +109,13 @@ try: return super(Cursor, self).execute(query, args) finally: - log_sql.debug(self._executed.decode('utf8')) + self._log_query(query, args) def callproc(self, procname, args=None): try: return super(Cursor, self).callproc(procname, args) finally: - log_sql.debug(self._executed.decode('utf8')) + self._log_query(query, args) def row_dict(self, row, lstrip=None): adjustname = lambda a: a @@ -132,6 +133,24 @@ rows = super(Cursor, self).fetchall() return [self.row_dict(row, lstrip) for row in rows] + def mogrify(self, query, args): + """Get query with substituted args as it will be send to server.""" + if isinstance(query, bytes): + query = query.decode() + if args is not None: + db = self._get_db() + query = query % db.literal(args) + return query + + def _log_query(self, query, args): + name = self.connection.name if hasattr(self.connection, 'name') else '-' + query = self._executed or self.mogrify(query, args) + if isinstance(query, bytes): + db = self._get_db() + charset = db.character_set_name() + query = query.decode(charset) + log_sql.info('[%s] %s' % (name, query)) + class MyManager: diff -r d4306261ddfb -r 39f777341db4 pgtoolkit/pgmanager.py --- a/pgtoolkit/pgmanager.py Thu Mar 07 15:42:47 2013 +0100 +++ b/pgtoolkit/pgmanager.py Thu Mar 07 18:26:52 2013 +0100 @@ -83,6 +83,8 @@ log_sql = logging.getLogger("pgmanager_sql") log_notices = logging.getLogger("pgmanager_notices") +log_sql.addHandler(logging.NullHandler()) +# NullHandler not needed for notices which are INFO level only class PgManagerError(Exception): diff -r d4306261ddfb -r 39f777341db4 sql/tests.sql --- a/sql/tests.sql Thu Mar 07 15:42:47 2013 +0100 +++ b/sql/tests.sql Thu Mar 07 18:26:52 2013 +0100 @@ -4,3 +4,5 @@ name varchar, PRIMARY KEY (id) ); + +GRANT ALL ON TABLE test TO test; diff -r d4306261ddfb -r 39f777341db4 tests.py --- a/tests.py Thu Mar 07 15:42:47 2013 +0100 +++ b/tests.py Thu Mar 07 18:26:52 2013 +0100 @@ -9,7 +9,8 @@ enable_mysql = True try: from tests import test_mymanager -except ImportError: +except ImportError as e: + print('Disabling MySQL tests due to import error:\n ', str(e)) enable_mysql = False loader = unittest.TestLoader() diff -r d4306261ddfb -r 39f777341db4 tests/test_mymanager.py --- a/tests/test_mymanager.py Thu Mar 07 15:42:47 2013 +0100 +++ b/tests/test_mymanager.py Thu Mar 07 18:26:52 2013 +0100 @@ -4,6 +4,7 @@ from mytoolkit import mymanager import unittest +import logging class TestMyManager(unittest.TestCase): @@ -14,10 +15,18 @@ params = self.params_to_mapping(test_db_conn_params) self.m = mymanager.get_instance() self.m.create_conn(**params) + #self.setup_logging() def tearDown(self): self.m.destroy_conn() + def setup_logging(self): + #FIXME: write test handler, check SQL log + log = logging.getLogger('mymanager_sql') + handler = logging.StreamHandler() + log.addHandler(handler) + log.setLevel(logging.DEBUG) + def params_to_mapping(self, params): return dict([param.split('=') for param in params.split(' ')]) @@ -28,6 +37,11 @@ row = curs.fetchone_dict() self.assertEqual(row.ajaj, ajaj) + def test_mysql_error(self): + with self.m.cursor() as curs: + self.assertRaises(mymanager.OperationalError, + curs.execute, 'SELECT give_me_some_error;') + if __name__ == '__main__': unittest.main()