547 lines
25 KiB
Python
547 lines
25 KiB
Python
import queue
|
|
import socket
|
|
import selectors
|
|
import time
|
|
from datetime import datetime
|
|
from threading import Thread
|
|
from time import sleep
|
|
|
|
import ClientTempMail
|
|
import Crypto
|
|
|
|
|
|
# Included the unix timestamp function from the DBConn file so we dont have to use the entire class just for this
|
|
def get_unix_timestamp(seconds_offset=0) -> int:
|
|
"""
|
|
Function to get the current unix timestamp to sim the NOW() sql method, seconds offset will pull a
|
|
past of future timestamp
|
|
"""
|
|
return int(time.time()) + seconds_offset
|
|
|
|
|
|
class ClientThread(Thread):
|
|
"""Class for dealing with the threaded actions of the client app"""
|
|
def __init__(self, current_socket: socket, address: tuple):
|
|
Thread.__init__(self)
|
|
|
|
self.__address = address
|
|
self.__socket: socket.socket = current_socket
|
|
self.__selector = selectors.DefaultSelector()
|
|
|
|
self.__buffer_in = queue.Queue()
|
|
self.__buffer_out = queue.Queue()
|
|
|
|
# flag for the current network status, -1 for awaiting request, 0 for awaiting response, 1 for ready
|
|
self.current_status = -1
|
|
self.state = "INIT"
|
|
self.__init_timeout = get_unix_timestamp()
|
|
self.last_response_content = []
|
|
|
|
# Login and User variables
|
|
self.__session_ref = None
|
|
self.login_username = None
|
|
self.__user_full_name = None
|
|
self.__user_email = None
|
|
self.__user_role = None
|
|
self.initialised = False
|
|
|
|
# Mail class to hold the mail data structure that mimics the server version
|
|
self.client_mail_temp: ClientTempMail.ClientTempMail or None = None
|
|
|
|
self.__queued_messages = []
|
|
|
|
# Handshake and Encryption Variables
|
|
self.__pending_handshake = False
|
|
self.__hand_shook = False
|
|
self.__crypto_manager_receive: Crypto.Receiver or None = None
|
|
self.__crypto_manager_transmit: Crypto.Transmitter or None = None
|
|
|
|
# Active and Heartbeat Vars
|
|
self.active = True
|
|
self.__heartbeat_timer = None
|
|
self.__heartbeat_tick = -120
|
|
|
|
self.__selector.register(self.__socket, selectors.EVENT_READ | selectors.EVENT_WRITE, data=None)
|
|
|
|
def is_connected(self) -> bool:
|
|
"""Method for checking if the connection is active, and sending the noop command for the heartbeat"""
|
|
if not self.__socket:
|
|
return False
|
|
|
|
# If the connection is initialised we can send commands
|
|
if self.initialised:
|
|
try:
|
|
# Send heartbeat NOOP command, this is a background command as to not ruin the flow of the program
|
|
# since actual SMTP servers work in a specific order
|
|
self.queue_command("NOOP")
|
|
except Exception:
|
|
return False
|
|
|
|
return True
|
|
|
|
def get_users_role(self) -> str:
|
|
"""Method for getting the current users server role (USER or ADMIN)"""
|
|
return self.__user_role
|
|
|
|
def run(self):
|
|
"""Main threads looped run method that listens and writes our network data"""
|
|
pending_handshake_counter = 0
|
|
try:
|
|
while self.active:
|
|
waiting_commands = self.__selector.select(timeout=1)
|
|
for key, mask in waiting_commands:
|
|
try:
|
|
if mask & selectors.EVENT_READ:
|
|
self.__read()
|
|
elif mask & selectors.EVENT_WRITE and not self.__buffer_out.empty():
|
|
self.__write()
|
|
# Process any queued messages when an actual message is not being processed
|
|
elif mask & selectors.EVENT_WRITE and len(self.__queued_messages) > 0:
|
|
self.__write_queue()
|
|
except Exception as ex:
|
|
print(f"Exception hit: {repr(ex)}")
|
|
self.__close()
|
|
|
|
# Init time out makes sure that a "valid" connection it makes will time out if the server
|
|
# does not respond the way we want it to
|
|
if self.state == "INIT":
|
|
if self.__init_timeout < get_unix_timestamp(-40):
|
|
print("No INIT processed, the connection is most likely at fault.")
|
|
self.active = False
|
|
|
|
# Pending handshake so we can send the AUTH response then set the handshook flag after it has been sent
|
|
if self.__pending_handshake:
|
|
if pending_handshake_counter == 1:
|
|
self.__pending_handshake = False
|
|
self.__hand_shook = True
|
|
pending_handshake_counter = 0
|
|
else:
|
|
pending_handshake_counter += 1
|
|
|
|
# If logged in we can do our Heartbeat NOOP request to faux keep our connection alive
|
|
if self.__session_ref:
|
|
if self.__heartbeat_timer < get_unix_timestamp(self.__heartbeat_tick):
|
|
if not self.is_connected():
|
|
print("Server did not respond")
|
|
self.__close()
|
|
|
|
if not self.__selector.get_map():
|
|
break
|
|
finally:
|
|
self.__selector.close()
|
|
|
|
def __read(self):
|
|
"""Private method for reading the network data coming into the client"""
|
|
try:
|
|
data_in = self.__socket.recv(1024)
|
|
except BlockingIOError:
|
|
pass
|
|
else:
|
|
if data_in:
|
|
if self.__hand_shook:
|
|
# We decode the bytes of the message we receive, this should be a hex string
|
|
data_in_decoded_string = data_in.decode()
|
|
# If needed we split the response into chunks so that we can decrypt the multiple blocks that
|
|
# were sent
|
|
chunk_size = self.__crypto_manager_receive.get_expected_block_size()
|
|
data_in_chunks = [self.__crypto_manager_receive.decrypt_string(
|
|
data_in_decoded_string[i:i + chunk_size]
|
|
) for i in range(0, len(data_in_decoded_string), chunk_size)]
|
|
# Join all of the decrypted chunks back together
|
|
full_data_in = "".join(data_in_chunks)
|
|
self.__buffer_in.put(full_data_in)
|
|
else:
|
|
# Data is treated normally when we have not shaken hands
|
|
self.__buffer_in.put(data_in.decode())
|
|
else:
|
|
raise RuntimeError("Connection Closed")
|
|
|
|
self.process_response()
|
|
|
|
def __write_queue(self):
|
|
"""Private method for writing a queue of data with a simulated 'latency' to stop the messages being joined"""
|
|
# Some SMTP response require the server to send multiple responses per request
|
|
while len(self.__queued_messages) > 0:
|
|
# Sleep to simulate some kind of latency to stop the queue to hopefully stop
|
|
# the response from being added to the previous.
|
|
sleep(1)
|
|
try:
|
|
self.process_silent_command(self.__queued_messages.pop(0))
|
|
response = self.__buffer_out.get_nowait()
|
|
except Exception:
|
|
response = None
|
|
|
|
if response:
|
|
# This would sometimes interrupt the users input because a queued write would happen during it
|
|
# print(f"Sending Queued response to Client: {self.__address}")
|
|
try:
|
|
self.__do_write(response)
|
|
except BlockingIOError as ex:
|
|
print(f"IO Error: {repr(ex)}")
|
|
pass
|
|
|
|
def __write(self):
|
|
"""Private method for doing a single write to the socket"""
|
|
try:
|
|
response = self.__buffer_out.get_nowait()
|
|
except Exception:
|
|
response = None
|
|
|
|
if response:
|
|
print(f"Sending data: {repr(response)} to {self.__address}")
|
|
try:
|
|
self.__do_write(response)
|
|
except BlockingIOError as ex:
|
|
print(f"Could not send data: {repr(ex)}")
|
|
pass
|
|
|
|
def __do_write(self, response: bytes) -> None:
|
|
"""Shared Private method to do the actual write to the socket"""
|
|
# Check if we have shook hands
|
|
if self.__hand_shook:
|
|
# Get the max chunk size for the key we generated in case we need to split the request down
|
|
chunk_size = self.__crypto_manager_transmit.get_usable_byte_length()
|
|
response_chunks = [self.__crypto_manager_transmit.encrypt_string(response[i:i + chunk_size])
|
|
for i in range(0, len(response), chunk_size)]
|
|
full_response = b"".join(response_chunks)
|
|
self.__socket.send(full_response)
|
|
else:
|
|
self.__socket.send(response)
|
|
|
|
# Set the heartbeat timer since we just sent a command and wont need to send a noop again
|
|
self.__heartbeat_timer = get_unix_timestamp()
|
|
|
|
def process_command(self, command: str) -> None:
|
|
"""Method to Process the command and add it into the buffer, we also set the client to read mode"""
|
|
self.current_status = 0
|
|
self.__buffer_out.put(command.encode())
|
|
|
|
def process_silent_command(self, command: str) -> None:
|
|
"""Method to silently add a command to the buffer, this does not cause the client to enter read mode"""
|
|
self.__buffer_out.put(command.encode())
|
|
|
|
def queue_command(self, command: str) -> None:
|
|
"""Method to add a command to the queued message array for queued writing"""
|
|
self.__queued_messages.append(command)
|
|
|
|
def process_response(self):
|
|
"""Method to process the response from the server and to set the client to the required state"""
|
|
current_data = self.__buffer_in.get()
|
|
|
|
# Split the response string by the first space, this should give us a response code and response data
|
|
response_parts = current_data.split(" ", 1)
|
|
if len(response_parts) >= 2:
|
|
|
|
response_code = int(response_parts[0])
|
|
|
|
# If the response is not from the heartbeat print it
|
|
if response_code != 295:
|
|
print(f"Response: {repr(current_data)}")
|
|
|
|
if len(response_parts) > 1:
|
|
response_data = response_parts[1]
|
|
else:
|
|
response_data = None
|
|
|
|
# Check the current state and respond accordingly
|
|
if self.state == "INIT":
|
|
if response_code == 220:
|
|
self.state = "HELO"
|
|
# Initialised, now send the HELO command
|
|
self.process_command("HELO " + self.__address[0])
|
|
self.current_status = 0
|
|
self.initialised = True
|
|
else:
|
|
print("No response from the server...")
|
|
self.__close()
|
|
|
|
elif self.state == "HELO":
|
|
if response_code == 250:
|
|
# HELO responded, send the AUTH command
|
|
self.state = "AUTH"
|
|
self.current_status = 0
|
|
|
|
elif self.state == "AUTH":
|
|
if not self.__hand_shook:
|
|
if response_code == 570:
|
|
# set up the RSA encryption variables for encrypting the connection
|
|
self.__crypto_manager_transmit = Crypto.Transmitter(response_data)
|
|
self.__crypto_manager_receive = Crypto.Receiver()
|
|
self.__crypto_manager_receive.generate_keys()
|
|
# Send our public key back to the server
|
|
self.process_command("AUTH " + self.__crypto_manager_receive.get_public_key_pair())
|
|
# Set the client to a pending handshake
|
|
self.__pending_handshake = True
|
|
else:
|
|
if response_code == 250:
|
|
# Auth key was sent successfully, start the confirmation handshake
|
|
self.process_command("HSHK " + response_data)
|
|
self.state = "HSHK"
|
|
|
|
elif self.state == "HSHK":
|
|
if response_code == 250:
|
|
# The validation handshake was current, start the login state and set the write
|
|
self.state = "LOGI"
|
|
self.current_status = -1
|
|
|
|
elif self.state == "LOGI":
|
|
if response_code == 250:
|
|
# Login was successful, wait for the secondary role response so we can tell
|
|
# if the user is and ADMIN or USER
|
|
self.current_status = 0
|
|
elif response_code == 265:
|
|
self.__session_ref = response_data
|
|
self.current_status = 0
|
|
# Send the VRFY command to the get logged in users name and email address
|
|
self.state = "VRFY"
|
|
self.process_command("VRFY " + self.login_username)
|
|
elif response_code == 421:
|
|
pass
|
|
else:
|
|
# Bad login, try again
|
|
self.last_response_content.clear()
|
|
self.last_response_content.append(response_data)
|
|
self.current_status = -1
|
|
|
|
elif self.state == "VRFY":
|
|
if response_code == 261:
|
|
self.__user_role = response_data
|
|
elif response_code == 250:
|
|
# Process the email address that gets sent back with the VRFY response
|
|
if '@' in response_data:
|
|
if '<' in response_data:
|
|
start_bracket_index = response_data.find('<')
|
|
end_bracket_index = response_data.rfind('>')
|
|
self.__user_full_name = response_data[0:start_bracket_index-1].strip()
|
|
self.__user_email = response_data[start_bracket_index+1:end_bracket_index].strip()
|
|
else:
|
|
#
|
|
self.__user_full_name = "N/A"
|
|
self.__user_email = response_data.strip()
|
|
# Send the user to the Home menu (HMEN) as they are not logged in
|
|
self.state = "HMEN"
|
|
self.current_status = -1
|
|
else:
|
|
# VRFY failed and we send the user back to the login
|
|
self.reset_session()
|
|
self.state = "LOGI"
|
|
self.current_status = -1
|
|
elif response_code == 500:
|
|
# VRFY failed and we send the user back to the login
|
|
self.reset_session()
|
|
self.state = "LOGI"
|
|
self.current_status = -1
|
|
|
|
elif self.state == "MAIL":
|
|
# Start an email input, we start by asking for the receipt addresses (RCPT)
|
|
if response_code == 250:
|
|
self.state = "RCPT"
|
|
self.current_status = -1
|
|
else:
|
|
self.state = "HMEN"
|
|
self.current_status = -1
|
|
|
|
elif self.state == "RCPT":
|
|
self.last_response_content.clear()
|
|
if response_code == 250:
|
|
# The address was local and added
|
|
self.last_response_content.append("Address Added.")
|
|
elif response_code == 251:
|
|
# The address was valid but not local and added
|
|
self.last_response_content.append("Address Added, but not local.")
|
|
elif response_code == 503:
|
|
# The server said we are out of sequence, reset the transaction and start over
|
|
self.last_response_content.append("There was an issue with the order of commands sent, RSET sent.")
|
|
self.state = "RSET"
|
|
self.process_command("RSET")
|
|
else:
|
|
# The address was rejected, we remove it from the clients end
|
|
last_address = self.client_mail_temp.to_address_list.pop(-1)
|
|
self.last_response_content.append(f"Address {last_address} not added for reason: " + response_data)
|
|
|
|
if self.state != "RSET":
|
|
self.state = "RCPT"
|
|
self.current_status = -1
|
|
|
|
elif self.state == "SUBJ":
|
|
# The receipts were fine and we move onto the subject, subject does not have an official "state"
|
|
# we process it fully from the client side and send it along with all of the starting DATA requests
|
|
if response_code == 354:
|
|
string_date = datetime.now()
|
|
date_formatted = string_date.strftime("%d %b %y %H:%M:%S")
|
|
# send the email headers that get used when "forwarding" to another server, for completeness
|
|
# DATA starts with 4 lines, Date, From, Subject and To as a comma seperated list
|
|
self.queue_command("Date: " + date_formatted)
|
|
self.queue_command("From: " + self.__user_full_name + " <" + self.__user_email + ">")
|
|
self.queue_command("Subject: " + self.client_mail_temp.subject)
|
|
self.queue_command("To: " + ", ".join(self.client_mail_temp.to_address_list))
|
|
# Set the state to DATA and start accepting lines from the client until the >SEND command is used
|
|
self.state = "DATA"
|
|
self.current_status = -1
|
|
else:
|
|
# The server said we are out of sequence, reset the transaction and start over
|
|
self.last_response_content.append("There was an issue with the order of commands sent, RSET sent.")
|
|
self.state = "RSET"
|
|
self.process_command("RSET")
|
|
|
|
elif self.state == "DATA":
|
|
if response_code == 250:
|
|
# The data was sent and the server responded with an OK response, return to the home menu
|
|
self.last_response_content.clear()
|
|
self.last_response_content.append("Email sent.")
|
|
self.client_mail_temp = None
|
|
self.state = "HMEN"
|
|
self.current_status = -1
|
|
|
|
elif self.state == "RSET":
|
|
if response_code == 250:
|
|
# The server responded OK to our request to reset the current transaction
|
|
self.client_mail_temp = None
|
|
self.state = "HMEN"
|
|
self.current_status = -1
|
|
|
|
elif self.state == "VIEW":
|
|
if response_code == 211:
|
|
# Server responded to the VIEW command
|
|
self.current_status = 0
|
|
self.last_response_content.clear()
|
|
elif response_code == 214:
|
|
# Process the VIEW response until we get the end of DATA string
|
|
if response_data == "\n.\n":
|
|
self.current_status = -1
|
|
else:
|
|
self.last_response_content.append(response_data)
|
|
elif response_code == 504:
|
|
# There was an error getting a mail, usually if they try to enter
|
|
# a number that doesnt belong to them
|
|
self.last_response_content.clear()
|
|
self.last_response_content.append("There was an issue loading this mail")
|
|
self.current_status = -1
|
|
else:
|
|
# There was an unspecified error so we send the user back to the home menu
|
|
self.last_response_content.clear()
|
|
self.state = "HMEN"
|
|
self.current_status = -1
|
|
|
|
elif self.state == "ADMN":
|
|
if response_code == 250:
|
|
# Server has left ADMIN mode
|
|
if response_data == "OK QUIT":
|
|
self.last_response_content.clear()
|
|
self.state = "HMEN"
|
|
self.current_status = -1
|
|
# Server has processed the command given
|
|
elif response_data == "OK DONE":
|
|
self.last_response_content.clear()
|
|
self.last_response_content.append("Action Completed.")
|
|
self.current_status = -1
|
|
else:
|
|
self.last_response_content.clear()
|
|
self.current_status = -1
|
|
# Server sent a multipart ADMIN response
|
|
elif response_code == 211:
|
|
self.current_status = 0
|
|
self.last_response_content.clear()
|
|
elif response_code == 214:
|
|
if response_data == "\n.\n":
|
|
self.current_status = -1
|
|
else:
|
|
self.last_response_content.append(response_data)
|
|
# Server sent an ADMIN error response
|
|
elif response_code == 500:
|
|
self.last_response_content.clear()
|
|
self.last_response_content.append("Action Failed.")
|
|
self.current_status = -1
|
|
# Permission was denied by the server
|
|
else:
|
|
self.last_response_content.clear()
|
|
self.last_response_content.append("Permission Denied.")
|
|
self.state = "HMEN"
|
|
self.current_status = -1
|
|
|
|
elif self.state == "LOUT":
|
|
# Server logged the client out
|
|
if response_code == 250:
|
|
# Logged out
|
|
self.reset_session()
|
|
self.state = "LOGI"
|
|
else:
|
|
# Couldn't log out?
|
|
self.state = "HMEN"
|
|
|
|
self.current_status = -1
|
|
elif self.state == "HELP":
|
|
# Server responded to the HELP command and is sending back the multipart response
|
|
if response_code == 211:
|
|
self.current_status = 0
|
|
self.last_response_content.clear()
|
|
elif response_code == 214:
|
|
if response_data == "\n.\n":
|
|
self.current_status = -1
|
|
else:
|
|
self.last_response_content.append(response_data)
|
|
# Server responded with a command not implemented error becuase the help section does not exist
|
|
elif response_code == 504:
|
|
self.last_response_content.clear()
|
|
self.last_response_content.append("There was no help on file for your request.")
|
|
self.current_status = -1
|
|
# Unspecified error from the server, just sending the user to the home menu
|
|
else:
|
|
self.last_response_content.clear()
|
|
self.state = "HMEN"
|
|
self.current_status = -1
|
|
|
|
# Status 421 and 221 is only used when the connection has been terminated and we just exit at that point
|
|
if response_code == 421:
|
|
print("The connection was closed by the server with the following message:")
|
|
for line in self.last_response_content:
|
|
print(line)
|
|
else:
|
|
print(f"Reason: {response_data}")
|
|
self.__close()
|
|
self.current_status = -2
|
|
|
|
if response_code == 221:
|
|
self.state = "QUIT"
|
|
self.__close()
|
|
self.current_status = -2
|
|
|
|
else:
|
|
# The response did not have a valid response code
|
|
print("Response too short.")
|
|
|
|
def __close(self):
|
|
"""Private method to close the connection to the server and end the thread"""
|
|
print("Attempting to end the Connection...")
|
|
try:
|
|
self.__selector.unregister(self.__socket)
|
|
self.__socket.close()
|
|
print(f"Connection ended with {repr(self.__address)}")
|
|
except OSError as ex:
|
|
print(f"Could not close the connection: {repr(ex)}")
|
|
pass
|
|
finally:
|
|
self.__socket = None
|
|
self.active = False
|
|
self.current_status = -1
|
|
|
|
def get_session_ref(self) -> str:
|
|
"""Method to return the current session reference from the server"""
|
|
return self.__session_ref
|
|
|
|
def get_user_name(self) -> str:
|
|
"""Method to return the current users full name"""
|
|
return self.__user_full_name
|
|
|
|
def get_user_email(self) -> str:
|
|
"""Method to return the current users email address"""
|
|
return self.__user_email
|
|
|
|
def reset_session(self) -> None:
|
|
"""Method to restart a clients session without closing the connection, used for the logout command"""
|
|
self.__session_ref = None
|
|
self.__user_email = None
|
|
self.__user_full_name = None
|
|
self.__user_role = None
|
|
self.login_username = None
|