import os
import sqlite3
import time
from typing import Any, Dict, List, Optional, Sequence, Union

# Shape shared by every "pairs"/"params" argument below: a list of
# {"field": <column or placeholder name>, "value": <bound value>} dicts.
# Spelled with typing.List/Dict (instead of list[dict[...]]) so it also works
# on interpreters older than 3.9.
ParamPairs = List[Dict[str, Any]]


def get_unix_timestamp(seconds_offset=0) -> int:
    """
    Return the current unix timestamp, simulating the NOW() SQL function.

    :param seconds_offset: seconds added to the current time; a negative value
        yields a past timestamp, a positive value a future one.
    :return: whole seconds since the epoch, shifted by ``seconds_offset``.
    """
    return int(time.time()) + seconds_offset


class DBConn:
    """Class to hold our methods for dealing with the SQLite3 Database"""

    # start the database, which we would usually have a config file with the
    # host, port, username etc but since sqlite just uses a file it is
    # hard coded
    def __init__(self):
        """Open ``server.db`` located next to this source file and create a cursor."""
        # Outside of the PyCharm env the working directory is not the server
        # folder, so resolve the DB path relative to this file instead.
        path = os.path.dirname(os.path.abspath(__file__))
        db = os.path.join(path, 'server.db')
        self.__db_conn = sqlite3.connect(db)
        # set the row factory, to simulate the fetch_assoc rows other DB's use
        self.__db_conn.row_factory = sqlite3.Row
        self.__db_cursor = self.__db_conn.cursor()

    def do_generic(self, query: str, return_values=False) -> Any:
        """
        Run an arbitrary SQL statement on the shared cursor.

        The statement is executed without bound parameters and this method
        does not call commit() itself.

        :param query: the SQL text to execute; an empty query is not executed.
        :param return_values: when True, return ``fetchall()`` of the statement.
        :return: the fetched rows when ``return_values`` is True, True when the
            statement ran and no rows were requested, False when the query was
            empty or raised (the error is printed, not re-raised).
        """
        result = False
        if not query:
            return result

        try:
            current_query = self.__db_cursor.execute(query)
            result = current_query.fetchall() if return_values else True
        except Exception as ex:
            print(f"Generic query Error: {repr(ex)}")

        return result

    def do_insert(self, table: str, insert_pairs: ParamPairs, return_id=False):
        """
        Insert one row into ``table`` and commit.

        Values are bound as named parameters. The table name and the field
        names are formatted directly into the SQL text, so they must come from
        trusted code, never from user input.

        :param table: name of the table to insert into.
        :param insert_pairs: list of ``{"field": ..., "value": ...}`` dicts.
        :param return_id: when True, return the cursor's ``lastrowid`` instead
            of True on success.
        :return: ``lastrowid`` or True on success; False when ``table`` or
            ``insert_pairs`` is empty or the query raised (error is printed).
        """
        if not table or not insert_pairs:
            return False

        # Using prepared statements so we dont get SQL injected through values
        fields = [pair["field"] for pair in insert_pairs]
        parsed_values = {pair["field"]: pair["value"] for pair in insert_pairs}

        # Construct the insert query with a ":name" placeholder per field
        query_fields = ','.join(fields)
        query_values = ','.join(':' + field for field in fields)
        insert_query = "INSERT INTO {} ({}) VALUES ({})".format(table, query_fields, query_values)

        result = False
        try:
            self.__db_cursor.execute(insert_query, parsed_values)
            self.__db_conn.commit()
            # Return the last row id if requested, otherwise return True
            result = self.__db_cursor.lastrowid if return_id else True
        except Exception as ex:
            print(f"Insert query Error: {repr(ex)}")

        return result

    def do_update(self, table: str, update_pairs: ParamPairs, where: str, params: Optional[ParamPairs]) -> bool:
        """
        Update rows of ``table`` matching ``where`` and commit.

        The table name, the field names and the ``where`` text are formatted
        directly into the SQL text and must come from trusted code.

        :param table: name of the table to update.
        :param update_pairs: ``{"field", "value"}`` dicts for the SET clause;
            each field is bound through a placeholder of the same name.
        :param where: WHERE clause body (without the keyword), using ``:name``
            placeholders.
        :param params: ``{"field", "value"}`` dicts for the placeholders used in
            ``where``; None is treated as no parameters. Both lists fill the
            same dict, so a ``params`` entry whose field equals an updated
            field replaces that field's value.
        :return: True when the statement ran and was committed; False when
            ``table`` or ``update_pairs`` is empty or the query raised (error
            is printed).
        """
        if not table or not update_pairs:
            return False

        # Setup the Update field/value pairs
        field_params = ["{} = {}".format(pair["field"], ':' + pair["field"]) for pair in update_pairs]
        parsed_values = {pair["field"]: pair["value"] for pair in update_pairs}

        # Add the WHERE values to the parameters
        for param in params or []:
            parsed_values[param["field"]] = param["value"]

        # Construct the update query
        query_fields = ','.join(field_params)
        update_query = "UPDATE {} SET {} WHERE {}".format(table, query_fields, where)

        result = False
        try:
            self.__db_cursor.execute(update_query, parsed_values)
            self.__db_conn.commit()
            result = True
        except Exception as ex:
            print(f"Update query Error: {repr(ex)}")

        return result

    def do_select(self, table: str, fetch_type="all", fields: Union[str, Sequence[str]] = "", search="",
                  params: Optional[ParamPairs] = None, query_extra=""):
        """
        Run a SELECT against ``table``.

        The table name, ``fields``, ``search`` and ``query_extra`` are formatted
        directly into the SQL text and must come from trusted code; only the
        ``params`` values are bound.

        :param table: name of the table to read; an empty name returns None.
        :param fetch_type: "all" for every row, "row" for the first row, "col"
            for the first column of the first row. Any other value returns None.
        :param fields: a ready-made field string, or a list/tuple of field
            names; empty selects ``*``.
        :param search: WHERE clause body (without the keyword), using ``:name``
            placeholders; empty means no WHERE clause.
        :param params: ``{"field", "value"}`` dicts for the placeholders in
            ``search``; None is treated as no parameters.
        :param query_extra: text appended after the WHERE clause.
        :return: the fetched data, or None when nothing matched a "row"/"col"
            fetch or the query raised (error is printed).
        """
        result = None
        if not table:
            return result

        # Fields may be given as a string or as a list/tuple of names
        query_fields = "*"
        if fields:
            if isinstance(fields, str):
                query_fields = fields
            elif isinstance(fields, (list, tuple)):
                query_fields = ','.join(fields)

        # Only add a WHERE clause when we have actual search parameters
        query_block = " WHERE " + search if search else ""
        select_query = "SELECT {} FROM {}{} {}".format(query_fields, table, query_block, query_extra)

        # params defaults to None, so fall back to an empty list before iterating
        parsed_params = {param["field"]: param["value"] for param in params or []}

        try:
            # Executed on the connection, which hands back a fresh cursor
            db_query = self.__db_conn.execute(select_query, parsed_params)
            # Return All, the Row or Column depending on the fetch_type passed in
            if fetch_type == "all":
                result = db_query.fetchall()
            elif fetch_type == "row":
                result = db_query.fetchone()
            elif fetch_type == "col":
                row = db_query.fetchone()
                # No matching row is a normal miss, not an error
                result = row[0] if row is not None else None
        except Exception as ex:
            print(f"Select query Error: {repr(ex)}")

        return result

    def get_system_setting(self, key: str):
        """
        Return the ``setting_value`` stored for ``key`` in ``system_settings``.

        :param key: the ``setting_name`` to look up.
        :return: the stored value, or None when the key is absent or the query
            failed.
        """
        return self.do_select(
            "system_settings",
            "col",
            "setting_value",
            "setting_name = :setting_name",
            [{"field": "setting_name", "value": key}],
        )

    def set_system_setting(self, key: str, value) -> bool:
        """
        Store ``value`` under ``key`` in ``system_settings``.

        :param key: the ``setting_name`` to write.
        :param value: the ``setting_value`` to store.
        :return: True when the row was updated or inserted, False otherwise.
        """
        key_param = [{"field": "setting_name", "value": key}]

        # Check if the setting exists first.
        # NOTE(review): SQLite does offer INSERT OR REPLACE and (3.24+) UPSERT,
        # but both need a UNIQUE constraint on setting_name, which cannot be
        # confirmed from this file, so select-then-write is kept.
        value_exists = self.do_select("system_settings", "col", "COUNT(*)",
                                      "setting_name = :setting_name", key_param)

        # If the key exists then update, otherwise insert it
        if value_exists and value_exists > 0:
            outcome = self.do_update("system_settings",
                                     [{"field": "setting_value", "value": value}],
                                     "setting_name = :setting_name",
                                     key_param)
        else:
            outcome = self.do_insert("system_settings",
                                     [{"field": "setting_name", "value": key},
                                      {"field": "setting_value", "value": value}])

        return bool(outcome)

    def __del__(self):
        """Close the connection and cursor when the object is removed"""
        # __init__ may have failed before either attribute was assigned, so
        # look them up defensively (by their name-mangled names).
        cursor = getattr(self, "_DBConn__db_cursor", None)
        conn = getattr(self, "_DBConn__db_conn", None)
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()