Untitled
unknown
plain_text
a year ago
5.3 kB
20
Indexable
import smtplib
import ssl
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid

from loguru import logger
# Sequence number printed in each "Email sent" log line; starts at 1.
SEND_COUNT = 1

# Asked once at start-up: answering "y" (any case) selects the test config.
DEBUG = input("Debug mode? (y/n): ").lower() == "y"

#### CONFIG ####
import configparser

config = configparser.ConfigParser(allow_no_value=True)
# Debug runs read config.test.ini; normal runs read config.ini.
config.read('config.test.ini' if DEBUG else 'config.ini')
### END CONFIG #####

#### PROXY ####
import socket

# NOTE(review): resolves the empty hostname and discards the result. This
# looks like a leftover from a proxy setup; confirm before removing.
socket.gethostbyname("")
#### END PROXY CONF ####
# Load email addresses from a text file
def load_emails(file_path):
    """Load recipient addresses from a text file, one address per line.

    Surrounding whitespace is stripped and blank lines are skipped, so an
    empty or padded line in the list never becomes a bogus recipient.

    Args:
        file_path: Path of the text file to read. It is decoded as UTF-8;
            a leading byte-order mark, if present, is dropped so it cannot
            end up glued to the first address.

    Returns:
        A list of non-empty address strings, in file order.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(file_path, 'r', encoding='utf-8-sig') as file:
        stripped = (line.strip() for line in file)
        return [address for address in stripped if address]
# Load email body from a text file
def load_text_body(file_path):
    """Return the full contents of a message-body file as a string.

    The file is decoded explicitly as UTF-8 instead of the platform's
    locale encoding, so a body containing non-ASCII characters reads the
    same on every machine. A leading byte-order mark, if present, is
    dropped.

    Args:
        file_path: Path of the plain-text or HTML body file.

    Returns:
        The file contents as a single string.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    with open(file_path, 'r', encoding='utf-8-sig') as file:
        return file.read()
# Log email addresses to specified files
def log_email(email, log_file):
    """Append one address to a log file, followed by a newline.

    Args:
        email: Address string to record.
        log_file: Path of the log file; opened in append mode, so it is
            created on first use and earlier entries are kept.
    """
    entry = email + '\n'
    with open(log_file, 'a') as sink:
        sink.write(entry)
# Send a single email
# Guards SEND_COUNT: send_email runs concurrently on pool threads, and an
# unguarded read-modify-write could repeat or skip sequence numbers.
_SEND_COUNT_LOCK = threading.Lock()


def _deliver(smtp_server, smtp_port, username, password, from_email, email, msg):
    """Open an SMTP session, authenticate, and transmit one message."""
    # The default context verifies the server certificate and hostname.
    # Previously a context was built but never handed to smtplib, so the
    # login credentials travelled over an unverified TLS session.
    tls_context = ssl.create_default_context()
    implicit_tls = smtp_port == 465

    if implicit_tls:
        server = smtplib.SMTP_SSL(smtp_server, smtp_port, context=tls_context)
    else:
        server = smtplib.SMTP(smtp_server, smtp_port)
    try:
        if not implicit_tls:
            # Upgrade the plain connection to TLS before sending credentials.
            server.starttls(context=tls_context)
        server.login(username, password)
        server.sendmail(from_email, email, msg.as_string())
        server.quit()
    finally:
        # Release the socket even when login/sendmail raised; close() is a
        # no-op after a successful quit().
        server.close()


