# System imports
import random
import secrets
import string
import socket
import selectors
import queue
import uuid
from threading import Thread
from time import sleep
from typing import Optional, Union

# My class imports
import Crypto
import Mail
import RawMail
import ServerLog
import Session
import User
from DBConn import DBConn, get_unix_timestamp


class ServerThread(Thread):
    """Thread that serves a single client connection.

    The thread owns the client socket, polls it through a selector, reads
    requests into an inbound queue, runs them through the command state
    machine in ``__process_smtp_request`` and writes the resulting responses
    back, encrypting/decrypting the traffic once the key handshake is done.
    """

    def __init__(self, current_socket: socket.socket, address: tuple):
        """Set up the per-connection state and register the socket.

        :param current_socket: the accepted client socket
        :param address: the client address tuple, element 0 is used as the IP
        """
        super().__init__()
        self.__address = address
        self.__socket = current_socket
        self.__selector = selectors.DefaultSelector()
        self.__buffer_in = queue.Queue()
        self.__buffer_out = queue.Queue()

        # flag for the current network status, -1 for awaiting request, 0 for awaiting response, 1 for ready
        self.current_status = -1
        self.active = True
        self.initialised = False
        self.__pending_close = False
        self.__main_state = "INIT"
        self.__sub_state = ""

        # Holding objects for the current session and raw mail
        self.__current_session: Optional[Session.Session] = None
        self.__current_mail: Optional[RawMail.RawMail] = None

        # Queue array for queued responses
        self.__queued_messages = []

        # Variables needed for the custom RSA encryption
        self.__pending_handshake = False
        self.__hand_shook = False
        self.__crypt_test_phrase = None
        self.__crypto_manager_receive: Optional[Crypto.Receiver] = None
        self.__crypto_manager_transmit: Optional[Crypto.Transmitter] = None

        self.__selector.register(self.__socket, selectors.EVENT_READ | selectors.EVENT_WRITE, data=None)

    def run(self):
        """Run the thread's main loop until the connection is closed."""
        pending_close_counter = 0
        pending_handshake_counter = 0
        timed_out = False
        try:
            while self.active:
                waiting_commands = self.__selector.select(timeout=None)
                for _key, mask in waiting_commands:
                    try:
                        if mask & selectors.EVENT_READ:
                            self.__read()
                        elif mask & selectors.EVENT_WRITE and not self.__buffer_out.empty():
                            # Write/Send the initial message
                            self.__write()
                        elif mask & selectors.EVENT_WRITE and len(self.__queued_messages) > 0:
                            # Write/Send any other queued messages for multipart responses
                            self.__write_queue()
                    except Exception as ex:
                        print(ex)
                        self.__close()

                # If the connection has not been initialised send the initial service ready response
                if not self.initialised:
                    self.__process_smtp_response(220, self.__address[0] + " SMTP service ready")
                    self.__main_state = "HELO"
                    self.initialised = True

                # If the user is logged in but the connection has not received a heartbeat or request in the
                # time then send the timeout response and close the connection
                if self.__current_session and not timed_out:
                    if self.__current_session.get_session_last_action_timestamp() < get_unix_timestamp(-300):
                        self.__process_smtp_response(421, "Timeout, closing connection")
                        timed_out = True
                        # Flag the close so the 421 response can be written on the next pass before the
                        # connection is ended, the same way the other 421 responses do it
                        self.__pending_close = True

                # Check for a pending handshake, we dont want to flag it straight away as it will start encoding
                # the response before we are ready, this will allow the current request to be processed before
                # flipping it
                if self.__pending_handshake:
                    if pending_handshake_counter == 1:
                        self.__pending_handshake = False
                        self.__hand_shook = True
                        pending_handshake_counter = 0
                    else:
                        pending_handshake_counter += 1

                # Check if we have a pending close request, this allows us to send the good bye response before
                # rudely ending the connection
                if self.__pending_close:
                    if pending_close_counter == 1:
                        self.active = False
                        self.__close()
                    else:
                        pending_close_counter += 1

                # Nothing left registered on the selector means the connection has been closed
                if not self.__selector.get_map():
                    break
        finally:
            self.__selector.close()

    def __read(self):
        """Read a request from the client, queue it and process it.

        :raises RuntimeError: when the peer sent no data (disconnected)
        """
        try:
            received = self.__socket.recv(4096)
        except BlockingIOError as ex:
            # Nothing was queued, so there is nothing to process on this pass
            print(f"IO Error: {repr(ex)}")
            return

        if not received:
            raise RuntimeError("Connection Read Error, peer disconnected?")

        received_decoded_string = received.decode()

        # If we have received a request, and the hand is shook and its not a plain emergency quit request we
        # decrypt the data here
        if self.__hand_shook and received_decoded_string not in ["QUIT"]:
            # The request is split into the chunks needed and decrypted
            chunk_size = self.__crypto_manager_receive.get_expected_block_size()
            received_chunks = [
                self.__crypto_manager_receive.decrypt_string(received_decoded_string[i:i + chunk_size])
                for i in range(0, len(received_decoded_string), chunk_size)
            ]

            # Once we have decrypted all of the request blocks we can join them back into 1 string
            full_received = "".join(received_chunks)
            print(f"Received request: {repr(full_received)} from Client: {self.__address}")
            self.__buffer_in.put(full_received)

            # We log the previous action in the session object and the time of the last action
            if self.__current_session:
                state_string = self.__main_state
                if self.__sub_state:
                    state_string += "." + self.__sub_state
                self.__current_session.update_session(state_string, full_received)

            # and log the action in the server logs
            self.__write_log_entry("IN", full_received)
        else:
            # If we are not hand shook then we treat the request as normal and log the
            # action in the server logs
            self.__buffer_in.put(received_decoded_string)
            print(f"Received request: {repr(received)} from Client: {self.__address}")
            self.__write_log_entry("IN", received)

        self.__process_smtp_request()

    def __write_queue(self):
        """Send every queued response, one socket write per response."""
        # Some SMTP response require the server to send multiple responses per request
        while len(self.__queued_messages) > 0:
            # Sleep to simulate some kind of latency to hopefully stop
            # the response from being added to the previous.
            sleep(1)

            # Pop each item from the array until we have nothing
            item = self.__queued_messages.pop(0)
            self.__process_smtp_response(item['code'], item['message'])
            self.__write()
        self.__queued_messages.clear()

    def __write(self):
        """Send the next response waiting in the outbound buffer, if any."""
        try:
            response = self.__buffer_out.get_nowait()
        except queue.Empty:
            response = None

        if response:
            print(f"Sending Response: {response} to Client: {self.__address}")
            try:
                self.__do_write(response)
            except BlockingIOError as ex:
                print(f"IO Error: {repr(ex)}")

    def __do_write(self, response: bytes) -> None:
        """Do the actual socket write, encrypting the response after the handshake.

        :param response: the encoded response to send
        """
        # If the hand is shook then do the encryption
        if self.__hand_shook:
            # Write the log item before encrypting so we know what it was
            self.__write_log_entry("OUT", response)

            # Split into the chunk sizes we need/if needed
            chunk_size = self.__crypto_manager_transmit.get_usable_byte_length()
            response_chunks = [
                self.__crypto_manager_transmit.encrypt_string(response[i:i + chunk_size])
                for i in range(0, len(response), chunk_size)
            ]

            # Join the response back together and send
            full_response = b"".join(response_chunks)
            self.__socket.send(full_response)
        else:
            # Otherwise just send as normal and log the server action
            self.__socket.send(response)
            self.__write_log_entry("OUT", response)

    def __write_log_entry(self, direction: str, action: Union[str, bytes]) -> None:
        """Write an entry to the server logs table.

        Logging is best effort: a failure to write the entry is reported but
        never interrupts the request being served.

        :param direction: "IN" for requests, "OUT" for responses
        :param action: the raw request/response, as text or bytes
        """
        # Check if the client is logged in and has an active session to log
        if self.__current_session:
            session_ref = self.__current_session.session_unique_ref
        else:
            session_ref = ""

        # Callers pass both bytes (plain traffic, responses) and str (decrypted requests), so normalise to
        # text first; the prefix checks and the split below need a single type to work on
        if isinstance(action, (bytes, bytearray)):
            action_text = bytes(action).decode(errors="replace")
        else:
            action_text = str(action)

        # Check if its the login request and remove the password since logging that would be pretty silly...
        if action_text.startswith("LOGI"):
            action_parts = action_text.split(" ")
            if len(action_parts) == 3:
                action_text = "LOGI " + action_parts[1]
            else:
                action_text = "LOGI Empty/Bad Request"
        # The auth requests are too long and storing the keys seems dumb so ignore those too
        elif action_text.startswith("AUTH"):
            action_text = "AUTH Auth sending"
        elif action_text.startswith("570"):
            action_text = "570 Auth sending"
        # Multipart responses also flood the table as well as self replicate when checking, ignore those as we
        # log the initial request anyway
        elif action_text.startswith("214"):
            return

        try:
            ServerLog.ServerLog().create_server_log(session_ref, direction, action_text, self.__address[0])
        except Exception as ex:
            print(f"Could not write the server log entry: {repr(ex)}")

    def __queue_smtp_response(self, code: int, message: str):
        """Queue an SMTP response to be sent after the current one.

        :param code: the response code
        :param message: the response text
        """
        self.__queued_messages.append({"code": code, "message": message})

    def __process_smtp_response(self, code: int, message: str):
        """Encode an SMTP response and place it in the outbound buffer.

        :param code: the response code
        :param message: the response text, omitted from the output when empty
        """
        data_out = str(code)
        if len(message) > 0:
            data_out += " " + message
        encoded_data_out = data_out.encode()
        self.__buffer_out.put(encoded_data_out)

    def __process_smtp_request(self):
        """Process the next request from the inbound buffer and queue the responses.

        :return: False when the request was rejected because the session is
            missing or invalid, otherwise None
        """
        current_command = self.__buffer_in.get()

        # Split the request as it should be in a format like XXXX YYYYYYYYYYYY, where Y would be any optional data
        current_command_parts = current_command.split(" ", 1)

        # Create a connection to the DB
        db_connection = DBConn()

        # Check if we are in Data entry mode, otherwise process the command as normal
        if current_command_parts[0] not in ["RSET", "NOOP"] \
                and (self.__main_state == "MAIL" and self.__sub_state == "DATA"):
            # Check if we receive the end data input line
            if current_command != "\n.\n":
                self.__current_mail.text_body += current_command + "\n"
            else:
                # If it has been we process the mail and "send" it to the users
                self.__current_mail.create_raw_mail(self.__current_session.get_user().get_user_id(),
                                                    self.__current_mail.text_body)
                email_body = self.__current_mail.strip_headers_from_body()
                for recipient in self.__current_mail.to_address_list:
                    # The local recipients are added as their user id when processing the RCPT requests
                    if isinstance(recipient, int):
                        temp_email = Mail.Mail()
                        temp_email.create_mail(self.__current_mail.get_raw_id(),
                                               self.__current_mail.from_id,
                                               recipient,
                                               self.__current_mail.subject,
                                               email_body)
                    else:
                        # We dont really care about the non local recipients,
                        # but they would be forwarded to their server
                        pass

                # Mail has been saved, send the OK and set us back to the main menu state
                self.__process_smtp_response(250, "Ok")
                self.__main_state = "HMEN"
                self.__sub_state = None
        else:
            if len(current_command_parts) > 0:
                # Check if the user is logged in when required
                if current_command_parts[0] not in ["HELO", "NOOP", "AUTH", "HSHK", "LOGI"]:
                    if self.__current_session is not None:
                        # User is not valid, boot them from the server
                        if self.__current_session.get_user() \
                                and not self.__current_session.get_user().user_logged_in():
                            self.__process_smtp_response(421, "Session invalid, Goodbye")
                            return False
                    else:
                        # Session is invalid boot the client
                        self.__process_smtp_response(421, "Session invalid, Goodbye")
                        return False

                try:
                    # Respond to the HELO command
                    if current_command_parts[0] == "HELO":
                        if self.__main_state == "HELO":
                            self.__process_smtp_response(250, "Greetings from " + self.__address[0])

                            # Setup the custom RSA encryption variables and generate the keys to send
                            self.__crypto_manager_receive = Crypto.Receiver()
                            self.__crypto_manager_receive.generate_keys()

                            # Queue the public key response to send after the previous response
                            self.__queue_smtp_response(570, self.__crypto_manager_receive.get_public_key_pair())
                            self.__main_state = "AUTH"
                        else:
                            # We arnt expecting a HELO command, most likely because we didnt initialise
                            self.__process_smtp_response(503, "Bad sequence of commands")

                    # Response to the AUTH state requests
                    elif current_command_parts[0] == "AUTH":
                        if self.__main_state == "AUTH":
                            if len(current_command_parts) == 2:
                                # We should have received the public key from the client, we generate a random
                                # string to send during the handshake to make sure both public keys are working
                                # correctly
                                self.__crypto_manager_transmit = Crypto.Transmitter(current_command_parts[1])
                                phrase_alphabet = string.ascii_uppercase + string.digits
                                self.__crypt_test_phrase = "".join(
                                    secrets.choice(phrase_alphabet) for _ in range(10))
                                self.__queue_smtp_response(250, self.__crypt_test_phrase)
                                self.__main_state = "HSHK"
                                self.__hand_shook = True
                            else:
                                # If we didnt get a valid request the handshake most likely errored, we may as
                                # well close the connection
                                print(f"Bad handshake from {self.__address}")
                                self.__process_smtp_response(421, "Bad handshake, bye")
                                self.__pending_close = True

                    elif current_command_parts[0] == "HSHK":
                        if self.__main_state == "HSHK":
                            if len(current_command_parts) == 2:
                                # Check if the client returned the correct test phrase back, if so the encryption
                                # is working both ways and we should be safe to proceed
                                if current_command_parts[1] == self.__crypt_test_phrase:
                                    self.__queue_smtp_response(250, "OK")
                                    self.__main_state = "LOGI"
                                else:
                                    # If the phrase is wrong there is obviously an issue with the encryption keys
                                    # and we just close the connection so the user can try again
                                    print(f"Bad handshake response from {self.__address}")
                                    self.__process_smtp_response(421, "Bad handshake, bye")
                                    self.__pending_close = True
                            else:
                                # If the response didnt have the right amount of elements then the handshake
                                # failed and we close the connection so the user can try again
                                print(f"Bad handshake response from {self.__address}")
                                self.__process_smtp_response(421, "Bad handshake, bye")
                                self.__pending_close = True

                    # Process the NOOP Keep alive/Heartbeat response request
                    elif current_command_parts[0] == "NOOP":
                        self.__process_smtp_response(295, "OK")

                    # Process the LOGI login request
                    elif current_command_parts[0] == "LOGI":
                        # If the client is already logged in or the server is not expecting the login command
                        # send the bad sequence response
                        if self.__main_state != "LOGI" \
                                or (self.__current_session and self.__current_session.session_is_valid()):
                            # if the user is already logged in then they should not be sending the LOGI command
                            self.__process_smtp_response(503, "Bad sequence of commands")
                        elif len(current_command_parts) == 2 and len(current_command_parts[1]) > 0:
                            # Login parts 0 should be the username and 1 the password
                            login_parts = current_command_parts[1].split(" ", 1)

                            # We should have 2 login parts
                            if len(login_parts) == 2:
                                user_login = User.User(login_parts[0])
                                current_login = user_login.login(login_parts[1], self.__address[0])

                                # The user has successfully logged in
                                if current_login['status_code'] == 1:
                                    # Generate a UUID to use as the sessions unique ID
                                    session_uuid = str(uuid.uuid4())
                                    self.__current_session = Session.Session(session_uuid, user_login)
                                    self.__current_session.create_session(
                                        user_login.get_user_id(),
                                        self.__address[0]
                                    )
                                    self.__process_smtp_response(250, "OK, Welcome " + user_login.username)

                                    # Send the session reference back so the client can use this to verify itself
                                    self.__queue_smtp_response(265, self.__current_session.session_unique_ref)
                                    self.__main_state = "HMEN"
                                elif current_login['status_code'] == -1:
                                    # Too many login attempts for this user so the connection is rejected
                                    self.__process_smtp_response(421,
                                                                 "Authentication failed, your connection has been "
                                                                 "terminated, please wait and try again later")
                                    self.__pending_close = True
                                else:
                                    # 535 is the only error code that relates to authentication
                                    self.__process_smtp_response(535, "Authentication failed")
                            else:
                                self.__process_smtp_response(535, "Authentication failed")
                        else:
                            # Syntax error because they did not send the command correctly
                            self.__process_smtp_response(501, "Syntax Error")

                    # Respond to the VRFY request
                    elif current_command_parts[0] == "VRFY":
                        if len(current_command_parts) == 2:
                            # Get the logged in users name and email address
                            verify_user = User.User(current_command_parts[1])
                            if self.__current_session.get_user().get_user_id() == verify_user.get_user_id():
                                self.__queue_smtp_response(261, verify_user.role.upper())
                                self.__queue_smtp_response(250, verify_user.get_user_fullname() +
                                                           " <" + verify_user.email_address + ">")
                            else:
                                self.__process_smtp_response(421, "Session invalid, Goodbye")
                        else:
                            self.__process_smtp_response(500, "Invalid command given")

                    # Respond to the Log out Command
                    elif current_command_parts[0] == "LOUT":
                        logged_in_user = self.__current_session.get_user()
                        if logged_in_user and logged_in_user.user_logged_in():
                            self.__current_session = None
                            self.__process_smtp_response(250, "OK, Goodbye " + logged_in_user.username)
                            self.__main_state = "LOGI"
                            self.__sub_state = None
                        else:
                            # the user wasn't logged in so we shouldn't be getting this request
                            self.__process_smtp_response(503, "Bad sequence of commands")

                    # Process the MAIL command
                    elif current_command_parts[0] == "MAIL":
                        if self.__main_state == "HMEN":
                            if len(current_command_parts) == 2:
                                # To start the mail process the client should be sending the FROM address to us
                                mail_command_parts = current_command_parts[1].split(" ", 1)
                                if len(mail_command_parts) == 2 and mail_command_parts[0] == "FROM" \
                                        and len(mail_command_parts[1].strip(" <>")) > 0:
                                    # Create a raw mail for us to add the temp fields in before creating the mail
                                    self.__current_mail = RawMail.RawMail()
                                    self.__current_mail.from_id = self.__current_session.get_user().get_user_id()
                                    self.__current_mail.from_address = mail_command_parts[1].strip(" <>")
                                    self.__main_state = "MAIL"
                                    self.__sub_state = "RCPT"
                                    self.__process_smtp_response(250, "OK")
                                else:
                                    # Bad Email address sent
                                    self.__process_smtp_response(500, "Invalid command given")
                            else:
                                # Bad command sent
                                self.__process_smtp_response(500, "Invalid command given")
                        else:
                            # Tried to mail from the wrong state
                            self.__process_smtp_response(503, "Bad sequence of commands")

                    # Process the RCPT request
                    elif current_command_parts[0] == "RCPT":
                        if self.__sub_state == "RCPT":
                            if len(current_command_parts) == 2:
                                # Split the command so that we can extract the email address
                                rcpt_command_parts = current_command_parts[1].split(" ", 1)
                                if len(rcpt_command_parts) == 2 and rcpt_command_parts[0] == "TO" \
                                        and len(rcpt_command_parts[1].strip(" <>")) > 0:
                                    email_cleaned = rcpt_command_parts[1].strip(" <>")
                                    email_parts = email_cleaned.split("@", 1)
                                    if len(email_parts) == 2:
                                        # Check if the email address is from the local domain from the site settings
                                        if email_parts[1] == db_connection.get_system_setting("SERVER_DOMAIN"):
                                            local_user = db_connection.do_select("users", "col", "user_id",
                                                                                 "user_email = :user_email",
                                                                                 [
                                                                                     {
                                                                                         "field": "user_email",
                                                                                         "value": email_cleaned
                                                                                     }
                                                                                 ])
                                            if local_user and int(local_user) > 0:
                                                # Address is a valid local address. Store the id as an int, the
                                                # delivery loop only treats int recipients as local mailboxes
                                                local_user_id = int(local_user)
                                                if local_user_id not in self.__current_mail.to_address_list:
                                                    self.__current_mail.to_address_list.append(local_user_id)
                                                    self.__process_smtp_response(250, "Ok")
                                                else:
                                                    # Address is already in the list...
                                                    self.__process_smtp_response(500, "Invalid command given")
                                            else:
                                                # Address does not exist locally
                                                self.__process_smtp_response(550, "Mailbox unavailable")
                                        else:
                                            # The email uses a non local domain, so we pretend we can forward it
                                            if email_cleaned not in self.__current_mail.to_address_list:
                                                self.__current_mail.to_address_list.append(email_cleaned)
                                                self.__process_smtp_response(251,
                                                                             "User not local, will attempt to forward")
                                            else:
                                                # Address is already in the list...
                                                self.__process_smtp_response(500, "Invalid command given")
                                    else:
                                        self.__process_smtp_response(500, "Not a valid email address")
                                else:
                                    # Not in the TO <address> format
                                    self.__process_smtp_response(500, "Invalid command given")
                            else:
                                # No address data was sent with the command
                                self.__process_smtp_response(500, "Invalid command given")
                        else:
                            self.__process_smtp_response(503, "Bad sequence of commands")

                    # Start the DATA request response
                    elif current_command_parts[0] == "DATA":
                        if self.__main_state == "MAIL" and self.__sub_state == "RCPT":
                            if len(self.__current_mail.to_address_list) > 0:
                                # Set the server into data entry mode, this will cause it to loop at the start
                                # of this method instead of match commands until the end characters are sent
                                self.__process_smtp_response(354, "Start mail input; end with .")
                                self.__sub_state = "DATA"
                                self.__current_mail.text_body = ""
                            else:
                                self.__process_smtp_response(500, "Invalid command given")
                        else:
                            self.__process_smtp_response(503, "Bad sequence of commands")

                    # Process the VIEW request
                    elif current_command_parts[0] == "VIEW":
                        if len(current_command_parts) == 2:
                            view_command_parts = current_command_parts[1].split(" ", 1)

                            # The client wants to quit the VIEW state
                            if view_command_parts[0] == "QUIT":
                                self.__process_smtp_response(250, "OK")
                                self.__main_state = "HMEN"
                                self.__sub_state = None

                            # The client wants to view a mail
                            elif view_command_parts[0] == "MAIL":
                                self.__main_state = "VIEW"
                                self.__sub_state = "MAIL"
                                user_id = self.__current_session.get_user().get_user_id()
                                mail_id = view_command_parts[1]
                                mail_item = db_connection.do_select("mail_store", "row", "mail_id",
                                                                    "mail_id = :mail_id AND to_id = :to_id",
                                                                    [
                                                                        {
                                                                            "field": "mail_id",
                                                                            "value": mail_id
                                                                        },
                                                                        {
                                                                            "field": "to_id",
                                                                            "value": user_id
                                                                        }
                                                                    ])

                                # Return the mail if the query returns it, it must be one of their emails and a
                                # valid mail id
                                if mail_item:
                                    current_mail = Mail.Mail()
                                    current_mail.load_mail(mail_item['mail_id'])
                                    mail_body = current_mail.get_raw_mail().get_raw_mail_content()
                                    mail_body_lines = mail_body.split("\n")

                                    # Send each line of the email then the output ending string
                                    self.__process_smtp_response(211, "OK")
                                    for line in mail_body_lines:
                                        if len(line) > 0:
                                            self.__queue_smtp_response(214, line)
                                    self.__queue_smtp_response(214, "\n.\n")
                                else:
                                    self.__process_smtp_response(550, "Could not find the mail id")

                            # Client wants to view the mails in their mailbox
                            elif view_command_parts[0] == "LIST":
                                self.__main_state = "VIEW"
                                self.__sub_state = "LIST"
                                user_id = self.__current_session.get_user().get_user_id()
                                mail_row_format = "{:<8} | {:<25} | {:<25} | {:<15}"
                                mail_box_content = ["Last 20 mailbox items:", ""]
                                mail_box_items = db_connection.do_select("mail_store", "all", "mail_id",
                                                                         "to_id = :to_id",
                                                                         [
                                                                             {
                                                                                 "field": "to_id",
                                                                                 "value": user_id
                                                                             }
                                                                         ],
                                                                         "ORDER BY mail_date_sent DESC LIMIT 0, 20")

                                # Treat a missing result the same as an empty mailbox
                                for row in mail_box_items or []:
                                    current_mail = Mail.Mail()
                                    current_mail.load_mail(row['mail_id'])
                                    mail_box_content.append(
                                        mail_row_format.format(current_mail.get_mail_id(),
                                                               current_mail.get_subject(20),
                                                               current_mail.get_from_user().get_user_fullname(),
                                                               current_mail.get_date_sent()))

                                # The second line is either the empty notice or the column headers
                                if len(mail_box_content) == 2:
                                    mail_box_content[1] = "No Items."
                                else:
                                    mail_box_content[1] = mail_row_format.format("ID", "Subject", "From", "Date")

                                self.__process_smtp_response(211, "OK")
                                for line in mail_box_content:
                                    self.__queue_smtp_response(214, line)
                                self.__queue_smtp_response(214, "\n.\n")
                            else:
                                self.__process_smtp_response(500, "Invalid command given")
                        else:
                            self.__process_smtp_response(500, "Invalid command given")

                    # Process the RSET command and return to the home menu state
                    elif current_command_parts[0] == "RSET":
                        self.__current_mail = None
                        self.__process_smtp_response(250, "Ok")
                        self.__main_state = "HMEN"
                        self.__sub_state = None

                    # Process the HELP request
                    elif current_command_parts[0] == "HELP":
                        help_content = ""

                        # If no help "section" is requested get the MAIN help text
                        if len(current_command_parts) == 1:
                            help_content = db_connection.get_system_setting("HELPMAIN")

                            # Set the state to help mode
                            self.__main_state = "HELP"
                        elif len(current_command_parts) == 2 and self.__main_state == "HELP":
                            # The client is trying to quit the help state
                            if current_command_parts[1] == "QUIT":
                                self.__process_smtp_response(250, "OK")
                                self.__main_state = "HMEN"
                                self.__sub_state = None
                            else:
                                help_content = db_connection.get_system_setting("HELP" + current_command_parts[1])
                        else:
                            # If the client is asking for a HELP section and we arnt in the help state they're wrong
                            if self.__main_state != "HELP":
                                self.__process_smtp_response(503, "Bad sequence of commands")
                            else:
                                self.__process_smtp_response(500, "Invalid command given")

                        # If we have help content to actually send back the content
                        if help_content and len(help_content) > 0:
                            self.__process_smtp_response(211, "OK")
                            help_content_lines = help_content.split("\n")
                            for line in help_content_lines:
                                self.__queue_smtp_response(214, line)
                            self.__queue_smtp_response(214, "\n.\n")
                        else:
                            if self.__main_state == "HELP":
                                self.__process_smtp_response(504, "Command parameter is not implemented")

                    # ADMIN only commands
                    elif current_command_parts[0] == "ADMN":
                        # Check the logged in user is actually an admin
                        if self.__current_session.get_user().role.upper() == "ADMIN":
                            if len(current_command_parts) == 2:
                                if current_command_parts[1] == "HOME":
                                    self.__main_state = "ADMN"
                                    self.__process_smtp_response(250, "OK")

                                # The client is trying to quit the admin menu
                                elif current_command_parts[1] == "QUIT":
                                    self.__main_state = "HMEN"
                                    self.__sub_state = None
                                    self.__process_smtp_response(250, "OK QUIT")

                                # Unlock a user that has been locked out
                                elif current_command_parts[1].startswith("UNLK"):
                                    unlock_parts = current_command_parts[1].split(" ", 1)
                                    if len(unlock_parts) == 2:
                                        unlock_user = User.User(unlock_parts[1])
                                        if unlock_user.valid_user():
                                            unlock_user.unlock()
                                            self.__process_smtp_response(250, "OK DONE")
                                        else:
                                            self.__process_smtp_response(500, "User not found.")
                                    else:
                                        self.__process_smtp_response(500, "Invalid command given")

                                # Command to return the last 20 items from the server logs audit
                                elif current_command_parts[1].startswith("AUDT"):
                                    log_row_format = "{:<14} | {:<14} | {:<15} | {:<3} | {:<30} "
                                    server_log_content = ["Last 20 server actions:", ""]
                                    server_log_items = db_connection.do_select("server_logs", "all", "log_id",
                                                                               "", [],
                                                                               "ORDER BY log_date DESC LIMIT 0, 20")

                                    # Treat a missing result the same as an empty log
                                    for row in server_log_items or []:
                                        current_log = ServerLog.ServerLog(row['log_id'])
                                        server_log_content.append(current_log.output_log_line(log_row_format))

                                    # The second line is either the empty notice or the column headers
                                    if len(server_log_content) == 2:
                                        server_log_content[1] = "No Items."
                                    else:
                                        server_log_content[1] = log_row_format.format("Date", "User", "IP",
                                                                                      "Dir", "Action")

                                    self.__process_smtp_response(211, "OK")
                                    for line in server_log_content:
                                        self.__queue_smtp_response(214, line)
                                    self.__queue_smtp_response(214, "\n.\n")
                                else:
                                    # Unknown admin sub command
                                    self.__process_smtp_response(500, "Invalid command given")
                            else:
                                # No admin sub command was sent
                                self.__process_smtp_response(500, "Invalid command given")
                        else:
                            self.__process_smtp_response(535, "No access")

                    # The client wants to quit, send the goodbye response
                    elif current_command_parts[0] == "QUIT":
                        self.__process_smtp_response(221, "Bye")
                        print(f"Client {repr(self.__address[0])} has quit...")
                    else:
                        self.__process_smtp_response(500, "Invalid command given")
                except Exception as ex:
                    # Catch all for any errors thrown while handling the command, the client still gets a
                    # response and the error is reported on the server side
                    print(f"Error: {repr(ex)}")
                    self.__process_smtp_response(500, "Invalid command given")
            else:
                self.__process_smtp_response(500, "Invalid command given")

    def __close(self):
        """Close the connection and mark the thread as inactive.

        Safe to call more than once; later calls only reset the status flags.
        """
        print("Attempting to end the Connection...")
        try:
            if self.__socket is not None:
                try:
                    self.__selector.unregister(self.__socket)
                except (KeyError, ValueError) as ex:
                    # Not registered (any more), still carry on and close the socket itself
                    print(f"Could not unregister the connection: {repr(ex)}")
                self.__socket.close()
                print("Connection ended with ", self.__address)
        except OSError as ex:
            print(f"Could not close the connection: {repr(ex)}")
        finally:
            self.__socket = None
            self.active = False
            self.current_status = -1