302 lines
16 KiB
Python
302 lines
16 KiB
Python
import SMTPClient
|
|
import socket
|
|
import time
|
|
|
|
default_host = "127.0.0.1"
|
|
default_port = 50000
|
|
host = default_host
|
|
port = default_port
|
|
quit_flag = False
|
|
# Variable to setting to print the title to sections only once
|
|
current_state_initialised = False
|
|
|
|
|
|
# Handy function to draw the title block for each section when needed
|
|
def print_title(title: str) -> None:
|
|
title_bar_char = '-'
|
|
title_bar = "".join(title_bar_char * (len(title) + 2))
|
|
print(f"+{title_bar}+\n| {title} |\n+{title_bar}+")
|
|
|
|
|
|
# Handy function for dealing with Quit commands for the script
|
|
def do_a_quit(message: str) -> bool:
|
|
print(message)
|
|
selected_option = input("[Y]es/[N]o: ").strip().upper()
|
|
|
|
if selected_option == "Y" or selected_option == "YES":
|
|
result = False
|
|
else:
|
|
# quit the program loop
|
|
result = True
|
|
|
|
return result
|
|
|
|
|
|
# start main program loop
|
|
if __name__ == "__main__":
|
|
while not quit_flag:
|
|
# ask user for a hostname and port number, or leave blank to use the default values
|
|
print(f"Please enter a valid host address and port: (default: {default_host}:{default_port})")
|
|
host_input = input("host:port, blank for default:")
|
|
|
|
valid_host = True
|
|
test_host = ""
|
|
test_port = ""
|
|
|
|
# check if a value has been entered or to use the default values
|
|
if len(host_input.strip()) > 0:
|
|
# strip any trailing/leading whitespace and split by the colon
|
|
host_input = host_input.strip()
|
|
host_parts = host_input.split(':', 1)
|
|
|
|
# check if the host_parts array has 2 values, these should be a host ip and port
|
|
if len(host_parts) == 2:
|
|
try:
|
|
# convert the first item in host_parts into a valid IP address if host name is given
|
|
test_host = socket.gethostbyname(host_parts[0])
|
|
# check if the second item in host_parts is numeric for the port number
|
|
if host_parts[1].isnumeric():
|
|
test_port = int(host_parts[1])
|
|
else:
|
|
valid_host = False
|
|
except socket.gaierror:
|
|
valid_host = False
|
|
else:
|
|
valid_host = False
|
|
# if no value is given then use the default host and port given at the top
|
|
else:
|
|
# ensure the default host and port are assigned to the test values since they do not need to be
|
|
# checked/sanitized
|
|
test_host = default_host
|
|
test_port = default_port
|
|
# check if the host is still valid
|
|
if valid_host:
|
|
# set the actual host and port variables
|
|
host = test_host
|
|
port = test_port
|
|
|
|
print(f"Connecting to {host}:{port}...")
|
|
|
|
quit_input_flag = False
|
|
# Start the client with the given host and port
|
|
smtp_client = SMTPClient.SMTPClient(host, port)
|
|
|
|
# Attempt to connect 3 times before giving up
|
|
for i in range(3):
|
|
if smtp_client.start_connection():
|
|
break
|
|
else:
|
|
print("Connection failed, attempting again in 5 seconds")
|
|
time.sleep(5)
|
|
|
|
if smtp_client.is_connected():
|
|
# The main client loop
|
|
while smtp_client.is_active() and not quit_flag:
|
|
|
|
# We wait for the current status to be in "write" mode
|
|
if smtp_client.current_status() == -1:
|
|
# Print the main menu if we are in the HMEN (home menu)
|
|
if smtp_client.get_state() == "HMEN":
|
|
if not current_state_initialised:
|
|
print_title("SMTP server home menu")
|
|
print(f"Welcome {smtp_client.get_current_users_name()}.")
|
|
print(f"Email: {smtp_client.get_current_users_email_address()}")
|
|
print("[1]: [C]ompose an Email")
|
|
print("[2]: [V]iew your Mailbox")
|
|
print("[3]: [H]elp")
|
|
print("[4]: [L]og Out")
|
|
print("[5]: [Q]uit")
|
|
# Secret admin menu option if you are on an Admin account
|
|
if smtp_client.get_users_role() == "ADMIN":
|
|
print("[6]: [A]dmin console")
|
|
current_state_initialised = True
|
|
|
|
# If we have any response messages waiting, print them
|
|
if len(smtp_client.get_current_response_content()) > 0:
|
|
print("\nLast Status Message:")
|
|
print("----------------------")
|
|
for line in smtp_client.get_current_response_content():
|
|
print(line)
|
|
|
|
current_command = input("Option: ")
|
|
if current_command in ["1", "C", "c"]:
|
|
current_state_initialised = False
|
|
smtp_client.run_command("MAIL")
|
|
elif current_command in ["2", "V", "v"]:
|
|
current_state_initialised = False
|
|
smtp_client.run_command("VIEW")
|
|
elif current_command in ["3", "H", "h"]:
|
|
current_state_initialised = False
|
|
smtp_client.run_command("HELP")
|
|
elif current_command in ["4", "L", "l"]:
|
|
current_state_initialised = False
|
|
smtp_client.run_command("LOUT")
|
|
elif current_command in ["5", "Q", "q"]:
|
|
current_state_initialised = False
|
|
smtp_client.run_command("QUIT")
|
|
elif current_command in ["6", "A", "a"] and smtp_client.get_users_role() == "ADMIN":
|
|
current_state_initialised = False
|
|
smtp_client.run_command("ADMN")
|
|
else:
|
|
print("Invalid command, try again.")
|
|
|
|
# The admin menu allows for some specific Admin commands
|
|
elif smtp_client.get_state() == "ADMN":
|
|
print_title("Secret Admin Menu")
|
|
|
|
admin_content = smtp_client.get_current_response_content()
|
|
if len(admin_content) > 0:
|
|
print("SERVER RESPONDED: ")
|
|
for admin_line in admin_content:
|
|
print(admin_line)
|
|
print("END\n")
|
|
|
|
print("Admin Commands:")
|
|
print(">UNLK 'username' - Unlock a User account.")
|
|
print(">AUDT - View last 20 server actions")
|
|
print(">QUIT - Back to the Home menu")
|
|
|
|
current_command = input("Option: ")
|
|
if len(current_command) > 0:
|
|
smtp_client.run_command("ADMN", {"admin_command": current_command})
|
|
|
|
# Login details entry
|
|
elif smtp_client.get_state() == "LOGI":
|
|
print_title("Login Details")
|
|
print("Please enter your Login details:")
|
|
username = input("Username: ")
|
|
password = input("Password: ")
|
|
login_details = {"username": username, "password": password}
|
|
if not smtp_client.run_command("LOGI", login_details):
|
|
if do_a_quit("You did not enter a valid username and/or password, try again?"):
|
|
# send the quit command to the server
|
|
smtp_client.run_command("QUIT")
|
|
quit_flag = True
|
|
|
|
# Recipient email address entry, this continues til an empty value is entered
|
|
elif smtp_client.get_state() == "RCPT":
|
|
if not current_state_initialised:
|
|
print_title("Composing an Email: Recipients")
|
|
print("Input \">QUIT\" at any point to quit back to the menu.")
|
|
print("Input a recipient Email address and press enter, "
|
|
"when you are done press enter again.")
|
|
current_state_initialised = True
|
|
|
|
current_recp_address = input("TO: ")
|
|
if len(current_recp_address) > 0:
|
|
# Quit the compose email loop and send the RSET command
|
|
if current_recp_address.upper() == ">QUIT":
|
|
current_state_initialised = False
|
|
smtp_client.run_command("RSET")
|
|
else:
|
|
if not smtp_client.run_command("RCPT", {'to_address': current_recp_address}):
|
|
print("You must enter a unique recipient address.")
|
|
else:
|
|
# Continue to the Subject entry state if blank
|
|
if smtp_client.run_command("SUBJ"):
|
|
current_state_initialised = False
|
|
else:
|
|
# If a false was returned then they did not enter at least 1 address
|
|
print("You must have at least 1 recipient")
|
|
|
|
# Subject entery state, its not an official state since there is no real "SUBJ" command
|
|
elif smtp_client.get_state() == "SUBJ":
|
|
if not current_state_initialised:
|
|
print_title("Composing an Email: Subject")
|
|
print("Input \">QUIT\" at any point to quit back to the menu.")
|
|
print("Enter a Subject for this Email")
|
|
current_state_initialised = True
|
|
|
|
current_subject = input("SUBJECT: ")
|
|
# Quit the compose email loop and send the RSET command
|
|
if current_subject.upper() == ">QUIT":
|
|
current_state_initialised = False
|
|
smtp_client.run_command("RSET")
|
|
else:
|
|
if len(current_subject) == 0:
|
|
current_subject = "No Subject"
|
|
current_state_initialised = False
|
|
smtp_client.run_command("DATA", {"subject": current_subject})
|
|
|
|
# Enter the actual email body data entry mode, this will not end until you >QUIT or >SEND
|
|
elif smtp_client.get_state() == "DATA":
|
|
if not current_state_initialised:
|
|
print_title("Composing an Email: Body")
|
|
print("Input \">QUIT\" at any point to quit back to the menu.")
|
|
print("Input your email text, press enter to start a new line,")
|
|
print("Input \">SEND\" when you have finished your Email.")
|
|
current_state_initialised = True
|
|
|
|
current_mail_line = input("DATA: ")
|
|
# Quit the compose email loop and send the RSET command
|
|
if current_mail_line.upper() == ">QUIT":
|
|
current_state_initialised = False
|
|
smtp_client.run_command("RSET")
|
|
# Sends the \n.\n line to let the server know our data entry has ended
|
|
elif current_mail_line.upper() == ">SEND":
|
|
current_state_initialised = False
|
|
smtp_client.run_command("SEND", {'data_line': "\n.\n"})
|
|
else:
|
|
# Each line is sent individually when enter is pressed and stored on the server
|
|
smtp_client.run_command("LINE", {'data_line': current_mail_line})
|
|
|
|
# Output the mailbox or actual mail response from the server when in the VIEW state
|
|
elif smtp_client.get_state() == "VIEW":
|
|
print_title("Viewing Mail")
|
|
view_content = smtp_client.get_current_response_content()
|
|
for view_line in view_content:
|
|
print(view_line)
|
|
print("End of request, Enter a mail id, >LIST to view latest emails"
|
|
" or >QUIT to return to the menu.")
|
|
|
|
view_command = input("VIEW ")
|
|
smtp_client.run_command("VIEW", {"mail_id": view_command})
|
|
|
|
# Output the help response when the client is in the HELP state
|
|
elif smtp_client.get_state() == "HELP":
|
|
print_title("SMTP Server Help")
|
|
help_content = smtp_client.get_current_response_content()
|
|
for help_line in help_content:
|
|
print(help_line)
|
|
print("End of file, Enter a help subsection or >QUIT to return to the menu.")
|
|
|
|
help_command = input("HELP ")
|
|
smtp_client.run_command("HELP", {"help_section": help_command})
|
|
|
|
# QUIT should have already quit but if not we just pass
|
|
elif smtp_client.get_state() == "QUIT":
|
|
pass
|
|
|
|
# Catch all for any commands, if we end here we've done something wrong
|
|
# Kept here to help debug if we do actually run into an issue
|
|
else:
|
|
current_command = input(f"{smtp_client.get_state()}: ")
|
|
smtp_client.run_command(current_command)
|
|
|
|
# If the client loses its active flag then the connection is no longer connected
|
|
print("Connection no longer active, exiting")
|
|
quit_flag = True
|
|
else:
|
|
if not do_a_quit("The connection could not be made, would you like to try another host?"):
|
|
# reset the host and port values to the defaults
|
|
host = default_host
|
|
port = default_port
|
|
give_up_flag = ""
|
|
else:
|
|
# quit the program loop
|
|
quit_flag = True
|
|
|
|
# the host given was invalid, ask if the user would like to try again
|
|
else:
|
|
if not do_a_quit("You did not enter a valid host, try again?"):
|
|
# reset the host and port values to the defaults
|
|
host = default_host
|
|
port = default_port
|
|
give_up_flag = ""
|
|
else:
|
|
# quit the program loop
|
|
quit_flag = True
|
|
|
|
# exit the program and say goodbye
|
|
print("Program ending, thank you")
|