| 1 | import sys |
||
| 2 | import os |
||
| 3 | import argparse |
||
| 4 | import getpass |
||
| 5 | import logging |
||
| 6 | import sqlite3 |
||
| 7 | from pathlib import Path |
||
| 8 | from LocalDB_schema import * |
||
| 9 | |||
# Configure the root logger to write records of level INFO and above to stdout.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
||
| 11 | |||
| 12 | |||
def check_value(database_path, table_name, ColumnName, ColumnValue):
    """
    Check whether any row of a table holds a given value in a given column.

    :param database_path: path to the SQLite database file
    :param table_name: name of the table being queried. It is interpolated
        into the SQL text (identifiers cannot be bound as parameters), so it
        must come from trusted code, never from user input.
    :param ColumnName: name of the column being queried; interpolated into
        the SQL text exactly like ``table_name``.
    :param ColumnValue: value to look for. It is converted with ``str()`` and
        bound as a query parameter, so quotes inside it are harmless.
    :return: ``(found, rows)`` where ``rows`` is the list of matching rows and
        ``found`` is True when that list is not empty. When ``database_path``
        is not an existing file the bare value ``False`` is returned instead
        of a tuple (kept for backward compatibility).
    :raises IOError: if the database cannot be opened or the query fails.
    """
    logger = logging.getLogger('LORISQuery_CheckSubjectExist')

    # Refuse to continue when the database file is missing: sqlite3.connect
    # would otherwise silently create a new, empty database at that path.
    if not Path(database_path).is_file():
        logger.info('SQLite database file does not exist!')
        return False

    logger.info("Checking key value: %s in %s in SQLite database.", ColumnValue, ColumnName)

    # Only the identifiers are formatted into the statement; the value is
    # bound with "?". Splicing it in as "{value}" broke on embedded quotes
    # and let SQLite read a value that matches a column name as that column.
    query = 'SELECT * FROM {table_name} WHERE {columnname}=?'.format(
        table_name=table_name, columnname=ColumnName)

    connection = None
    try:
        connection = sqlite3.connect(database_path)
        # str() keeps the comparison semantics of the former text literal.
        result_rows = connection.execute(query, (str(ColumnValue),)).fetchall()
    except (sqlite3.Error, sqlite3.Warning) as error:
        raise IOError("SQLite lookup in table {} failed: {}".format(table_name, error)) from error
    finally:
        # Always release the connection, including when the query failed.
        if connection is not None:
            connection.close()

    return len(result_rows) > 0, result_rows
||
| 55 | |||
| 56 | |||
def create_entry(database_path, table_name, key_field, key_field_value):
    """
    Insert a new row that only sets the key field of a table.

    The statement is ``INSERT OR IGNORE``, so nothing is inserted when the
    insert would violate a uniqueness constraint (for example when the key
    already exists).

    :param database_path: path to the SQLite database file
    :param table_name: name of the table. It is interpolated into the SQL
        text (identifiers cannot be bound as parameters), so it must come
        from trusted code, never from user input.
    :param key_field: name of the key column to fill; interpolated into the
        SQL text exactly like ``table_name``.
    :param key_field_value: value to store in the key column. It is
        converted with ``str()`` and bound as a query parameter.
    :return: False when ``database_path`` is not an existing file, True once
        the statement has been executed and committed.
    :raises IOError: if the database cannot be opened or the statement fails.
    """
    logger = logging.getLogger('LORISQuery_CreateSubject')

    # Refuse to continue when the database file is missing: sqlite3.connect
    # would otherwise silently create a new, empty database at that path.
    if not Path(database_path).is_file():
        logger.info('SQLite database file does not exist!')
        return False

    # Only the identifiers are formatted in; the value is bound with "?" so
    # that embedded quotes cannot break or alter the statement.
    statement = 'INSERT OR IGNORE INTO {tn} ({field}) VALUES (?)'.format(
        tn=table_name, field=key_field)

    connection = None
    try:
        connection = sqlite3.connect(database_path)
        logger.info('Creating new record in SQLite database.')
        connection.execute(statement, (str(key_field_value),))
        connection.commit()
    except (sqlite3.Error, sqlite3.Warning) as error:
        raise IOError("SQLite insert into table {} failed: {}".format(table_name, error)) from error
    finally:
        # Always release the connection, including when the insert failed.
        if connection is not None:
            connection.close()

    return True
