Files
phython-mimic-smtp-clientse…/Client/ClientThread.py
2025-07-05 14:24:22 +01:00

547 lines
25 KiB
Python

import queue
import socket
import selectors
import time
from datetime import datetime
from threading import Thread
from time import sleep
import ClientTempMail
import Crypto
# Included the unix timestamp function from the DBConn file so we dont have to use the entire class just for this
def get_unix_timestamp(seconds_offset=0) -> int:
"""
Function to get the current unix timestamp to sim the NOW() sql method, seconds offset will pull a
past of future timestamp
"""
return int(time.time()) + seconds_offset
class ClientThread(Thread):
"""Class for dealing with the threaded actions of the client app"""
def __init__(self, current_socket: socket, address: tuple):
Thread.__init__(self)
self.__address = address
self.__socket: socket.socket = current_socket
self.__selector = selectors.DefaultSelector()
self.__buffer_in = queue.Queue()
self.__buffer_out = queue.Queue()
# flag for the current network status, -1 for awaiting request, 0 for awaiting response, 1 for ready
self.current_status = -1
self.state = "INIT"
self.__init_timeout = get_unix_timestamp()
self.last_response_content = []
# Login and User variables
self.__session_ref = None
self.login_username = None
self.__user_full_name = None
self.__user_email = None
self.__user_role = None
self.initialised = False
# Mail class to hold the mail data structure that mimics the server version
self.client_mail_temp: ClientTempMail.ClientTempMail or None = None
self.__queued_messages = []
# Handshake and Encryption Variables
self.__pending_handshake = False
self.__hand_shook = False
self.__crypto_manager_receive: Crypto.Receiver or None = None
self.__crypto_manager_transmit: Crypto.Transmitter or None = None
# Active and Heartbeat Vars
self.active = True
self.__heartbeat_timer = None
self.__heartbeat_tick = -120
self.__selector.register(self.__socket, selectors.EVENT_READ | selectors.EVENT_WRITE, data=None)
def is_connected(self) -> bool:
"""Method for checking if the connection is active, and sending the noop command for the heartbeat"""
if not self.__socket:
return False
# If the connection is initialised we can send commands
if self.initialised:
try:
# Send heartbeat NOOP command, this is a background command as to not ruin the flow of the program
# since actual SMTP servers work in a specific order
self.queue_command("NOOP")
except Exception:
return False
return True
def get_users_role(self) -> str:
"""Method for getting the current users server role (USER or ADMIN)"""
return self.__user_role
def run(self):
"""Main threads looped run method that listens and writes our network data"""
pending_handshake_counter = 0
try:
while self.active:
waiting_commands = self.__selector.select(timeout=1)
for key, mask in waiting_commands:
try:
if mask & selectors.EVENT_READ:
self.__read()
elif mask & selectors.EVENT_WRITE and not self.__buffer_out.empty():
self.__write()
# Process any queued messages when an actual message is not being processed
elif mask & selectors.EVENT_WRITE and len(self.__queued_messages) > 0:
self.__write_queue()
except Exception as ex:
print(f"Exception hit: {repr(ex)}")
self.__close()
# Init time out makes sure that a "valid" connection it makes will time out if the server
# does not respond the way we want it to
if self.state == "INIT":
if self.__init_timeout < get_unix_timestamp(-40):
print("No INIT processed, the connection is most likely at fault.")
self.active = False
# Pending handshake so we can send the AUTH response then set the handshook flag after it has been sent
if self.__pending_handshake:
if pending_handshake_counter == 1:
self.__pending_handshake = False
self.__hand_shook = True
pending_handshake_counter = 0
else:
pending_handshake_counter += 1
# If logged in we can do our Heartbeat NOOP request to faux keep our connection alive
if self.__session_ref:
if self.__heartbeat_timer < get_unix_timestamp(self.__heartbeat_tick):
if not self.is_connected():
print("Server did not respond")
self.__close()
if not self.__selector.get_map():
break
finally:
self.__selector.close()
def __read(self):
"""Private method for reading the network data coming into the client"""
try:
data_in = self.__socket.recv(1024)
except BlockingIOError:
pass
else:
if data_in:
if self.__hand_shook:
# We decode the bytes of the message we receive, this should be a hex string
data_in_decoded_string = data_in.decode()
# If needed we split the response into chunks so that we can decrypt the multiple blocks that
# were sent
chunk_size = self.__crypto_manager_receive.get_expected_block_size()
data_in_chunks = [self.__crypto_manager_receive.decrypt_string(
data_in_decoded_string[i:i + chunk_size]
) for i in range(0, len(data_in_decoded_string), chunk_size)]
# Join all of the decrypted chunks back together
full_data_in = "".join(data_in_chunks)
self.__buffer_in.put(full_data_in)
else:
# Data is treated normally when we have not shaken hands
self.__buffer_in.put(data_in.decode())
else:
raise RuntimeError("Connection Closed")
self.process_response()
def __write_queue(self):
"""Private method for writing a queue of data with a simulated 'latency' to stop the messages being joined"""
# Some SMTP response require the server to send multiple responses per request
while len(self.__queued_messages) > 0:
# Sleep to simulate some kind of latency to stop the queue to hopefully stop
# the response from being added to the previous.
sleep(1)
try:
self.process_silent_command(self.__queued_messages.pop(0))
response = self.__buffer_out.get_nowait()
except Exception:
response = None
if response:
# This would sometimes interrupt the users input because a queued write would happen during it
# print(f"Sending Queued response to Client: {self.__address}")
try:
self.__do_write(response)
except BlockingIOError as ex:
print(f"IO Error: {repr(ex)}")
pass
def __write(self):
"""Private method for doing a single write to the socket"""
try:
response = self.__buffer_out.get_nowait()
except Exception:
response = None
if response:
print(f"Sending data: {repr(response)} to {self.__address}")
try:
self.__do_write(response)
except BlockingIOError as ex:
print(f"Could not send data: {repr(ex)}")
pass
def __do_write(self, response: bytes) -> None:
"""Shared Private method to do the actual write to the socket"""
# Check if we have shook hands
if self.__hand_shook:
# Get the max chunk size for the key we generated in case we need to split the request down
chunk_size = self.__crypto_manager_transmit.get_usable_byte_length()
response_chunks = [self.__crypto_manager_transmit.encrypt_string(response[i:i + chunk_size])
for i in range(0, len(response), chunk_size)]
full_response = b"".join(response_chunks)
self.__socket.send(full_response)
else:
self.__socket.send(response)
# Set the heartbeat timer since we just sent a command and wont need to send a noop again
self.__heartbeat_timer = get_unix_timestamp()
def process_command(self, command: str) -> None:
"""Method to Process the command and add it into the buffer, we also set the client to read mode"""
self.current_status = 0
self.__buffer_out.put(command.encode())
def process_silent_command(self, command: str) -> None:
"""Method to silently add a command to the buffer, this does not cause the client to enter read mode"""
self.__buffer_out.put(command.encode())
def queue_command(self, command: str) -> None:
"""Method to add a command to the queued message array for queued writing"""
self.__queued_messages.append(command)
def process_response(self):
"""Method to process the response from the server and to set the client to the required state"""
current_data = self.__buffer_in.get()
# Split the response string by the first space, this should give us a response code and response data
response_parts = current_data.split(" ", 1)
if len(response_parts) >= 2:
response_code = int(response_parts[0])
# If the response is not from the heartbeat print it
if response_code != 295:
print(f"Response: {repr(current_data)}")
if len(response_parts) > 1:
response_data = response_parts[1]
else:
response_data = None
# Check the current state and respond accordingly
if self.state == "INIT":
if response_code == 220:
self.state = "HELO"
# Initialised, now send the HELO command
self.process_command("HELO " + self.__address[0])
self.current_status = 0
self.initialised = True
else:
print("No response from the server...")
self.__close()
elif self.state == "HELO":
if response_code == 250:
# HELO responded, send the AUTH command
self.state = "AUTH"
self.current_status = 0
elif self.state == "AUTH":
if not self.__hand_shook:
if response_code == 570:
# set up the RSA encryption variables for encrypting the connection
self.__crypto_manager_transmit = Crypto.Transmitter(response_data)
self.__crypto_manager_receive = Crypto.Receiver()
self.__crypto_manager_receive.generate_keys()
# Send our public key back to the server
self.process_command("AUTH " + self.__crypto_manager_receive.get_public_key_pair())
# Set the client to a pending handshake
self.__pending_handshake = True
else:
if response_code == 250:
# Auth key was sent successfully, start the confirmation handshake
self.process_command("HSHK " + response_data)
self.state = "HSHK"
elif self.state == "HSHK":
if response_code == 250:
# The validation handshake was current, start the login state and set the write
self.state = "LOGI"
self.current_status = -1
elif self.state == "LOGI":
if response_code == 250:
# Login was successful, wait for the secondary role response so we can tell
# if the user is and ADMIN or USER
self.current_status = 0
elif response_code == 265:
self.__session_ref = response_data
self.current_status = 0
# Send the VRFY command to the get logged in users name and email address
self.state = "VRFY"
self.process_command("VRFY " + self.login_username)
elif response_code == 421:
pass
else:
# Bad login, try again
self.last_response_content.clear()
self.last_response_content.append(response_data)
self.current_status = -1
elif self.state == "VRFY":
if response_code == 261:
self.__user_role = response_data
elif response_code == 250:
# Process the email address that gets sent back with the VRFY response
if '@' in response_data:
if '<' in response_data:
start_bracket_index = response_data.find('<')
end_bracket_index = response_data.rfind('>')
self.__user_full_name = response_data[0:start_bracket_index-1].strip()
self.__user_email = response_data[start_bracket_index+1:end_bracket_index].strip()
else:
#
self.__user_full_name = "N/A"
self.__user_email = response_data.strip()
# Send the user to the Home menu (HMEN) as they are not logged in
self.state = "HMEN"
self.current_status = -1
else:
# VRFY failed and we send the user back to the login
self.reset_session()
self.state = "LOGI"
self.current_status = -1
elif response_code == 500:
# VRFY failed and we send the user back to the login
self.reset_session()
self.state = "LOGI"
self.current_status = -1
elif self.state == "MAIL":
# Start an email input, we start by asking for the receipt addresses (RCPT)
if response_code == 250:
self.state = "RCPT"
self.current_status = -1
else:
self.state = "HMEN"
self.current_status = -1
elif self.state == "RCPT":
self.last_response_content.clear()
if response_code == 250:
# The address was local and added
self.last_response_content.append("Address Added.")
elif response_code == 251:
# The address was valid but not local and added
self.last_response_content.append("Address Added, but not local.")
elif response_code == 503:
# The server said we are out of sequence, reset the transaction and start over
self.last_response_content.append("There was an issue with the order of commands sent, RSET sent.")
self.state = "RSET"
self.process_command("RSET")
else:
# The address was rejected, we remove it from the clients end
last_address = self.client_mail_temp.to_address_list.pop(-1)
self.last_response_content.append(f"Address {last_address} not added for reason: " + response_data)
if self.state != "RSET":
self.state = "RCPT"
self.current_status = -1
elif self.state == "SUBJ":
# The receipts were fine and we move onto the subject, subject does not have an official "state"
# we process it fully from the client side and send it along with all of the starting DATA requests
if response_code == 354:
string_date = datetime.now()
date_formatted = string_date.strftime("%d %b %y %H:%M:%S")
# send the email headers that get used when "forwarding" to another server, for completeness
# DATA starts with 4 lines, Date, From, Subject and To as a comma seperated list
self.queue_command("Date: " + date_formatted)
self.queue_command("From: " + self.__user_full_name + " <" + self.__user_email + ">")
self.queue_command("Subject: " + self.client_mail_temp.subject)
self.queue_command("To: " + ", ".join(self.client_mail_temp.to_address_list))
# Set the state to DATA and start accepting lines from the client until the >SEND command is used
self.state = "DATA"
self.current_status = -1
else:
# The server said we are out of sequence, reset the transaction and start over
self.last_response_content.append("There was an issue with the order of commands sent, RSET sent.")
self.state = "RSET"
self.process_command("RSET")
elif self.state == "DATA":
if response_code == 250:
# The data was sent and the server responded with an OK response, return to the home menu
self.last_response_content.clear()
self.last_response_content.append("Email sent.")
self.client_mail_temp = None
self.state = "HMEN"
self.current_status = -1
elif self.state == "RSET":
if response_code == 250:
# The server responded OK to our request to reset the current transaction
self.client_mail_temp = None
self.state = "HMEN"
self.current_status = -1
elif self.state == "VIEW":
if response_code == 211:
# Server responded to the VIEW command
self.current_status = 0
self.last_response_content.clear()
elif response_code == 214:
# Process the VIEW response until we get the end of DATA string
if response_data == "\n.\n":
self.current_status = -1
else:
self.last_response_content.append(response_data)
elif response_code == 504:
# There was an error getting a mail, usually if they try to enter
# a number that doesnt belong to them
self.last_response_content.clear()
self.last_response_content.append("There was an issue loading this mail")
self.current_status = -1
else:
# There was an unspecified error so we send the user back to the home menu
self.last_response_content.clear()
self.state = "HMEN"
self.current_status = -1
elif self.state == "ADMN":
if response_code == 250:
# Server has left ADMIN mode
if response_data == "OK QUIT":
self.last_response_content.clear()
self.state = "HMEN"
self.current_status = -1
# Server has processed the command given
elif response_data == "OK DONE":
self.last_response_content.clear()
self.last_response_content.append("Action Completed.")
self.current_status = -1
else:
self.last_response_content.clear()
self.current_status = -1
# Server sent a multipart ADMIN response
elif response_code == 211:
self.current_status = 0
self.last_response_content.clear()
elif response_code == 214:
if response_data == "\n.\n":
self.current_status = -1
else:
self.last_response_content.append(response_data)
# Server sent an ADMIN error response
elif response_code == 500:
self.last_response_content.clear()
self.last_response_content.append("Action Failed.")
self.current_status = -1
# Permission was denied by the server
else:
self.last_response_content.clear()
self.last_response_content.append("Permission Denied.")
self.state = "HMEN"
self.current_status = -1
elif self.state == "LOUT":
# Server logged the client out
if response_code == 250:
# Logged out
self.reset_session()
self.state = "LOGI"
else:
# Couldn't log out?
self.state = "HMEN"
self.current_status = -1
elif self.state == "HELP":
# Server responded to the HELP command and is sending back the multipart response
if response_code == 211:
self.current_status = 0
self.last_response_content.clear()
elif response_code == 214:
if response_data == "\n.\n":
self.current_status = -1
else:
self.last_response_content.append(response_data)
# Server responded with a command not implemented error becuase the help section does not exist
elif response_code == 504:
self.last_response_content.clear()
self.last_response_content.append("There was no help on file for your request.")
self.current_status = -1
# Unspecified error from the server, just sending the user to the home menu
else:
self.last_response_content.clear()
self.state = "HMEN"
self.current_status = -1
# Status 421 and 221 is only used when the connection has been terminated and we just exit at that point
if response_code == 421:
print("The connection was closed by the server with the following message:")
for line in self.last_response_content:
print(line)
else:
print(f"Reason: {response_data}")
self.__close()
self.current_status = -2
if response_code == 221:
self.state = "QUIT"
self.__close()
self.current_status = -2
else:
# The response did not have a valid response code
print("Response too short.")
def __close(self):
"""Private method to close the connection to the server and end the thread"""
print("Attempting to end the Connection...")
try:
self.__selector.unregister(self.__socket)
self.__socket.close()
print(f"Connection ended with {repr(self.__address)}")
except OSError as ex:
print(f"Could not close the connection: {repr(ex)}")
pass
finally:
self.__socket = None
self.active = False
self.current_status = -1
def get_session_ref(self) -> str:
"""Method to return the current session reference from the server"""
return self.__session_ref
def get_user_name(self) -> str:
"""Method to return the current users full name"""
return self.__user_full_name
def get_user_email(self) -> str:
"""Method to return the current users email address"""
return self.__user_email
def reset_session(self) -> None:
"""Method to restart a clients session without closing the connection, used for the logout command"""
self.__session_ref = None
self.__user_email = None
self.__user_full_name = None
self.__user_role = None
self.login_username = None