import socket
from typing import Any, Optional

import ClientTempMail
from ClientThread import ClientThread


class SMTPClient:
    """Client-side entry point that owns a ClientThread and feeds it commands.

    The class opens the socket, hands it to a ``ClientThread`` and translates
    the short command codes used by the caller (``LOGI``, ``MAIL``, ``RCPT``,
    ...) into the full command strings passed to the thread.

    Most accessors simply forward to the current ``ClientThread`` and therefore
    require that ``start_connection`` has succeeded and ``close_connection``
    has not been called since; otherwise they raise ``AttributeError``.
    """

    # Characters kept by email_string_sanitize: ASCII letters, digits and the
    # punctuation set !#$%&'*+-/=?^_`{|}~@.
    _EMAIL_SAFE_CHARS = frozenset(
        "abcdefghijklmnopqrstuvwxyz"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "0123456789"
        "!#$%&'*+-/=?^_`{|}~@."
    )

    def __init__(self, host: str, port: int):
        """Store the server address; no connection is made until start_connection."""
        self.__host = host
        self.__port = port
        self.__client_thread: Optional[ClientThread] = None

    def is_active(self) -> bool:
        """Return the child thread's ``active`` flag, or False when there is no thread."""
        if self.__client_thread is None:
            return False
        return self.__client_thread.active

    def is_connected(self) -> bool:
        """Return whether the child thread reports being connected.

        Any failure while asking the thread (including there being no thread)
        is treated as "not connected".
        """
        if self.__client_thread is None:
            return False
        try:
            return self.__client_thread.is_connected()
        except Exception:
            # Deliberate best-effort: the thread's failure modes are not
            # visible from here, so every error is reported as disconnected.
            return False

    def get_state(self) -> str:
        """Return the child thread's ``state`` attribute."""
        return self.__client_thread.state

    def get_current_response_content(self) -> Any:
        """Return the child thread's ``last_response_content`` attribute."""
        return self.__client_thread.last_response_content

    def get_current_login_username(self) -> str:
        """Return the username stored on the child thread by the LOGI command."""
        return self.__client_thread.login_username

    def get_current_users_name(self) -> str:
        """Return the result of the child thread's ``get_user_name()``."""
        return self.__client_thread.get_user_name()

    def get_current_users_email_address(self) -> str:
        """Return the result of the child thread's ``get_user_email()``."""
        return self.__client_thread.get_user_email()

    def get_current_temp_mail(self) -> ClientTempMail.ClientTempMail:
        """Return the temporary mail object currently held by the child thread."""
        return self.__client_thread.client_mail_temp

    def current_status(self) -> int:
        """Return the child thread's ``current_status`` attribute."""
        return self.__client_thread.current_status

    def get_users_role(self) -> str:
        """Return the result of the child thread's ``get_users_role()``."""
        return self.__client_thread.get_users_role()

    def start_connection(self) -> bool:
        """Open a non-blocking socket to the server and start the child thread.

        Returns:
            True when the thread was created and started, False when an
            ``OSError`` (including host lookup failures) occurred; in that case
            the error is printed and the socket is closed.
        """
        server_address = (self.__host, self.__port)
        current_socket = None
        try:
            current_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            current_socket.setblocking(False)
            # On a non-blocking socket connect_ex returns an error code
            # instead of raising; the code is intentionally not inspected here.
            # NOTE(review): presumably ClientThread waits for the connection
            # to complete - confirm against ClientThread.
            current_socket.connect_ex(server_address)

            # Create the child thread for the connection
            self.__client_thread = ClientThread(current_socket, server_address)
            self.__client_thread.start()
            self.__client_thread.current_status = 0
        except OSError as ex:
            print(f"Error connecting to the server: {repr(ex)}")
            # Do not leak the socket or keep a half-initialised thread around.
            if current_socket is not None:
                current_socket.close()
            self.__client_thread = None
            return False
        return True

    def close_connection(self) -> None:
        """Ask the child thread to close the connection and forget the thread.

        ``OSError`` raised while closing is printed rather than propagated. The
        thread reference is cleared in every case.
        """
        print("Attempting to end the Connection...")
        if self.__client_thread is None:
            print("No active connection to close")
            return
        try:
            self.__client_thread.close_connection()
            print(f"Connection ended with {self.__host}:{self.__port}")
        except OSError as ex:
            print(f"Could not close the connection: {repr(ex)}")
        finally:
            self.__client_thread = None

    @staticmethod
    def email_string_sanitize(input_str: str) -> str:
        """Return ``input_str`` with every non email-safe character removed.

        Kept characters are ASCII letters, digits and ``!#$%&'*+-/=?^_`{|}~@.``.
        An empty or ``None`` input yields an empty string.
        """
        if not input_str:
            return ""
        allowed = SMTPClient._EMAIL_SAFE_CHARS
        return "".join(letter for letter in input_str if letter in allowed)

    @staticmethod
    def _get_option(options, key: str):
        """Return ``options[key]``, or None when options is None or lacks the key."""
        try:
            return options[key]
        except (KeyError, TypeError):
            return None

    def run_command(self, command: str, options=None) -> bool:
        """Translate a command code into a full command and pass it to the thread.

        Args:
            command: The command code (``LOGI``, ``MAIL``, ``RCPT``, ``SUBJ``,
                ``DATA``, ``LINE``, ``SEND``, ``RSET``, ``VIEW``, ``LOUT``,
                ``HELP``, ``ADMN``). Any other non-empty value is passed to
                the thread unchanged.
            options: Optional mapping with the values a command needs
                (``username``/``password``, ``to_address``, ``subject``,
                ``data_line``, ``mail_id``, ``help_section``,
                ``admin_command``).

        Returns:
            True when the command was accepted (handed to the thread, queued,
            or handled locally), False when it was rejected - for example a
            missing command, no active thread, or missing/invalid options.
        """
        if command is None or self.__client_thread is None:
            return False

        thread = self.__client_thread
        full_command = None

        # Login command processing
        if command == "LOGI":
            # Clean the username to only accept email safe characters
            cleaned_username = self.email_string_sanitize(
                self._get_option(options, 'username'))
            password = self._get_option(options, 'password')
            # Both the username and the password must be non-empty
            if not cleaned_username or not password:
                return False
            thread.login_username = cleaned_username
            full_command = "LOGI " + cleaned_username + " " + password

        # Start the mail command
        elif command == "MAIL":
            thread.state = "MAIL"
            # Create a mail object to hold the temp values we send to the server
            thread.client_mail_temp = ClientTempMail.ClientTempMail()
            from_address = thread.get_user_email()
            thread.client_mail_temp.from_address = from_address
            # Send the initial MAIL FROM command
            full_command = "MAIL FROM <" + from_address + ">"

        elif command == "RCPT":
            to_address = self._get_option(options, 'to_address')
            if to_address is None:
                return False
            # Strip any whitespaces to stop it bypassing the empty check
            to_address = to_address.strip()
            if not to_address:
                return False
            address_list = thread.client_mail_temp.to_address_list
            # Reject an address that is already in the list
            if to_address in address_list:
                return False
            # Add the address to the list and try to process it on the server
            address_list.append(to_address)
            full_command = "RCPT TO <" + to_address + ">"

        elif command == "SUBJ":
            # SUBJ is handled locally: it only needs at least one recipient
            if len(thread.client_mail_temp.to_address_list) > 0:
                thread.state = "SUBJ"
                return True
            return False

        elif command == "DATA":
            # Set the subject locally because SUBJ is not an actual command in
            # the mail sequence (stored as given; None when not supplied)
            thread.client_mail_temp.subject = self._get_option(options, 'subject')
            # Start the local data input
            full_command = "DATA"

        elif command == "LINE":
            data_line = self._get_option(options, 'data_line')
            if data_line is None:
                return False
            # Queue the current data line on the thread (an empty line is allowed)
            thread.queue_command(data_line)
            return True

        elif command == "SEND":
            data_line = self._get_option(options, 'data_line')
            if data_line is None:
                return False
            # Finish composing the email and send the end line
            thread.current_status = 0
            thread.process_command(data_line)
            return True

        # Send the reset command
        elif command == "RSET":
            thread.current_status = 0
            thread.state = "RSET"
            full_command = "RSET"

        # Send the view request
        elif command == "VIEW":
            mail_id = self._get_option(options, 'mail_id')
            if mail_id:
                if mail_id.upper() == ">QUIT":
                    # Client is trying to quit the view loop
                    full_command = "VIEW QUIT"
                elif mail_id.upper() == ">LIST":
                    # Client is trying to view the entire mailbox list
                    full_command = "VIEW LIST"
                else:
                    # Client is trying to view a specific mail
                    full_command = "VIEW MAIL " + mail_id
            else:
                # Do the original view list when the view option has been chosen
                thread.state = "VIEW"
                full_command = "VIEW LIST"

        # Send the logout command
        elif command == "LOUT":
            thread.state = "LOUT"
            full_command = "LOUT"

        # Send the help command
        elif command == "HELP":
            thread.state = "HELP"
            full_command = "HELP"
            # Check if the user has entered a help section to search
            help_section = self._get_option(options, 'help_section')
            if help_section:
                if help_section.upper() == ">QUIT":
                    # The client wants to quit
                    full_command += " QUIT"
                else:
                    full_command += " " + help_section.upper()

        # Send the ADMN request for admin users
        elif command == "ADMN":
            thread.state = "ADMN"
            admin_command = self._get_option(options, 'admin_command')
            if admin_command:
                admin_upper = admin_command.upper()
                if admin_upper.startswith(">UNLK"):
                    # Unlock a locked out user; the target follows the first space
                    unlock_parts = admin_command.split(" ", 1)
                    target = unlock_parts[1].strip() if len(unlock_parts) > 1 else ""
                    if target:
                        full_command = "ADMN UNLK " + target
                elif admin_upper.startswith(">AUDT"):
                    # Do a server log audit view
                    full_command = "ADMN AUDT"
                elif admin_upper.startswith(">QUIT"):
                    # Quit the admin command menu
                    full_command = "ADMN QUIT"
                # Any other admin input leaves full_command unset and is rejected
            else:
                # Get the admin home menu
                full_command = "ADMN HOME"

        # Command does not need special processing
        else:
            full_command = command

        if not full_command:
            return False
        thread.process_command(full_command)
        return True