-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsql_access.py
More file actions
executable file
·446 lines (359 loc) · 19.5 KB
/
Copy pathsql_access.py
File metadata and controls
executable file
·446 lines (359 loc) · 19.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
##############################################################################################
# Project : sqlite3 wrapper
# File : sql_access.py
# Author : Remi Malaquin
# Date : 01/17/2018
# Description : Routines to access and modify SQL database and table.
##############################################################################################
import sqlite3
from sql_utils import *
class SQLDatabase:
"""
Manage Database
"""
def __init__(self, databaseName="/default/directory/DBName"):
"""
Connect to the database & create cursor.
:param databaseName: DB file to create/read
"""
self.base = sqlite3.connect(databaseName)
self.base.create_function("noaccent", 1, translate_no_accent_nocase_sensitive)
self.cursor = self.base.cursor()
self.SQLdblog = Logger(name='SQLDatabase', severity=logging.INFO)
def commit(self):
"""
Commit changes into the database
:return: None
"""
self.SQLdblog.debug(functionName="commit",
message="Commit into Database")
self.base.commit()
def close(self):
"""
Close Database
:return: None
"""
self.SQLdblog.debug(functionName="close",
message="Close Database")
self.base.close()
def list_table(self):
"""
List all the table inside the database
:return: list of table name
"""
self.cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
return [idx[0] for idx in self.cursor.fetchall()]
def drop(self, table: str):
try:
if table not in self.list_table():
raise SqlTableUnknown("Table does not exist in the database")
self.cursor.execute("DROP TABLE " + table)
self.SQLdblog.debug(functionName="drop",
message="Table '" + table + "' has been removed from database.")
except SqlTableUnknown:
self.SQLdblog.error(functionName="drop",
message="Table '" + table + "' doesn't exist in the database...")
raise
except sqlite3.OperationalError:
self.SQLdblog.error(functionName="drop",
message="Can't remove Table '" + table + "'")
class SQLTable(object):
"""
Manage Table in a database.
- Create Table
- Insert/modify/delete data in table
"""
def __init__(self, SQLdbObj, tableName, **kwargs):
"""
Define table related to a database, with its name and its parameters
- Create new table if does not exists
OR
- extract info from existing table (param_name & param_type)
:param SQLdbObj: database name object
:param tableName: table name
:param kwargs: <param_name1>='<param_type1>', <param_name2>='<param_type2>', ...
"""
self.db = SQLdbObj # related database object
self.tableName = tableName # Name of the table
self.dataType = [] #
self.filterKey = []
self.SQLtablelog = Logger(name='SQLTable', severity=logging.INFO)
# Dynamically create the defined table if not ALREADY defined.
try:
if self.tableName == '':
raise SqlTableNameEmptyError("Table cannot be created because the name is empty")
self.__create_table(kwargs) # Use Implicit function to create table
self.tableVar = kwargs # Extract parameters of the table
self.tablePrimVar = {k: v for k, v in kwargs.items()
if 'primary key'.lower() in v.lower()} # Extract Primary parameter of the table
self.tableLen = len(self.tableVar) # Number of parameter in the table
self.SQLtablelog.info(functionName="__init__", # str(inspect.stack()[-5][3]),
message="Table '" + self.tableName + "' created")
# Exception if name is empty !!!
except SqlTableNameEmptyError as e:
self.SQLtablelog.error(functionName="__init__", message=e.args[0])
raise
# Extract info from existing table in the database (parameters, Primary parameter, length, ...)
except sqlite3.OperationalError:
query_result = self.query_info() # Query the database about the table
self.tableVar = {idx[1]: idx[2] for idx in query_result} # Update parameters of the table
self.tablePrimVar = {idx[1]: idx[2] for idx in query_result
if idx[-1] == 1} # Update Primary parameter of the table
self.tableLen = len(self.tableVar) # Update Number of parameter in the table
self.SQLtablelog.debug(functionName="__init__",
message="Table '" + self.tableName + "' already exists : Info extracted.")
def __create_table(self, tableVar):
"""
Implicit function to create table => called in the __init__ function
:param tableVar: dictionary of {param_name : param_type}
:return: None
"""
# Merge every parameters of the Table in a list: ["name1 type1", "name2 type2", ...]
for key, val in tableVar.items():
self.dataType.append(key + " " + val)
# Create Table
self.db.cursor.execute("CREATE TABLE " + self.tableName + '(' + ", ".join(self.dataType) + ")")
def query_info(self):
"""
Query the database to analyse if table already exists.
:return: Table info (parameters, ...)
"""
self.db.cursor.execute("PRAGMA table_info(" + self.tableName + ")")
return self.db.cursor.fetchall()
def define_filter_for_insertion(self, filterKeys: list):
"""
Define a list of parameter that will be used during insertion to check if data already exists in table
!!!! Must be called before insert, otherwise, no filter will be applied !!!!
:param filterKeys: List of parameter to filter
:return: None
"""
# check if all filter are in the parameter of the table
try:
for key in filterKeys:
if key not in self.tableVar.keys():
raise InsertionKeyNotFoundError(str(filterKeys) + "' parameters not found in " + str(list(self.tableVar.keys())))
self.SQLtablelog.debug(functionName="define_filter_for_insertion",
message="'" + str(filterKeys) + "' parameters will be used for filtering")
except InsertionKeyNotFoundError:
self.SQLtablelog.error(functionName="define_filter_for_insertion",
message="'" + str(filterKeys) + "' parameters not found in "
+ str(list(self.tableVar.keys())))
finally:
self.filterKey = filterKeys
def insert(self, auth=False, **kwargs):
"""
Insert element into the associated table
:param auth: hidden parameter :)
:param kwargs: Dictionary of parameter => defined by SQLTable object
:return: None
"""
try:
# Check & Compare the characteristics of the parameters
''' # -> Table parameters (number, type) have already been defined during table creation
# This function checks that SQLTable.insert 'kwargs param' are inline with the table param
# - ref_param = parameters @ creation of the table
# - test_param = parameters @ call of insert function
'''
check_param_char(ref_param=self.tableVar, test_param=kwargs)
# Select the filter parameters (thanks to SQLTable.define_filter_for_insertion function)
param = {k: v for k, v in kwargs.items() if k in self.filterKey}
# Filter key MUST NOT be empty for insertion ...
for k, v in param.items():
if v == '':
raise SqlFilterKeyEmptyError("Parameter '" + str(k) + "' is empty")
# Check filtered parameter does NOT exist in table
table = self.select_one(**param)
check_for_double_items(param=param, table=table, query_info=self.query_info(), auth=auth)
self.SQLtablelog.info(functionName="insert",
message="INSERT in " + str(self.tableName) + ": " +
" | ".join([str(k) + "=" + str(kwargs[k]) for k in list(kwargs.keys())]))
# 'id' parameter should be automatically incremented, So don't need to insert it if present
if 'id' in kwargs.keys():
kwargs.pop('id')
table_len_final = self.tableLen - 1
else:
table_len_final = self.tableLen
self.db.cursor.execute("INSERT INTO " + self.tableName + '(' + ",".join(kwargs.keys()) + ") " +
"VALUES(" + ','.join(["?"] * table_len_final) + ")", tuple(kwargs.values()))
except (SqlFilterKeyEmptyError, SqlLengthParameterError, SqlNameParameterError, SqlTypeParameterError, SqlDoubleItemsOccurs) as e:
self.SQLtablelog.error(functionName="insert", message=e.args[0])
raise
except sqlite3.OperationalError:
self.SQLtablelog.error(functionName="insert", message="Please, check the definition of the table you try to access, " +
"parameter definition does not reach expectations")
def modify(self, **kwargs):
"""
Parameter already exists in table but you want to modify it anyway
:param kwargs: Primary key is mandatory to modify the table
:return: None
"""
#####################################################################################################
# Check & Compare the characteristics of the parameters
''''# -> Table parameters (type, ...) have already been defined during table creation
# This function checks that SQLTable.insert 'kwargs param' are inline with the table param
# - ref_param = parameters @ creation of the table
# - test_param = parameters @ call of insert function
'''
try:
check_param_char(ref_param=self.tableVar, test_param=kwargs, test='011')
# Extract Primary Key from Table info
search_param = next(iter(self.tablePrimVar.keys())) # Primary parameter to search in the table
#####################################################################################################
# Check if Primary Parameter has been defined in entry.
if search_param in kwargs.keys():
search_var = {k: v for k, v in kwargs.items()
if search_param.lower() == k.lower()} # Variable to search for primary key
kwargs.__delitem__(search_param) # Delete primary key from parameters
list_other_param = [key + "=?" for key in kwargs]
list_search_param = search_param + "=?"
tuple_param = tuple([kwargs[key] for key in kwargs]) + tuple([search_var[search_param]])
#################################################################################################
# Check first if it already exist in the table and if there are no double.
presence = self.select_one(**search_var)
presence_name = ", ".join([presence[i][0] for i in range(len(presence))])
if (len(presence) == 1) or (search_var[search_param] in [presence[i][0] for i in range(len(presence))]):
self.db.cursor.execute("UPDATE " + str(self.tableName) +
" SET " + ','.join(list_other_param) +
" WHERE " + list_search_param, tuple_param)
self.SQLtablelog.info(functionName="modify",
message="Modify '" + str(search_var[search_param]) +
"' item from table '" + str(self.tableName) +
"' with " + str(kwargs))
#################################################################################################
# Double has been detected during modify process
elif len(presence) > 1:
raise SqlSeveralElementItemsSelected("Several elements contains the same name: [" +
str(presence_name) + "] Choose the right one")
#################################################################################################
# No element detected with this name => use insert function instead
else:
raise SqlNoElementFound("No element found in that table => Use SQLTable.insert function instead")
#####################################################################################################
# Primary key is missing @ call
else:
raise SqlMissingPrimaryKey("Missing primary key : " + str(search_param))
except (SqlLengthParameterError, SqlNameParameterError, SqlTypeParameterError, SqlDoubleItemsOccurs,
SqlMissingPrimaryKey, SqlNoElementFound, SqlSeveralElementItemsSelected) as e:
self.SQLtablelog.error(functionName="modify", message=e.args[0])
raise
def delete(self, inclusion=" AND ", **kwargs):
"""
Delete a data from table
:param inclusion: define combinational logic between filter
:param kwargs: pattern research
:return: None
"""
filterkey = []
filterval = []
for key, value in kwargs.items():
for i in str(value).split(' '):
filterkey.append("instr(noaccent(" + key + "), ?)>0")
filterval.append(translate_no_accent_nocase_sensitive(i))
filtertotal = inclusion.join(filterkey)
try:
self.db.cursor.execute("DELETE FROM " + str(self.tableName) +
" WHERE " + filtertotal,
tuple(filterval))
self.SQLtablelog.info(functionName="delete",
message="All data filtered with" + str(filterval) +
" have been deleted from table '" + str(self.tableName) + "'")
except sqlite3.OperationalError:
self.SQLtablelog.error(functionName="delete",
message="Are you sure your parameter are correct")
def select_all(self):
"""
Query the database to extract all information from the selected table
:return: Tuple of information
"""
self.db.cursor.execute("SELECT " + ", ".join(self.tableVar.keys()) +
" FROM " + str(self.tableName))
return self.db.cursor.fetchall()
def select_one(self, inclusion=" AND ", **kwargs):
"""
Query the database to extract the information of a predefined name of the Table
:param inclusion: Choose the logic for filtering between multiple parameters
:param kwargs: Parameter to look for
:return: result of research
"""
filterkey = []
filterval = []
for key, value in kwargs.items():
for i in str(value).split(' '):
filterkey.append("instr(noaccent(" + key + "), ?)>0")
filterval.append(translate_no_accent_nocase_sensitive(i))
filtertotal = inclusion.join(filterkey)
try:
self.db.cursor.execute("SELECT " + ", ".join(self.tableVar.keys()) +
" FROM " + str(self.tableName) +
" WHERE " + filtertotal,
tuple(filterval))
except sqlite3.OperationalError:
self.SQLtablelog.error(functionName="select_one",
message="Did you define a filter before insertion?")
return self.db.cursor.fetchall()
if __name__ == "__main__":
# define SQL Database
database = SQLDatabase(databaseName="./DB")
# Define Table "Cars"
param_default = {'name': 'TEXT PRIMARY KEY', 'brand': 'TEXT', 'color': 'TEXT', 'price': 'FLOAT', 'horsepower': 'INTEGER'}
table = SQLTable(SQLdbObj=database, tableName="Cars", **param_default)
# define filter before insertion: These paramters must be UNIQUE in the table...
table.define_filter_for_insertion(['name'])
# Insert into table
try:
table.insert(name='i8', brand='BMW', color='Blue', price=143400, horsepower=357)
table.insert(name='Model S', brand='Tesla', color='Blue', price=68000, horsepower=382)
except SqlDoubleItemsOccurs as e:
print(e.args[0])
# Query the info of the table...
print("-----------------------")
print("Query the table")
print(table.query_info())
# Now, Select all elements in the table.
print("-----------------------")
print("All elements in the DB")
print(table.select_all())
# Now, Select one element in the table.
print("-----------------------")
print("ONE element selected in the DB")
print(table.select_one(name='S'))
# Now, Let's modify the color of the Tesla Model S.
print("-----------------------")
print("ONE element selected in the DB")
print(table.modify(name='Model S', color="Red"))
# Don't forget to commit Change in the database !!!
database.commit()
# Close Database
database.close()
# Reopen Database
dbobj = SQLDatabase(databaseName="./DB")
# Define Table "Ingredient"
param_default = {'name': 'TEXT PRIMARY KEY', 'calory': 'INTEGER', 'portion': 'FLOAT', 'unit': 'TEXT'}
table_ingredient = SQLTable(SQLdbObj=dbobj, tableName="Ingredient", **param_default)
# define filter before insertion: These paramters must be UNIQUE in the table...
table_ingredient.define_filter_for_insertion(['name'])
# Insert into table Ingredient
try:
table_ingredient.insert(name='Tiramisu', calory=10000, portion=3.2, unit='kg') ## ¯\_(ツ)_/¯
table_ingredient.insert(**{'name':'Tomato', 'calory':10, 'portion':123, 'unit':'g'})
except SqlDoubleItemsOccurs as e:
print(e.args[0])
# Don't forget to commit Change in the database !!!
dbobj.commit()
# Define Table "Cars"
param_default = {'name': 'TEXT PRIMARY KEY', 'tata': 'FLOAT', 'titi': 'INTEGER', 'babar': 'TEXT'}
table_car = SQLTable(SQLdbObj=dbobj, tableName="Cars", **param_default)
# On query, we can see that table cars already exist because it retrieve information from pre-existing table instead of yours...
print("-----------------------")
print("Query the table")
print(table_car.query_info())
# Now, Select one element in the table.
print("-----------------------")
print("All elements in the DB")
print(table_car.select_all())
# Now, list all table in the database.
print("-----------------------")
print("All tables in the database")
print(dbobj.list_table())
# Close Database
dbobj.close()