import hmac
import random
import string
import time

import SHA256Custom
from DBConn import DBConn, get_unix_timestamp


class User:
    """A user record backed by the ``users`` database table.

    An instance is created from either a numeric user id or a username and
    immediately tries to load the matching row. Use :meth:`valid_user` to
    find out whether a row was found, :meth:`create_user` to insert a new
    row for a username that does not exist yet, and :meth:`login` to
    authenticate the loaded user.

    NOTE(review): passwords are stored as a single salted pass of
    ``SHA256Custom``. A dedicated password-hashing function would be
    stronger, but switching would invalidate the stored hashes, so the
    scheme is left unchanged here.
    """

    # Number of recent failed logins at which further attempts are refused.
    _MAX_BAD_LOGINS = 5
    # Look-back window for counting failed logins; passed (negated) to
    # get_unix_timestamp -- presumably an offset in seconds, TODO confirm.
    _LOCKOUT_WINDOW_SECONDS = 300
    # Values written to login_attempts.login_attempt_status.
    _STATUS_SUCCESS = 1
    _STATUS_FAILED = -1
    # Failed attempts that unlock() has cleared so they are no longer counted.
    _STATUS_CLEARED = -2
    # Length of the random per-user salt generated by create_user().
    _SALT_LENGTH = 10

    def __init__(self, user_id):
        """Create the object and load the user from the database.

        Args:
            user_id: An ``int`` is treated as the numeric user id, a ``str``
                as the username. (``bool`` is a subclass of ``int`` and is
                therefore treated as an id as well.)

        Raises:
            TypeError: If ``user_id`` is neither an ``int`` nor a ``str``.
        """
        if isinstance(user_id, int):
            self.username = None
            self.__user_id = user_id
        elif isinstance(user_id, str):
            self.username = user_id
            self.__user_id = None
        else:
            # Without this the attributes below would be missing and the
            # failure would surface later as an obscure AttributeError.
            raise TypeError(
                "user_id must be an int (user id) or a str (username), "
                f"not {type(user_id).__name__}"
            )

        self.first_name = ""
        self.last_name = ""
        self.email_address = ""
        self.login_ip = ""
        self.role = ""
        self.__is_valid = False
        self.__is_logged_in = False
        self.login_date = -1

        self.__load_user()

    def user_logged_in(self):
        """Return True if this instance has completed a successful login."""
        return self.__is_logged_in

    def get_user_id(self):
        """Return the user's numeric id, or None if it is not known."""
        return self.__user_id

    def get_user_fullname(self):
        """Return the first and last name joined by a single space."""
        return self.first_name + " " + self.last_name

    def valid_user(self) -> bool:
        """Return True if a matching row was loaded from the database."""
        return self.__is_valid

    @staticmethod
    def _hash_password(password, salt):
        """Return the hex digest of ``password + salt`` using SHA256Custom."""
        hasher = SHA256Custom.SHA256Custom()
        hasher.update((password + salt).encode())
        return hasher.hexdigest()

    @staticmethod
    def _digests_match(stored, computed) -> bool:
        """Compare two digests, in constant time when both are strings.

        Falls back to ``==`` for any other combination of types so that
        unexpected database values behave exactly as a plain comparison.
        """
        if isinstance(stored, str) and isinstance(computed, str):
            return hmac.compare_digest(stored.encode(), computed.encode())
        return stored == computed

    def __load_user(self) -> None:
        """Load the user's row from the database into this instance.

        Looks the user up by id when one is known, otherwise by username.
        When no row is found the instance is left untouched (and invalid).
        """
        db_connection = DBConn()

        # "is None" rather than truthiness so that an id of 0 is still
        # looked up by id instead of falling back to the username.
        if self.__user_id is None:
            user_details = db_connection.do_select(
                "users", "row", "*", "username = :username",
                [{"field": "username", "value": self.username}],
            )
        else:
            user_details = db_connection.do_select(
                "users", "row", "*", "user_id = :user_id",
                [{"field": "user_id", "value": self.__user_id}],
            )

        if user_details is None or len(user_details) == 0:
            return

        self.__user_id = user_details['user_id']
        self.username = user_details['username']
        self.first_name = user_details['user_first_name']
        self.last_name = user_details['user_last_name']
        self.email_address = user_details['user_email']
        self.login_ip = ""
        self.role = user_details['user_role']
        self.__is_valid = True
        self.login_date = int(time.time())

    def login(self, password, ip_address) -> dict:
        """Attempt to log the user in with the provided password.

        Every password check is recorded through :meth:`log_login_attempt`.

        Args:
            password: The plain-text password supplied by the caller.
            ip_address: The address the attempt originates from.

        Returns:
            A dict with the keys ``is_valid`` (bool), ``status_code``
            (1 on success, -1 when locked out, otherwise 0) and
            ``status_msg`` (a human-readable explanation).
        """
        result = {"is_valid": False, "status_code": 0, "status_msg": ""}

        if self.__is_logged_in:
            result['status_msg'] = "User already logged in."
            return result

        # Truthiness also rejects None instead of raising from len(None).
        if not password or not ip_address:
            result['status_msg'] = "No Password and/or IP address provided."
            return result

        # Refuse further attempts while the user is locked out.
        if self.bad_login_attempt_count() >= self._MAX_BAD_LOGINS:
            result['status_msg'] = "Too many tries, please wait."
            result['status_code'] = -1
            return result

        db_connection = DBConn()
        user_details = db_connection.do_select(
            "users", "row", "user_password, user_salt",
            "username = :username",
            [{"field": "username", "value": self.username}],
        )

        if user_details is None or len(user_details) == 0:
            result['status_msg'] = "User not found."
            return result

        stored_password = user_details['user_password']
        stored_salt = user_details['user_salt']

        # Hash the supplied password with the user's stored salt.
        login_password = self._hash_password(password, stored_salt)

        if not self._digests_match(stored_password, login_password):
            self.log_login_attempt(self._STATUS_FAILED, ip_address)
            result['status_msg'] = "Incorrect Password."
            return result

        self.__is_logged_in = True
        self.login_date = int(time.time())
        # Remember where this session was opened from; the attribute was
        # otherwise never populated.
        self.login_ip = ip_address
        self.log_login_attempt(self._STATUS_SUCCESS, ip_address)
        return {"is_valid": True, "status_code": 1,
                "status_msg": "User Logged in."}

    def save_current_changes(self) -> bool:
        """Write the instance's current field values back to the database.

        Returns:
            True if the user is valid and the update reported success,
            otherwise False.
        """
        if not self.__is_valid:
            return False

        db_connection = DBConn()
        updated = db_connection.do_update(
            "users",
            [{"field": "username", "value": self.username},
             {"field": "user_first_name", "value": self.first_name},
             {"field": "user_last_name", "value": self.last_name},
             {"field": "user_email", "value": self.email_address},
             {"field": "user_role", "value": self.role}],
            "user_id = :user_id",
            [{"field": "user_id", "value": self.__user_id}],
        )
        return bool(updated)

    def create_user(self, first_name: str, last_name: str, password: str,
                    email_address: str, role="User") -> bool:
        """Create the user in the database and load the new row.

        Only possible when no matching user was loaded and the instance was
        constructed from a username.

        Args:
            first_name: The user's first name.
            last_name: The user's last name.
            password: Plain-text password; stored salted and hashed.
            email_address: The user's e-mail address.
            role: The user's role, ``"User"`` by default.

        Returns:
            True if the row was inserted, otherwise False.
        """
        if self.__is_valid:
            return False

        # An instance built from a numeric id has no username to insert.
        if not self.username:
            return False

        if not (password and email_address and role):
            return False

        # SystemRandom draws from the operating system's CSPRNG; the default
        # random generator is predictable and unsuitable for salts.
        user_salt = "".join(
            random.SystemRandom().choices(
                string.ascii_uppercase + string.digits, k=self._SALT_LENGTH
            )
        )
        user_password = self._hash_password(password, user_salt)

        db_connection = DBConn()
        inserted = db_connection.do_insert(
            "users",
            [{"field": "username", "value": self.username},
             {"field": "user_first_name", "value": first_name},
             {"field": "user_last_name", "value": last_name},
             {"field": "user_password", "value": user_password},
             {"field": "user_salt", "value": user_salt},
             {"field": "user_email", "value": email_address},
             {"field": "user_role", "value": role},
             {"field": "user_date_created", "value": get_unix_timestamp()}],
        )
        if not inserted:
            return False

        # Reload so the instance picks up the new id and becomes valid.
        self.__load_user()
        return True

    def unlock(self) -> None:
        """Unlock a user who was locked out by bad login attempts.

        Does nothing unless the user is valid and not currently logged in.
        """
        if not self.__is_valid or self.__is_logged_in:
            return

        db_connection = DBConn()
        # Re-mark the failed attempts with a different failure value so that
        # bad_login_attempt_count() no longer counts them.
        db_connection.do_update(
            "login_attempts",
            [{"field": "login_attempt_status",
              "value": self._STATUS_CLEARED}],
            "user_id = :user_id AND login_attempt_status = :bad_status",
            [{"field": "user_id", "value": self.__user_id},
             {"field": "bad_status", "value": self._STATUS_FAILED}],
        )

    def log_login_attempt(self, status, ip_address) -> None:
        """Record a login attempt for this user in ``login_attempts``.

        Nothing is written when the user is not valid.

        Args:
            status: Outcome of the attempt; ``login`` passes 1 for success
                and -1 for a wrong password.
            ip_address: The address the attempt originated from.
        """
        if not self.__is_valid:
            return

        db_connection = DBConn()
        db_connection.do_insert(
            "login_attempts",
            [{"field": "user_id", "value": self.__user_id},
             {"field": "login_attempt_ip", "value": ip_address},
             {"field": "login_attempt_status", "value": status},
             {"field": "login_attempt_timestamp",
              "value": get_unix_timestamp()}],
        )

    def bad_login_attempt_count(self) -> int:
        """Count this user's failed logins within the lockout window.

        Returns:
            The number of attempts with the failed status between
            ``get_unix_timestamp(-300)`` and ``get_unix_timestamp()``, or 0
            when the user id is unknown or the query yields nothing.
        """
        if self.__user_id is None:
            return 0

        db_connection = DBConn()
        attempt_count = db_connection.do_select(
            "login_attempts", "col", "COUNT(login_attempt_id)",
            "user_id = :user_id AND "
            "login_attempt_status = :bad_login_status AND "
            "login_attempt_timestamp BETWEEN :start_time AND :end_time",
            [{"field": "user_id", "value": self.__user_id},
             {"field": "bad_login_status", "value": self._STATUS_FAILED},
             {"field": "start_time",
              "value": get_unix_timestamp(-self._LOCKOUT_WINDOW_SECONDS)},
             {"field": "end_time", "value": get_unix_timestamp()}],
        )
        return attempt_count if attempt_count is not None else 0