Changeset 4425

Show
Ignore:
Timestamp:
04/04/09 15:26:09 (4 years ago)
Author:
gpolo
Message:

Added function for extracting column-defs from create table statement.

Location:
branch/merger
Files:
2 modified

Legend:

Unmodified
Added
Removed
  • branch/merger/sqlitedb.py

    r4423 r4425  
    22 
    33import os 
     4import re 
    45import shutil 
    56import warnings 
     
    2425#   Index merging (read copying) 
    2526#   Proper foreign key merging 
     27VALID_NAME_PATTERN = ( 
     28        """ 
     29        ( 
     30            # If name starts with [ or ` or " then I assume anything 
     31            # can form the name. Note that is not really important to 
     32            # know if pairs of [ ] (or " ", ` `) matches, it is assumed 
     33            # that sqltable was validated by sqlite already. 
     34            ((\[ | ` | ") .+ (\] | ` | ")) | 
     35            # Otherwise just [a-zA-Z0-9_] (maybe more depending on the flags). 
     36            (\w+) 
     37        ) 
     38        """) 
     39VALID_NAME = re.compile(VALID_NAME_PATTERN, re.VERBOSE) 
     40 
     41def column_definitions(sqltable): 
     42    """Given a complete create-table-stmt defined in sqltable, return the 
     43    columns on it and their respectives column-defs.""" 
     44    sqltable = sqltable.strip() or '' 
     45 
     46    # Discarding all "CREATE [TEMP|TEMPORARY] TABLE [IF NOT EXISTS]" 
     47    discard = re.match( 
     48            """ 
     49            ^\s*CREATE\s* 
     50            (TEMP|TEMPORARY)?\s* 
     51            TABLE\s* 
     52            (IF\s*NOT\s*EXISTS\s*)?\s+ 
     53            """, sqltable, re.IGNORECASE | re.VERBOSE) 
     54    if discard is None: 
     55        raise Exception("Malformed create-table-stmt") 
     56 
     57    sqltable = sqltable[discard.end():] 
     58 
     59    # Discarding [dbname].tablename 
     60    discard = re.match( 
     61            """ 
     62            # optional dbname 
     63            ^(.+\.)? 
     64            # Looking for tablename now. 
     65            %s 
     66            # Spaces 
     67            \s* 
     68            """ % VALID_NAME_PATTERN, sqltable, re.VERBOSE) 
     69    if discard is None: 
     70        raise Exception("Table name not present") 
     71 
     72    if sqltable[:2].lower() == 'as': 
     73        raise Exception("Table will be formed by the result set of " 
     74                "a query, not supported.") 
     75 
     76    if sqltable[discard.end()] != '(' or sqltable[-1] != ')': 
     77        raise Exception("Malformed create-table-stmt, missing parentheses") 
     78 
     79    interesting = sqltable[discard.end()+1:-1].lstrip() 
     80 
     81    # Now we have column-def, column-def, .. [, table-constraint ..] and 
     82    # we want to split each column-def into 
     83    # (column-name, type-name [column-constraint]) 
     84    column_defs = {} 
     85 
     86    table_constraints = set([ 
     87        'constraint', 'primary', 'unique', 'check', 'foreign']) 
     88 
     89    while interesting: 
     90        column_name = VALID_NAME.match(interesting) 
     91        if column_name is None: 
     92            raise Exception("Missing column-name") 
     93        interesting = interesting[column_name.end():].lstrip() 
     94        column_name = column_name.group().lower() 
     95        if column_name in table_constraints: 
     96            # We are not interested in table-contrainst. 
     97            # XXX This can be used to indicate that this table can't be fully 
     98            # merged. 
     99            continue 
     100 
     101        # Now there is either a type-name, column-constraint, or nothing. 
     102 
     103        # We want to either find the next comma that separates this 
     104        # column-def to the next one, or the end of input meaning this 
     105        # was the last column-def. 
     106        in_parens = False 
     107        for indx, char in enumerate(interesting): 
     108            if char == ',' and not in_parens: 
     109                # Found the comma we were after. 
     110                column_defs[column_name] = interesting[:indx] 
     111                break 
     112            elif char == '(': 
     113                in_parens = True 
     114            elif char == ')': 
     115                in_parens = False 
     116        else: 
     117            # Comma not found, column-def finished. 
     118            column_defs[column_name] = interesting or None 
     119            break 
     120 
     121        interesting = interesting[indx + 1:].lstrip() 
     122 
     123    return column_defs 
     124 
    26125 
    27126class SqliteDBMerge(object): 
     
    200299            self._tocursor.execute(result['sql']) 
    201300 
     301 
    202302merge = SqliteDBMerge 
    203303 
  • branch/merger/tests/test_sqlitemerge.py

    r4423 r4425  
    33from test.test_support import run_unittest 
    44 
    5 from sqlitedb import sqlite, merge, _DictCursor 
     5from sqlitedb import sqlite, merge, column_definitions, _DictCursor 
    66 
    77def conn_cursor(name): 
     
    4040    def merge_now(self): 
    4141        merge(self.new_db_file.name, self.old_db_file.name) 
     42 
     43 
     44class ColumnDefinitionTest(unittest.TestCase): 
     45 
     46    def test_invalid(self): 
     47        self.failUnlessRaises(Exception, column_definitions, 'buh') 
     48        self.failUnlessRaises(Exception, column_definitions, None) 
     49 
     50    def test_simple_createtable(self): 
     51        ctable_stmt = "CREATE TABLE x (a UNIQUE)" 
     52        self.failUnlessEqual(column_definitions(ctable_stmt), 
     53                {'a': 'UNIQUE'}) 
     54        ctable_stmt = ( 
     55                "CREATE TABLE x (" 
     56                "   a UNIQUE CHECK (1, 2), " 
     57                "   b CONSTRAINT buh)") 
     58        self.failUnlessEqual(column_definitions(ctable_stmt), 
     59                {'a': 'UNIQUE CHECK (1, 2)', 'b': 'CONSTRAINT buh'}) 
    4260 
    4361 
     
    154172 
    155173if __name__ == "__main__": 
    156     run_unittest(ColumnMergeTest, TriggersTest, TableCopyTest) 
     174    run_unittest(ColumnDefinitionTest, ColumnMergeTest, TriggersTest, 
     175            TableCopyTest)