# 209 lines | 11 KiB | Python
import hmac
import random
import secrets
import string
import time

import SHA256Custom
from DBConn import DBConn, get_unix_timestamp


class User:
    """Class for dealing with a User object from the database.

    A user is identified either by numeric id (``int``) or by username
    (``str``). The constructor immediately tries to load the matching row
    from the ``users`` table; use :meth:`valid_user` to find out whether a
    row was found.
    """

    # Failed-login count at which further login attempts are refused.
    MAX_BAD_LOGIN_ATTEMPTS = 5
    # Look-back window used when counting failed logins.
    # Presumably seconds, since it is passed to get_unix_timestamp as an offset.
    LOCKOUT_WINDOW_SECONDS = 300
    # Length and alphabet of newly generated password salts.
    SALT_LENGTH = 10
    SALT_ALPHABET = string.ascii_uppercase + string.digits

    def __init__(self, user_id):
        """Create the user object and load its details from the database.

        Args:
            user_id: An ``int`` is treated as the user's id, a ``str`` as the
                username.

        Raises:
            TypeError: If ``user_id`` is neither an ``int`` nor a ``str``.
        """
        # An int selects by user id, a str selects by username.
        if isinstance(user_id, int):
            self.username = None
            self.__user_id = user_id
        elif isinstance(user_id, str):
            self.username = user_id
            self.__user_id = None
        else:
            # Without this branch neither attribute would exist and the load
            # below would fail with an unrelated AttributeError.
            raise TypeError(
                "user_id must be an int (user id) or a str (username), "
                "not " + type(user_id).__name__
            )

        self.first_name = ""
        self.last_name = ""
        self.email_address = ""
        self.login_ip = ""
        self.role = ""
        self.__is_valid = False
        self.__is_logged_in = False
        self.login_date = -1
        self.__load_user()

    def user_logged_in(self):
        """Method to check if the user is logged in"""
        return self.__is_logged_in

    def get_user_id(self):
        """Method to return the users id"""
        return self.__user_id

    def get_user_fullname(self):
        """Method to return the users full name (first name, space, last name)"""
        return self.first_name + " " + self.last_name

    def valid_user(self) -> bool:
        """Method to check that the loaded user is valid (a database row was found)"""
        return self.__is_valid

    @staticmethod
    def __hash_password(password, salt):
        """Private helper returning the hex digest of ``password + salt``.

        NOTE(review): a single salted SHA-256 round is a weak password hash.
        The scheme is kept as-is because changing it would invalidate every
        stored hash.
        """
        hasher = SHA256Custom.SHA256Custom()
        hasher.update((password + salt).encode())
        return hasher.hexdigest()

    def __load_user(self) -> None:
        """Private method to load the user data from the database.

        Looks the user up by id when one is set, otherwise by username. When
        a row is found the instance attributes are filled from it and the
        user is marked valid; otherwise the instance is left untouched.
        """
        db_connection = DBConn()

        # Use the username if the user_id was not provided
        if not self.__user_id:
            where_clause = "username = :username"
            bindings = [{"field": "username", "value": self.username}]
        else:
            where_clause = "user_id = :user_id"
            bindings = [{"field": "user_id", "value": self.__user_id}]

        user_details = db_connection.do_select("users", "row", "*", where_clause, bindings)

        if user_details is not None and len(user_details) > 0:
            self.__user_id = user_details['user_id']
            self.username = user_details['username']
            self.first_name = user_details['user_first_name']
            self.last_name = user_details['user_last_name']
            self.email_address = user_details['user_email']
            self.login_ip = ""
            self.role = user_details['user_role']
            self.__is_valid = True
            self.login_date = int(time.time())

    def login(self, password, ip_address) -> dict:
        """Method for attempting to log in the current user with the provided password.

        Args:
            password: The plain-text password to check.
            ip_address: The address the attempt comes from; stored with the
                login attempt record.

        Returns:
            A dict with the keys ``is_valid`` (bool), ``status_code``
            (1 on success, -1 when locked out, 0 otherwise) and
            ``status_msg`` (human readable outcome).
        """
        result = {"is_valid": False, "status_code": 0, "status_msg": ""}

        # Check that the user isn't already logged in with this instance
        if self.__is_logged_in:
            result['status_msg'] = "User already logged in."
            return result

        # Truthiness also rejects None, which len() would crash on.
        if not password or not ip_address:
            result['status_msg'] = "No Password and/or IP address provided."
            return result

        # Check that the user is not locked out by recent bad login attempts
        if self.bad_login_attempt_count() >= self.MAX_BAD_LOGIN_ATTEMPTS:
            result['status_msg'] = "Too many tries, please wait."
            result['status_code'] = -1
            return result

        db_connection = DBConn()
        user_details = db_connection.do_select("users", "row", "user_password, user_salt",
                                               "username = :username",
                                               [{"field": "username", "value": self.username}])

        if user_details is None or len(user_details) == 0:
            result['status_msg'] = "User not found."
            return result

        stored_password = user_details['user_password']
        stored_salt = user_details['user_salt']

        # Hash the input password with the users salt from the database
        login_password = self.__hash_password(password, stored_salt)

        # Constant-time comparison so the check does not leak how many leading
        # characters of the hash matched. str() on both sides keeps the
        # equality semantics of the previous ``==`` check.
        passwords_match = hmac.compare_digest(str(stored_password).encode(),
                                              str(login_password).encode())

        if passwords_match:
            self.__is_logged_in = True
            self.login_date = int(time.time())
            # Log the login attempt for the server records
            self.log_login_attempt(1, ip_address)
            result = {"is_valid": True, "status_code": 1, "status_msg": "User Logged in."}
        else:
            # Log the login attempt for the server records
            self.log_login_attempt(-1, ip_address)
            result['status_msg'] = "Incorrect Password."

        return result

    def save_current_changes(self) -> bool:
        """Method to save any changes made to the user to the database.

        Returns:
            True when the user is valid and the update call reported success,
            False otherwise.
        """
        if not self.__is_valid:
            return False

        db_connection = DBConn()
        updated = db_connection.do_update("users",
                                          [{"field": "username", "value": self.username},
                                           {"field": "user_first_name", "value": self.first_name},
                                           {"field": "user_last_name", "value": self.last_name},
                                           {"field": "user_email", "value": self.email_address},
                                           {"field": "user_role", "value": self.role}],
                                          "user_id = :user_id",
                                          [{"field": "user_id", "value": self.__user_id}])
        return bool(updated)

    def create_user(self, first_name: str, last_name: str, password: str, email_address: str, role="User") -> bool:
        """Method to create and save the user in the database.

        Only runs when this instance did not load an existing user. A fresh
        random salt is generated and the salted password hash is stored.

        Returns:
            True when the row was inserted (the instance is then reloaded from
            the database), False when the user already exists, a required
            value is missing, or the insert call reported failure.
        """
        if self.__is_valid:
            return False

        # A username is required too: an instance built from an unknown
        # numeric id has none and must not be inserted with an empty name.
        if not (self.username and password and email_address and role):
            return False

        # secrets draws from the OS CSPRNG; random is not meant for security use.
        user_salt = "".join(secrets.choice(self.SALT_ALPHABET) for _ in range(self.SALT_LENGTH))
        user_password = self.__hash_password(password, user_salt)

        db_connection = DBConn()
        inserted = db_connection.do_insert("users",
                                           [{"field": "username", "value": self.username},
                                            {"field": "user_first_name", "value": first_name},
                                            {"field": "user_last_name", "value": last_name},
                                            {"field": "user_password", "value": user_password},
                                            {"field": "user_salt", "value": user_salt},
                                            {"field": "user_email", "value": email_address},
                                            {"field": "user_role", "value": role},
                                            {"field": "user_date_created", "value": get_unix_timestamp()}])
        if not inserted:
            return False

        self.__load_user()
        return True

    def unlock(self) -> None:
        """Method to unlock a user if they have been locked out by bad login attempts"""
        if self.__is_valid and not self.__is_logged_in:
            db_connection = DBConn()
            # It just sets the status values to another failed integer so they no longer get counted
            db_connection.do_update("login_attempts",
                                    [{"field": "login_attempt_status", "value": -2}],
                                    "user_id = :user_id AND login_attempt_status = :bad_status",
                                    [{"field": "user_id", "value": self.__user_id},
                                     {"field": "bad_status", "value": -1}])

    def log_login_attempt(self, status, ip_address) -> None:
        """Method for recording a login attempt for this user in the database.

        Args:
            status: Outcome code stored with the attempt; ``login`` passes 1
                for a successful login and -1 for a wrong password.
            ip_address: The address the attempt came from.

        Nothing is written when the user is not valid.
        """
        if self.__is_valid:
            db_connection = DBConn()
            db_connection.do_insert("login_attempts",
                                    [{"field": "user_id", "value": self.__user_id},
                                     {"field": "login_attempt_ip", "value": ip_address},
                                     {"field": "login_attempt_status", "value": status},
                                     {"field": "login_attempt_timestamp", "value": get_unix_timestamp()}])

    def bad_login_attempt_count(self) -> int:
        """Method to count bad login attempts from the user within the set timeframe.

        Returns:
            The count reported by the database for attempts with status -1
            inside the look-back window, or 0 when no user id is known or the
            query returned nothing.
        """
        result = 0
        if self.__user_id:
            db_connection = DBConn()
            attempt_count = db_connection.do_select("login_attempts", "col", "COUNT(login_attempt_id)",
                                                    "user_id = :user_id AND "
                                                    "login_attempt_status = :bad_login_status AND "
                                                    "login_attempt_timestamp BETWEEN :start_time AND :end_time",
                                                    [{"field": "user_id", "value": self.__user_id},
                                                     {"field": "bad_login_status", "value": -1},
                                                     {"field": "start_time",
                                                      "value": get_unix_timestamp(-self.LOCKOUT_WINDOW_SECONDS)},
                                                     {"field": "end_time", "value": get_unix_timestamp()}])

            if attempt_count is not None:
                result = attempt_count

        return result