Changeset 4407

Show
Ignore:
Timestamp:
03/28/09 23:00:51 (4 years ago)
Author:
gpolo
Message:

New tests, new utility functions.

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • branch/merger/tests/test_sqlite_column_merge.py

    r4402 r4407  
    55from sqlitedb import sqlite, merge, _DictCursor 
    66 
     7def conn_cursor(name): 
     8    conn = sqlite.connect(name) 
     9    return conn, conn.cursor() 
     10 
     11def _table_name(): 
     12    """Generates an unique table name each time.""" 
     13    start = 'a' 
     14    while True: 
     15        yield start 
     16        new_letter = ord(start[-1]) + 1 
     17        if new_letter > 122: 
     18            start = 'a' * (len(start) + 1) 
     19        else: 
     20            start = start[:-1] + chr(new_letter) 
     21 
     22 
    723class ColumnMergeTest(unittest.TestCase): 
    824 
     
    1026        self.new_db_file = tempfile.NamedTemporaryFile() 
    1127        self.old_db_file = tempfile.NamedTemporaryFile() 
    12         self.new_db = sqlite.connect(self.new_db_file.name) 
    13         self.old_db = sqlite.connect(self.old_db_file.name) 
    14         self.new_cursor = self.new_db.cursor(_DictCursor) 
    15         self.old_cursor = self.old_db.cursor(_DictCursor) 
     28        self.new_db, self.new_cursor = conn_cursor(self.new_db_file.name) 
     29        self.old_db, self.old_cursor = conn_cursor(self.old_db_file.name) 
     30 
     31        self.table_name = _table_name() 
     32 
    1633 
    1734    def tearDown(self): 
     
    2441 
    2542 
    26     def test_column_merge(self): 
    27         self.new_cursor.execute("CREATE TABLE a (b INTEGER, c INTEGER)") 
    28         self.old_cursor.execute("CREATE TABLE a (b INTEGER)") 
     43    def _prepare_test(self, new, old): 
     44        table = self.table_name.next() 
     45        self.new_cursor.execute("CREATE TABLE %s (%s)" % (table, new)) 
     46        self.old_cursor.execute("CREATE TABLE %s (%s)" % (table, old)) 
     47        return table 
    2948 
    30         # I don't care if the pragma table_info still exists or not, if it 
    31         # doesn't then let the test fail. 
    32  
    33         old_info = self.old_cursor.execute("pragma table_info('a')").fetchall() 
    34         self.failUnlessEqual(len(old_info), 1) 
     49    def _verify_merge(self, new, old): 
     50        table = self._prepare_test(new, old) 
     51        new_table_info = self.new_cursor.execute( 
     52                "pragma table_info(%s)" % table).fetchall() 
    3553 
    3654        merge(self.new_db_file.name, self.old_db_file.name) 
    3755 
    38         old_info = self.old_cursor.execute("pragma table_info('a')").fetchall() 
    39         self.failUnlessEqual(len(old_info), 2) 
     56        # Need to reconnect to get the updated table information. 
     57        conn, cursor = conn_cursor(self.old_db_file.name) 
     58        # I don't care if the pragma table_info still exists or not, if it 
     59        # doesn't then let the test fail. 
     60        old_table_info = cursor.execute( 
     61                "pragma table_info(%s)" % table).fetchall() 
     62        self.failUnlessEqual(len(old_table_info), len(new_table_info)) 
     63        self.failUnlessEqual(old_table_info, new_table_info) 
     64        cursor.close() 
     65        conn.close() 
     66 
     67    def _verify_merge_fails(self, exc, table, new, old): 
     68        self._prepare_test(table, new, old) 
     69 
     70        self.failUnlessRaises(exc, 
     71                merge, self.new_db_file.name, self.old_db_file.name) 
     72 
     73 
     74    def test_column_merge(self): 
     75        # XXX not supported yet 
     76        #self.new_cursor.execute("CREATE TABLE a (b INTEGER, " 
     77        #        "c INTEGER CONSTRAINT fk REFERENCES d(id))") 
     78 
     79        self._verify_merge("a", 
     80                "b INTEGER, c INTEGER", 
     81                "b INTEGER") 
     82 
     83        self._verify_merge( 
     84                "b INTEGER, c INTEGER NOT NULL DEFAULT 3", 
     85                "b INTEGER") 
     86 
     87        self._verify_merge_fails(sqlite.OperationalError, 
     88                "b INTEGER, c INTEGER NOT NULL", 
     89                "b INTEGER") 
     90 
     91        self._verify_merge_fails(Exception, 
     92                "b INTEGER, c INTEGERY PRIMARY KEY", 
     93                "b INTEGER") 
    4094 
    4195