Files
2025-07-05 14:24:22 +01:00

244 lines
9.6 KiB
Python

import socket
from typing import Any

import ClientTempMail
from ClientThread import ClientThread
class SMTPClient:
    """Client-side front end that drives a mail session through a ``ClientThread``.

    The class stores the server address, creates the worker ``ClientThread``
    on ``start_connection`` and turns short command codes (``LOGI``, ``MAIL``,
    ``RCPT`` ...) into the full command strings handed to that thread.

    The simple accessor methods delegate straight to the worker thread and
    raise ``AttributeError`` when no connection has been started (or after
    ``close_connection``), because the thread reference is ``None`` then.
    """

    # Non-alphanumeric characters permitted in an email address.
    # Same set as code points 33, 35-39, 42, 43, 45-47, 61, 63, 64, 94-96, 123-126.
    _EMAIL_SPECIAL_CHARS = frozenset("!#$%&'*+-./=?@^_`{|}~")

    def __init__(self, host: str, port: int):
        """Remember the server address; nothing is connected until ``start_connection``."""
        self.__host = host
        self.__port = port
        self.__client_thread: "ClientThread | None" = None

    def is_active(self) -> bool:
        """Return the worker thread's ``active`` flag, or False when there is no thread."""
        thread = self.__client_thread
        return thread is not None and thread.active

    def is_connected(self) -> bool:
        """Return whether the worker thread reports being connected.

        Any failure while asking (including there being no thread at all) is
        reported as "not connected" rather than raised.
        """
        try:
            return self.__client_thread.is_connected()
        except Exception:
            return False

    def get_state(self) -> str:
        """Return the worker thread's ``state`` attribute."""
        return self.__client_thread.state

    def get_current_response_content(self) -> Any:
        """Return the worker thread's ``last_response_content`` attribute."""
        return self.__client_thread.last_response_content

    def get_current_login_username(self) -> str:
        """Return the username stored on the worker thread by the last ``LOGI`` command."""
        return self.__client_thread.login_username

    def get_current_users_name(self) -> str:
        """Return the current user's name as reported by the worker thread."""
        return self.__client_thread.get_user_name()

    def get_current_users_email_address(self) -> str:
        """Return the current user's email address as reported by the worker thread."""
        return self.__client_thread.get_user_email()

    def get_current_temp_mail(self) -> "ClientTempMail.ClientTempMail":
        """Return the temporary mail object currently held by the worker thread."""
        return self.__client_thread.client_mail_temp

    def current_status(self) -> int:
        """Return the worker thread's ``current_status`` attribute."""
        return self.__client_thread.current_status

    def get_users_role(self) -> str:
        """Return the current user's role as reported by the worker thread."""
        return self.__client_thread.get_users_role()

    def start_connection(self) -> bool:
        """Open a non-blocking socket to the server and start the worker thread.

        Returns:
            True when the worker thread was created and started, False when an
            ``OSError`` occurred (the error is printed and the socket closed).
        """
        server_address = (self.__host, self.__port)
        current_socket = None
        try:
            current_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            current_socket.setblocking(False)
            # connect_ex reports C-level connect() failures through its return
            # value (ignored here, the socket is non-blocking), but problems
            # such as an unresolvable host still raise and are handled below.
            current_socket.connect_ex(server_address)
            # Create the child thread for the connection
            client_thread = ClientThread(current_socket, server_address)
            client_thread.start()
            client_thread.current_status = 0
        except OSError as ex:
            print(f"Error connecting to the server: {repr(ex)}")
            # Do not leak the descriptor when the connection could not be set up
            if current_socket is not None:
                current_socket.close()
            return False
        self.__client_thread = client_thread
        return True

    def close_connection(self) -> None:
        """Ask the worker thread to close the connection and drop the reference to it.

        Does nothing when there is no worker thread. An ``OSError`` raised while
        closing is printed rather than propagated.
        """
        if self.__client_thread is None:
            return
        print("Attempting to end the Connection...")
        try:
            self.__client_thread.close_connection()
            print(f"Connection ended with {self.__host}:{self.__port}")
        except OSError as ex:
            print(f"Could not close the connection: {repr(ex)}")
        finally:
            self.__client_thread = None

    @staticmethod
    def email_string_sanitize(input_str: str) -> str:
        """Return ``input_str`` with every character not allowed in an email address removed.

        Kept characters are ASCII letters, ASCII digits and
        ``!#$%&'*+-./=?@^_`{|}~``; everything else (whitespace, control
        characters, non-ASCII text, ...) is dropped.
        """
        specials = SMTPClient._EMAIL_SPECIAL_CHARS
        return "".join(
            letter for letter in input_str
            if letter in specials
            or "0" <= letter <= "9"
            or "A" <= letter <= "Z"
            or "a" <= letter <= "z"
        )

    def run_command(self, command: str, options=None) -> bool:
        """Translate a command code into a full command and pass it to the worker thread.

        Args:
            command: Command code such as ``LOGI``, ``MAIL``, ``RCPT``, ``SUBJ``,
                ``DATA``, ``LINE``, ``SEND``, ``RSET``, ``VIEW``, ``LOUT``,
                ``HELP`` or ``ADMN``. Any other non-empty value is sent as is.
            options: Optional mapping with the values a command needs
                (``username``/``password``, ``to_address``, ``subject``,
                ``data_line``, ``mail_id``, ``help_section``, ``admin_command``).
                Missing keys are treated like absent values.

        Returns:
            True when the command was accepted locally (sent, queued or applied),
            False when it was invalid, incomplete, or there is no worker thread.
        """
        thread = self.__client_thread
        if command is None or thread is None:
            return False
        # A missing mapping behaves like an empty one so lookups can use .get()
        options = options or {}
        full_command = None

        # Login command processing
        if command == "LOGI":
            # Clean the username to only accept email safe characters
            cleaned_username = self.email_string_sanitize(options.get('username') or "")
            password = options.get('password') or ""
            # Both parts must be present for a login attempt
            if not cleaned_username or not password:
                return False
            thread.login_username = cleaned_username
            full_command = "LOGI " + cleaned_username + " " + password
        # Start the mail command
        elif command == "MAIL":
            thread.state = "MAIL"
            # Create a mail object to hold the temp values we send to the server
            thread.client_mail_temp = ClientTempMail.ClientTempMail()
            sender_address = thread.get_user_email()
            thread.client_mail_temp.from_address = sender_address
            # Send the initial MAIL FROM command
            full_command = "MAIL FROM <" + sender_address + ">"
        elif command == "RCPT":
            # Strip any whitespaces to stop it bypassing the emptiness check
            to_address = (options.get('to_address') or "").strip()
            if not to_address:
                return False
            recipients = thread.client_mail_temp.to_address_list
            # Each recipient is only sent to the server once
            if to_address in recipients:
                return False
            recipients.append(to_address)
            full_command = "RCPT TO <" + to_address + ">"
        elif command == "SUBJ":
            # The subject step is only reachable once at least 1 address was entered
            if not thread.client_mail_temp.to_address_list:
                return False
            thread.state = "SUBJ"
            return True
        elif command == "DATA":
            # Set the Subject locally because SUBJ is not an actual command in the mail sequence
            thread.client_mail_temp.subject = options.get('subject', "")
            # Start the local data input
            full_command = "DATA"
        elif command == "LINE":
            data_line = options.get('data_line')
            if data_line is None:
                return False
            # Queue the current data line for the server to append to the body
            thread.queue_command(data_line)
            return True
        elif command == "SEND":
            data_line = options.get('data_line')
            if data_line is None:
                return False
            # Finish composing the email and send the end line
            thread.current_status = 0
            thread.process_command(data_line)
            return True
        # Send the reset command
        elif command == "RSET":
            thread.current_status = 0
            thread.state = "RSET"
            full_command = "RSET"
        # Send the view request
        elif command == "VIEW":
            mail_id = options.get('mail_id')
            if mail_id:
                selector = mail_id.upper()
                if selector == ">QUIT":
                    # Client is trying to quit the view loop
                    full_command = "VIEW QUIT"
                elif selector == ">LIST":
                    # Client is trying to view the entire mailbox list
                    full_command = "VIEW LIST"
                else:
                    # Client is trying to view a specific mail
                    full_command = "VIEW MAIL " + mail_id
            else:
                # Do the original view list when the view option has been chosen
                thread.state = "VIEW"
                full_command = "VIEW LIST"
        # Send the logout command
        elif command == "LOUT":
            thread.state = "LOUT"
            full_command = "LOUT"
        # Send the help command
        elif command == "HELP":
            thread.state = "HELP"
            full_command = "HELP"
            # Check if the user has entered a help section to search
            help_section = options.get('help_section')
            if help_section:
                section = help_section.upper()
                # ">QUIT" leaves the help menu, anything else is a section name
                full_command += " QUIT" if section == ">QUIT" else " " + section
        # Send the ADMN request for admin users
        elif command == "ADMN":
            thread.state = "ADMN"
            admin_command = options.get('admin_command')
            if admin_command:
                admin_code = admin_command.upper()
                if admin_code.startswith(">UNLK"):
                    # Unlock a locked out user; the user follows the first space
                    unlock_parts = admin_command.split(" ", 1)
                    unlock_target = unlock_parts[1].strip() if len(unlock_parts) > 1 else ""
                    if not unlock_target:
                        return False
                    full_command = "ADMN UNLK " + unlock_target
                elif admin_code.startswith(">AUDT"):
                    # Do a server log audit view
                    full_command = "ADMN AUDT"
                elif admin_code.startswith(">QUIT"):
                    # Quit the admin command menu
                    full_command = "ADMN QUIT"
                # Any other admin input leaves full_command unset and is rejected below
            else:
                # Get the admin home menu
                full_command = "ADMN HOME"
        # Command does not need special processing
        else:
            full_command = command

        if not full_command:
            return False
        thread.process_command(full_command)
        return True