||
| 94 | |||
| 95 | |||
def update_entry(database_path, table_name, key_field, key_field_value, field, field_value):
    """
    Set one field on every row whose key field matches a given value.

    :param database_path: path to the SQLite database file
    :param table_name: name of the table. It is interpolated into the SQL
        text (identifiers cannot be bound as parameters), so it must come
        from trusted code, never from user input.
    :param key_field: name of the column used to select the rows;
        interpolated into the SQL text exactly like ``table_name``.
    :param key_field_value: value ``key_field`` must equal for a row to be
        updated. Converted with ``str()`` and bound as a query parameter.
    :param field: name of the column to overwrite; interpolated into the SQL
        text exactly like ``table_name``.
    :param field_value: new content for ``field``. Converted with ``str()``
        and bound as a query parameter.
    :return: False when ``database_path`` is not an existing file, otherwise
        None once the update has been committed.
    :raises IOError: if the database cannot be opened or the statement fails.
    """
    logger = logging.getLogger('LORISQuery_CreateSubject')

    # Refuse to continue when the database file is missing: sqlite3.connect
    # would otherwise silently create a new, empty database at that path.
    if not Path(database_path).is_file():
        logger.info('SQLite database file does not exist!')
        return False

    # Only the identifiers are formatted in. Both values are bound with "?":
    # spliced in as "{value}" they broke on embedded quotes, and a value that
    # matched a column name was read by SQLite as that column.
    statement = 'UPDATE {tn} SET {f}=? WHERE {kf}=?'.format(
        tn=table_name, f=field, kf=key_field)

    connection = None
    try:
        connection = sqlite3.connect(database_path)
        logger.info('Update records in SQLite database.')
        connection.execute(statement, (str(field_value), str(key_field_value)))
        connection.commit()
    except (sqlite3.Error, sqlite3.Warning) as error:
        raise IOError("SQLite update of table {} failed: {}".format(table_name, error)) from error
    finally:
        # Always release the connection, including when the update failed.
        if connection is not None:
            connection.close()
||
| 135 | |||
| 136 | |||
def check_header(database_path, table_name):
    """
    Return the column descriptions of a table, as reported by SQLite.

    :param database_path: path to the SQLite database file
    :param table_name: name of the table to inspect. It is interpolated into
        the PRAGMA statement, so it must come from trusted code.
    :return: the rows of ``PRAGMA TABLE_INFO`` as a list of tuples, one per
        column in table order; element 0 of each tuple is the zero-based
        column index and element 1 the column name. The list is empty when
        the table does not exist. None is returned when the database file is
        missing or cannot be read.
    """
    logger = logging.getLogger('SQLite check_header check')

    # sqlite3.connect would create an empty database for a missing path, so
    # a missing file is reported as a failure instead.
    if not Path(database_path).is_file():
        logger.info('SQLite database file does not exist!')
        return None

    connection = None
    try:
        connection = sqlite3.connect(database_path)
        # Each row is (cid, name, type, notnull, dflt_value, pk).
        return connection.execute('PRAGMA TABLE_INFO({})'.format(table_name)).fetchall()
    except (IOError, sqlite3.Error, sqlite3.Warning):
        # sqlite3 errors are not IOError subclasses, so they must be named
        # explicitly for the "None on failure" contract to hold.
        logger.exception('Could not read the header of table %s', table_name)
        return None
    finally:
        # Always release the connection, including when the PRAGMA failed.
        if connection is not None:
            connection.close()
||
| 165 | |||
| 166 | |||
def check_header_index(database_path, table_name, field):
    """
    Look up the zero-based position of a column within a table.

    :param database_path: path to the SQLite database file
    :param table_name: name of the table whose header is inspected
    :param field: name of the column to locate
    :return: the index reported for ``field`` in the table header, or None
        when the header could not be obtained or holds no such column.
    """
    try:
        header_list = check_header(database_path, table_name)
    except IOError:
        return None

    if header_list is None:
        return None

    # The result is kept local. It used to live in a module-level global, so
    # a miss returned the index found by an earlier call (or raised NameError
    # on the very first call) instead of reporting "not found".
    for table_column in header_list:
        if table_column[1] == field:  # element 1 is the column name
            return table_column[0]  # element 0 is the column index

    return None
||
| 189 | |||
def validateLocalTableAndSchema(database_path, table_name, field):
    """
    Verify that a field is known to the schema and to the local table, at the same position.

    The checks run in order and the first failing one decides the result:
    the field must be listed in ``CNBP_schema``, it must be found in the
    header of the SQLite table, and both positions must be equal.

    :param database_path: path to the SQLite database file
    :param table_name: name of the table whose header is compared
    :param field: the string of the field name that is to be searched.
    :return: ``(ok, message)`` - a boolean verdict and a text explaining it.
    """
    log = logging.getLogger('LORISQuery_validateLocalTableAndSchema')

    # Step 1: the planned schema (which includes the key field) must list the field.
    if field not in CNBP_schema:
        return False, "Current planned schema does not contain " + field
    schema_position = CNBP_schema.index(field)

    # Step 2: the table header must list the field as well.
    try:
        table_position = check_header_index(database_path, table_name, field)
        if table_position is None:
            return False, "SQLite table HEADER does not contain " + field
        log.info("SQLite table HEADER for the field " + field + " is " + str(table_position))
    except IOError:
        return False, "Database not reachable"

    # Sanity guard: neither position may be negative.
    if schema_position < 0 or table_position < 0:
        return False, "Check program for bugs. Default values not modified"

    # Step 3: the field has to sit in the same column in schema and table.
    if table_position == schema_position:
        return True, "Database and Schema congruently support this field and its position"

    return False, "Schema & Table definition not matching"
||
| 227 | |||
| 228 | |||
| 229 | #if __name__ == '__main__': |
||
| 230 | |||
| 231 |