# Files
# 2025-07-05 14:24:22 +01:00
#
# 209 lines
# 11 KiB
# Python

import hmac
import random
import string
import time

import SHA256Custom
from DBConn import DBConn, get_unix_timestamp
class User:
    """Class for dealing with a User object from the database.

    An instance is created from either a numeric user id or a username and
    immediately tries to load the matching row of the ``users`` table.
    Use :meth:`valid_user` to find out whether a row was found.

    Public attributes: ``username``, ``first_name``, ``last_name``,
    ``email_address``, ``login_ip``, ``role`` and ``login_date``.
    """

    # Number of recent bad attempts at which login() refuses to continue.
    _MAX_BAD_LOGIN_ATTEMPTS = 5
    # Size of the look-back window for bad attempts. It is negated and handed
    # to get_unix_timestamp(); presumably seconds (5 minutes) - TODO confirm.
    _BAD_LOGIN_WINDOW = 300
    # Shape of the per-user salt generated by create_user().
    _SALT_LENGTH = 10
    _SALT_ALPHABET = string.ascii_uppercase + string.digits

    def __init__(self, user_id):
        """Create the user and try to load it from the database.

        Args:
            user_id: An ``int`` is treated as the user's id, a ``str`` as the
                username. Any other type produces an invalid user (no
                database lookup is attempted).
        """
        if isinstance(user_id, int):
            self.username = None
            self.__user_id = user_id
        elif isinstance(user_id, str):
            self.username = user_id
            self.__user_id = None
        else:
            # Unsupported identifier type: keep both identifiers defined so
            # the instance is usable and simply reports itself as invalid.
            self.username = None
            self.__user_id = None
        self.first_name = ""
        self.last_name = ""
        self.email_address = ""
        self.login_ip = ""
        self.role = ""
        self.__is_valid = False
        self.__is_logged_in = False
        self.login_date = -1
        self.__load_user()

    def user_logged_in(self):
        """Return True if this instance has completed a successful login()."""
        return self.__is_logged_in

    def get_user_id(self):
        """Return the user's id, or None if it is not known."""
        return self.__user_id

    def get_user_fullname(self):
        """Return the first and last name joined by a single space."""
        return self.first_name + " " + self.last_name

    def valid_user(self) -> bool:
        """Return True if the user was successfully loaded from the database."""
        return self.__is_valid

    @staticmethod
    def __hash_password(password, salt):
        """Return the SHA256Custom hex digest of ``password + salt``."""
        hasher = SHA256Custom.SHA256Custom()
        hasher.update((password + salt).encode())
        return hasher.hexdigest()

    def __load_user(self) -> None:
        """Private method to load the user data from the database.

        Looks the user up by id when one is set, otherwise by username. When
        a row is found the public attributes are refreshed and the instance
        is marked valid; otherwise the instance is left untouched.
        """
        if self.__user_id:
            where_clause = "user_id = :user_id"
            where_params = [{"field": "user_id", "value": self.__user_id}]
        elif self.username is not None:
            # Use the username if the user_id was not provided
            where_clause = "username = :username"
            where_params = [{"field": "username", "value": self.username}]
        else:
            # Nothing to search by, so there is no point querying.
            return

        db_connection = DBConn()
        user_details = db_connection.do_select("users", "row", "*",
                                               where_clause, where_params)
        if not user_details:
            return

        self.__user_id = user_details['user_id']
        self.username = user_details['username']
        self.first_name = user_details['user_first_name']
        self.last_name = user_details['user_last_name']
        self.email_address = user_details['user_email']
        self.login_ip = ""
        self.role = user_details['user_role']
        self.__is_valid = True
        # NOTE(review): this stamps the load time, not an actual login;
        # login() overwrites it on success.
        self.login_date = int(time.time())

    def login(self, password, ip_address) -> dict:
        """Attempt to log in the current user with the provided password.

        Args:
            password: Plain-text password supplied by the caller.
            ip_address: Address the attempt came from; stored with the
                attempt record.

        Returns:
            A dict with the keys ``is_valid`` (bool), ``status_code``
            (1 on success, -1 when locked out, 0 otherwise) and
            ``status_msg`` (human readable explanation).
        """
        result = {"is_valid": False, "status_code": 0, "status_msg": ""}

        # Check that the user isn't already logged in with this instance
        if self.__is_logged_in:
            result['status_msg'] = "User already logged in."
            return result

        # Truthiness also rejects None, which len() would have crashed on.
        if not password or not ip_address:
            result['status_msg'] = "No Password and/or IP address provided."
            return result

        # Refuse to continue while the user has too many recent bad attempts
        if self.bad_login_attempt_count() >= self._MAX_BAD_LOGIN_ATTEMPTS:
            result['status_msg'] = "Too many tries, please wait."
            result['status_code'] = -1
            return result

        db_connection = DBConn()
        user_details = db_connection.do_select(
            "users", "row", "user_password, user_salt",
            "username = :username",
            [{"field": "username", "value": self.username}])
        if not user_details:
            result['status_msg'] = "User not found."
            return result

        stored_password = user_details['user_password']
        stored_salt = user_details['user_salt']
        # Hash the input password with the users salt from the database
        login_password = self.__hash_password(password, stored_salt)

        # Constant-time comparison so the check does not leak how many
        # leading characters matched. compare_digest needs both operands to
        # be the same type, so normalise both sides to bytes first.
        passwords_match = hmac.compare_digest(
            str(stored_password).encode(), str(login_password).encode())

        if not passwords_match:
            # Log the login attempt for the server records
            self.log_login_attempt(-1, ip_address)
            result['status_msg'] = "Incorrect Password."
            return result

        self.__is_logged_in = True
        self.login_date = int(time.time())
        # Remember where this session was opened from.
        self.login_ip = ip_address
        # Log the login attempt for the server records
        self.log_login_attempt(1, ip_address)
        return {"is_valid": True, "status_code": 1, "status_msg": "User Logged in."}

    def save_current_changes(self) -> bool:
        """Save the current attribute values of a valid user to the database.

        Returns:
            True if the user is valid and the update call reported success,
            False otherwise.
        """
        if not self.__is_valid:
            return False

        db_connection = DBConn()
        updated = db_connection.do_update(
            "users",
            [{"field": "username", "value": self.username},
             {"field": "user_first_name", "value": self.first_name},
             {"field": "user_last_name", "value": self.last_name},
             {"field": "user_email", "value": self.email_address},
             {"field": "user_role", "value": self.role}],
            "user_id = :user_id",
            [{"field": "user_id", "value": self.__user_id}])
        return bool(updated)

    def create_user(self, first_name: str, last_name: str, password: str, email_address: str, role="User") -> bool:
        """Create and save the user in the database.

        Only works on an instance that is not already a valid (loaded) user
        and that was constructed with a non-empty username.

        Args:
            first_name: Value for ``user_first_name``.
            last_name: Value for ``user_last_name``.
            password: Plain-text password; stored salted and hashed.
            email_address: Value for ``user_email``.
            role: Value for ``user_role``.

        Returns:
            True if the row was inserted (the instance is then reloaded),
            False if validation failed or the insert reported failure.
        """
        # An existing user cannot be created again, and a row without a
        # username could never be logged in to.
        if self.__is_valid or not self.username:
            return False
        if not (password and email_address and role):
            return False

        # SystemRandom draws from the OS entropy source instead of the
        # predictable default generator.
        user_salt = "".join(
            random.SystemRandom().choices(self._SALT_ALPHABET, k=self._SALT_LENGTH))
        user_password = self.__hash_password(password, user_salt)

        db_connection = DBConn()
        inserted = db_connection.do_insert(
            "users",
            [{"field": "username", "value": self.username},
             {"field": "user_first_name", "value": first_name},
             {"field": "user_last_name", "value": last_name},
             {"field": "user_password", "value": user_password},
             {"field": "user_salt", "value": user_salt},
             {"field": "user_email", "value": email_address},
             {"field": "user_role", "value": role},
             {"field": "user_date_created", "value": get_unix_timestamp()}])
        if not inserted:
            return False

        self.__load_user()
        return True

    def unlock(self) -> None:
        """Unlock a user that has been locked out by bad login attempts.

        Does nothing unless the user is valid and not currently logged in.
        """
        if not self.__is_valid or self.__is_logged_in:
            return

        db_connection = DBConn()
        # It just sets the status values to another failed integer so they no longer get counted
        db_connection.do_update(
            "login_attempts",
            [{"field": "login_attempt_status", "value": -2}],
            "user_id = :user_id AND login_attempt_status = :bad_status",
            [{"field": "user_id", "value": self.__user_id},
             {"field": "bad_status", "value": -1}])

    def log_login_attempt(self, status, ip_address) -> None:
        """Record a login attempt for this user in ``login_attempts``.

        Args:
            status: Outcome code; login() passes 1 for success and -1 for a
                wrong password.
            ip_address: Address the attempt came from.

        Nothing is recorded when the user is not valid.
        """
        if not self.__is_valid:
            return

        db_connection = DBConn()
        db_connection.do_insert(
            "login_attempts",
            [{"field": "user_id", "value": self.__user_id},
             {"field": "login_attempt_ip", "value": ip_address},
             {"field": "login_attempt_status", "value": status},
             {"field": "login_attempt_timestamp", "value": get_unix_timestamp()}])

    def bad_login_attempt_count(self) -> int:
        """Count bad login attempts from the user within the set timeframe.

        Returns:
            The count reported by the database, or 0 when the user id is not
            known or the query returned None.
        """
        if not self.__user_id:
            return 0

        db_connection = DBConn()
        attempt_count = db_connection.do_select(
            "login_attempts", "col", "COUNT(login_attempt_id)",
            "user_id = :user_id AND "
            "login_attempt_status = :bad_login_status AND "
            "login_attempt_timestamp BETWEEN :start_time AND :end_time",
            [{"field": "user_id", "value": self.__user_id},
             {"field": "bad_login_status", "value": -1},
             {"field": "start_time", "value": get_unix_timestamp(-self._BAD_LOGIN_WINDOW)},
             {"field": "end_time", "value": get_unix_timestamp()}])
        return 0 if attempt_count is None else attempt_count