189 lines
7.6 KiB
Python
189 lines
7.6 KiB
Python
import sqlite3
|
|
import time
|
|
import os
|
|
|
|
|
|
# added pairing type, mostly to stop it complaining about line length, only works on 3.9
|
|
# ParamPair = list[dict[str, any]]
|
|
|
|
|
|
def get_unix_timestamp(seconds_offset=0) -> int:
|
|
"""
|
|
Function to get the current unix timestamp to sim the NOW() sql method, seconds offset will pull a
|
|
past of future timestamp
|
|
"""
|
|
return int(time.time()) + seconds_offset
|
|
|
|
|
|
class DBConn:
|
|
"""Class to hold our methods for dealing with the SQLite3 Database"""
|
|
# start the database, which we would usually have a config file with the host, port, username etc
|
|
# but since sqlite just uses a file i've hard coded it
|
|
def __init__(self):
|
|
# Apparently outside of the pycharm env we need to tell it what DIR the DB is in, it should be in the server
|
|
# folder
|
|
path = os.path.dirname(os.path.abspath(__file__))
|
|
db = os.path.join(path, 'server.db')
|
|
|
|
self.__db_conn = sqlite3.connect(db)
|
|
# set the row factory, to simulate the fetch_assoc rows other DB's use
|
|
self.__db_conn.row_factory = sqlite3.Row
|
|
self.__db_cursor = self.__db_conn.cursor()
|
|
|
|
def do_generic(self, query: str, return_values=False) -> any:
|
|
"""method for doing a generic SQL query, optionally return the result"""
|
|
result = False
|
|
if len(query) > 0:
|
|
try:
|
|
current_query = self.__db_cursor.execute(query)
|
|
if return_values:
|
|
result = current_query.fetchall()
|
|
else:
|
|
result = True
|
|
except Exception as ex:
|
|
print(f"Generic query Error: {repr(ex)}")
|
|
|
|
return result
|
|
|
|
def do_insert(self, table: str, insert_pairs, return_id=False):
|
|
"""Method for doing a generic insert query given the value pairs and table, we can return the insert id too"""
|
|
result = False
|
|
# Check if the table and insert values are not empty
|
|
if len(table) > 0 and len(insert_pairs) > 0:
|
|
fields = []
|
|
params = []
|
|
parsed_values = {}
|
|
# Using prepared statements so we dont get SQL injected
|
|
for pair in insert_pairs:
|
|
fields.append(pair["field"])
|
|
params.append(':' + pair["field"])
|
|
parsed_values[pair["field"]] = pair["value"]
|
|
|
|
# Construct the insert query with the input values
|
|
query_fields = ','.join(fields)
|
|
query_values = ','.join(params)
|
|
insert_query = "INSERT INTO {} ({}) VALUES ({})".format(table, query_fields, query_values)
|
|
|
|
try:
|
|
if self.__db_cursor.execute(insert_query, parsed_values):
|
|
self.__db_conn.commit()
|
|
if return_id:
|
|
# Return the last row id if requested, otherwise return True
|
|
result = self.__db_cursor.lastrowid
|
|
else:
|
|
result = True
|
|
except Exception as ex:
|
|
print(f"Insert query Error: {repr(ex)}")
|
|
|
|
return result
|
|
|
|
def do_update(self, table: str, update_pairs, where: str, params) -> bool:
|
|
"""Method for doing a generic update query"""
|
|
result = False
|
|
if len(table) > 0 and len(update_pairs) > 0:
|
|
field_params = []
|
|
parsed_values = {}
|
|
# Setup the Update field/value pairs
|
|
for pair in update_pairs:
|
|
field_params.append("{} = {}".format(pair["field"], ':' + pair["field"]))
|
|
parsed_values[pair["field"]] = pair["value"]
|
|
|
|
# Add the values to the parameters
|
|
for param in params:
|
|
parsed_values[param["field"]] = param["value"]
|
|
|
|
# Construct the update query
|
|
query_fields = ','.join(field_params)
|
|
insert_query = "UPDATE {} SET {} WHERE {}".format(table, query_fields, where)
|
|
|
|
try:
|
|
if self.__db_cursor.execute(insert_query, parsed_values):
|
|
self.__db_conn.commit()
|
|
result = True
|
|
except Exception as ex:
|
|
print(f"Update query Error: {repr(ex)}")
|
|
|
|
return result
|
|
|
|
def do_select(self, table: str, fetch_type="all", fields="", search="", params=None, query_extra=""):
|
|
"""Method for doing a generic select query"""
|
|
result = None
|
|
if len(table) > 0:
|
|
query_fields = "*"
|
|
query_block = ""
|
|
if len(fields) > 0:
|
|
# Check if the fields variable is a string or a list array
|
|
if isinstance(fields, str):
|
|
query_fields = fields
|
|
if isinstance(fields, list):
|
|
query_fields = ','.join(fields)
|
|
|
|
# Check if we have actual search parameters
|
|
if len(search) > 0:
|
|
query_block = " WHERE " + search
|
|
|
|
select_query = "SELECT {} FROM {}{} {}".format(query_fields, table, query_block, query_extra)
|
|
|
|
parsed_params = {}
|
|
for param in params:
|
|
parsed_params[param["field"]] = param["value"]
|
|
|
|
try:
|
|
db_query = self.__db_conn.execute(select_query, parsed_params)
|
|
# Return All, the Row or Column depending on the fetchtype passed in
|
|
if fetch_type == "all":
|
|
result = db_query.fetchall()
|
|
elif fetch_type == "row":
|
|
result = db_query.fetchone()
|
|
elif fetch_type == "col":
|
|
result = db_query.fetchone()[0]
|
|
except Exception as ex:
|
|
print(f"Select query Error: {repr(ex)}")
|
|
|
|
return result
|
|
|
|
def get_system_setting(self, key: str):
|
|
"""Method for getting a global system setting"""
|
|
setting_value = self.do_select("system_settings", "col", "setting_value", "setting_name = :setting_name",
|
|
[
|
|
{
|
|
"field": "setting_name",
|
|
"value": key
|
|
}
|
|
])
|
|
|
|
return setting_value
|
|
|
|
def set_system_setting(self, key: str, value) -> bool:
|
|
"""Method for setting a system setting"""
|
|
result = False
|
|
# Check if the setting exists, i would normally just use a INSERT ON DUPLICATE KEY UPDATE query, but it doesnt
|
|
# look like SQLite supports it
|
|
value_exists = self.do_select("system_settings", "col", "COUNT(*)", "setting_name = :setting_name",
|
|
[
|
|
{
|
|
"field": "setting_name",
|
|
"value": key
|
|
}
|
|
])
|
|
|
|
# If the key exists then update other wise insert it
|
|
if value_exists and value_exists > 0:
|
|
if self.do_update("system_settings",
|
|
[{"field": "setting_value", "value": value}],
|
|
"setting_name = :setting_name",
|
|
[{"field": "setting_name", "value": key}]):
|
|
result = True
|
|
else:
|
|
if self.do_insert("system_settings",
|
|
[{"field": "setting_name", "value": key},
|
|
{"field": "setting_value", "value": value}]):
|
|
result = True
|
|
|
|
return result
|
|
|
|
def __del__(self):
|
|
"""Close the connection and cursor when the object is removed"""
|
|
self.__db_cursor.close()
|
|
self.__db_conn.close()
|