def send_email(smtp_server, smtp_port, username, password, subject, text_body, html_body, from_email, from_name,
               reply_to_email,
               email):
    """Send one multipart (plain text + HTML) message to a single recipient.

    Port 465 is treated as implicit TLS (SMTP_SSL); any other port connects
    in the clear and is upgraded with STARTTLS before logging in.

    Args:
        smtp_server: SMTP host name; also used as the Message-ID domain.
        smtp_port: SMTP port number.
        username: Login name for SMTP authentication.
        password: Password for SMTP authentication.
        subject: Subject header value.
        text_body: Plain-text alternative of the body.
        html_body: HTML alternative of the body.
        from_email: Envelope sender and address part of the From header.
        from_name: Display-name part of the From header.
        reply_to_email: Reply-To header value.
        email: Recipient address.

    Returns:
        A ``(email, status)`` tuple where status is ``'success'``,
        ``'bounce'`` (the server refused the recipient) or ``'failed'``
        (any other SMTP, network or TLS error).
    """
    global SEND_COUNT

    msg = MIMEMultipart('alternative')
    msg['From'] = f'{from_name} <{from_email}>'
    msg['To'] = email
    msg['Subject'] = subject
    msg['Reply-To'] = reply_to_email  # Add Reply-To header
    # Additional headers
    msg['X-Priority'] = '1'  # 1 (Highest) to 5 (Lowest)
    msg['X-MSMail-Priority'] = 'High'  # High, Normal, or Low
    msg['Importance'] = 'High'  # High, Normal, or Low
    msg['Message-ID'] = make_msgid(domain=smtp_server)
    msg['Date'] = formatdate(localtime=True)  # Current date and time in the correct format
    # Attach both plain text and HTML parts to the email
    msg.attach(MIMEText(text_body, 'plain'))
    msg.attach(MIMEText(html_body, 'html'))

    try:
        _deliver(smtp_server, smtp_port, username, password, from_email, email, msg)
    except smtplib.SMTPRecipientsRefused:
        logger.error(f"Failed to send email to {email}: Recipients refused")
        return email, 'bounce'
    except (smtplib.SMTPException, OSError) as e:
        # OSError covers DNS failures, refused connections, timeouts and TLS
        # errors; without it one unreachable server would raise out of the
        # worker instead of being recorded as a failed send.
        logger.error(f"Failed to send email to {email}: {str(e)}")
        return email, 'failed'

    with _SEND_COUNT_LOCK:
        logger.success(f" [{SEND_COUNT}] : Email sent to {email} with Message-ID: {msg['Message-ID']} on {msg['Date']}")
        SEND_COUNT = SEND_COUNT + 1
    return email, 'success'
# Send email to a list of recipients using a thread pool
def send_bulk_email(smtp_server, smtp_port, username, password, subject, text_body, html_body, from_email, from_name,
                    reply_to_email, to_emails, max_workers=10):
    """Send the same message to every recipient using a thread pool.

    Each recipient is handed to ``send_email`` on a worker thread. As the
    results complete, the address is appended to the log file matching its
    status; the file paths come from the ``[data]`` section of ``config``.

    Args:
        smtp_server, smtp_port, username, password, subject, text_body,
        html_body, from_email, from_name, reply_to_email: Passed through
            unchanged to ``send_email``.
        to_emails: Iterable of recipient addresses.
        max_workers: Maximum number of concurrent sender threads.
    """
    # Resolve the log destinations once, before anything is sent, so a
    # missing config key stops the run up front instead of part-way through.
    status_logs = {
        'success': config.get('data', 'emails_success'),
        'bounce': config.get('data', 'bounce_emails'),
    }
    failed_log = config.get('data', 'failed_send')

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its recipient so a crashed worker can
        # still be attributed to an address.
        pending = {
            executor.submit(send_email, smtp_server, smtp_port, username, password, subject, text_body, html_body,
                            from_email, from_name, reply_to_email, email): email
            for email in to_emails
        }
        for future in as_completed(pending):
            try:
                email, status = future.result()
            except Exception as e:
                # One unexpected worker error must not abort the loop and
                # lose the result logging for every remaining recipient.
                email, status = pending[future], 'failed'
                logger.error(f"Failed to send email to {email}: {str(e)}")
            # Any status other than success/bounce is recorded as a failed send.
            log_email(email, status_logs.get(status, failed_log))
# Main execution
if __name__ == "__main__":
    # Recipients: one address per line in the file named by [data] emails_target.
    recipients = load_emails(config.get('data', 'emails_target'))

    # SMTP connection, credentials and envelope addresses from the [smtp] section.
    host = config.get('smtp', 'host')  # e.g., smtp.gmail.com
    port = config.getint('smtp', 'port')
    login_name = config.get('smtp', 'username')
    login_password = config.get('smtp', 'password')
    sender_address = config.get('smtp', 'from_email')
    reply_address = config.get('smtp', 'reply_to')

    # Message settings. The config section is spelled 'camping'
    # (presumably "campaign"); the key must match the ini file as written.
    sender_name = config.get('camping', 'sender_name')
    mail_subject = config.get('camping', 'subject')
    text_path = config.get('camping', 'txt_message_file')  # plain-text body file
    html_path = config.get('camping', 'html_message_file')  # HTML body file

    plain_content = load_text_body(text_path)
    html_content = load_text_body(html_path)

    send_bulk_email(
        smtp_server=host,
        smtp_port=port,
        username=login_name,
        password=login_password,
        subject=mail_subject,
        text_body=plain_content,
        html_body=html_content,
        from_email=sender_address,
        from_name=sender_name,
        reply_to_email=reply_address,
        to_emails=recipients,
        max_workers=2,
    )
Editor is loading...
Leave a Comment