Untitled
unknown
plain_text
6 months ago
5.3 kB
10
Indexable
"""Send one multipart (plain text + HTML) e-mail to every address in a list.

All settings come from an INI file: ``config.test.ini`` when the operator
answers "y" to the debug prompt, ``config.ini`` otherwise.  The outcome for
each recipient is appended to one of three log files named in the ``[data]``
section of that configuration.
"""
import configparser
import smtplib
import socket
import ssl  # noqa: F401 - no longer used here; kept so nothing relying on the original import set breaks
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr, formatdate, make_msgid

from loguru import logger

# Running 1-based number of the next successful send; shown in the log line.
SEND_COUNT = 1
# send_email() runs on several worker threads, so the read-and-increment of
# SEND_COUNT has to be atomic or two sends can log the same number.
_SEND_COUNT_LOCK = threading.Lock()

DEBUG = input("Debug mode? (y/n): ").lower() == "y"

#### CONFIG ####
CONFIG_PATH = 'config.test.ini' if DEBUG else 'config.ini'
config = configparser.ConfigParser(allow_no_value=True)
# ConfigParser.read() silently skips files it cannot open and returns the
# list of files it did parse; the entry point checks this list.
_LOADED_CONFIG_FILES = config.read(CONFIG_PATH)
### END CONFIG #####

#### PROXY ####
# NOTE(review): this resolves an empty host name and discards the result.
# It looks like a leftover placeholder for proxy set-up - confirm whether it
# can be removed.  Left in place to keep import-time behaviour unchanged.
socket.gethostbyname("")
#### END PROXY CONF ####

# Maps a send_email() status to the [data] option naming its log file.
# Any status not listed here is logged under 'failed_send'.
_LOG_OPTION_BY_STATUS = {
    'success': 'emails_success',
    'bounce': 'bounce_emails',
}


def load_emails(file_path):
    """Return the recipient addresses listed one per line in *file_path*.

    Surrounding whitespace is stripped and blank lines are skipped, so a
    trailing newline or an empty row never becomes an empty recipient.
    """
    with open(file_path, 'r') as file:
        stripped = (line.strip() for line in file)
        return [address for address in stripped if address]


def load_text_body(file_path):
    """Return the full contents of *file_path* as a single string."""
    with open(file_path, 'r') as file:
        return file.read()


def log_email(email, log_file):
    """Append *email* followed by a newline to *log_file*."""
    with open(log_file, 'a') as file:
        file.write(email + '\n')


def send_email(smtp_server, smtp_port, username, password, subject, text_body,
               html_body, from_email, from_name, reply_to_email, email):
    """Send one message to *email* over a fresh SMTP connection.

    Port 465 is opened with ``SMTP_SSL``; any other port is opened in plain
    text and then upgraded with STARTTLS.

    Returns:
        ``(email, status)`` where status is ``'success'``, ``'bounce'``
        (the server refused the recipient) or ``'failed'`` (any other SMTP
        or network error).  Errors are logged, not raised, so one bad
        recipient cannot abort the rest of the batch.
    """
    global SEND_COUNT
    try:
        msg = MIMEMultipart('alternative')
        # formataddr quotes/encodes display names containing special or
        # non-ASCII characters, which a hand-built "name <addr>" does not.
        msg['From'] = formataddr((from_name, from_email))
        msg['To'] = email
        msg['Subject'] = subject
        msg['Reply-To'] = reply_to_email

        # Priority hints understood by various mail clients.
        msg['X-Priority'] = '1'  # 1 (Highest) to 5 (Lowest)
        msg['X-MSMail-Priority'] = 'High'  # High, Normal, or Low
        msg['Importance'] = 'High'  # High, Normal, or Low
        msg['Message-ID'] = make_msgid(domain=smtp_server)
        msg['Date'] = formatdate(localtime=True)

        # Plain text first, HTML second, in the same order as before.
        msg.attach(MIMEText(text_body, 'plain'))
        msg.attach(MIMEText(html_body, 'html'))

        uses_implicit_tls = smtp_port == 465
        if uses_implicit_tls:
            server = smtplib.SMTP_SSL(smtp_server, smtp_port)
        else:
            server = smtplib.SMTP(smtp_server, smtp_port)

        # The context manager closes the connection even when login or
        # sendmail raises, so failed sends no longer leak sockets.
        with server:
            if not uses_implicit_tls:
                # The original built an ssl.SSLContext here but never passed
                # it to starttls(); that dead code is removed and STARTTLS
                # keeps using smtplib's default settings, as it did before.
                server.starttls()
            server.login(username, password)
            server.sendmail(from_email, email, msg.as_string())

        with _SEND_COUNT_LOCK:
            sequence = SEND_COUNT
            SEND_COUNT += 1
        logger.success(f" [{sequence}] : Email sent to {email} with Message-ID: {msg['Message-ID']} on {msg['Date']}")
        return email, 'success'
    except smtplib.SMTPRecipientsRefused:
        logger.error(f"Failed to send email to {email}: Recipients refused")
        return email, 'bounce'
    except (smtplib.SMTPException, OSError) as e:
        # OSError covers DNS failures, refused connections, timeouts and TLS
        # errors, which would otherwise escape through future.result().
        logger.error(f"Failed to send email to {email}: {str(e)}")
        return email, 'failed'


def send_bulk_email(smtp_server, smtp_port, username, password, subject,
                    text_body, html_body, from_email, from_name,
                    reply_to_email, to_emails, max_workers=10):
    """Send the message to every address in *to_emails* using a thread pool.

    Each recipient is handled by its own send_email() call on one of
    *max_workers* threads.  As results complete, the address is appended to
    the log file that the ``[data]`` config section names for its status.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(send_email, smtp_server, smtp_port, username,
                            password, subject, text_body, html_body,
                            from_email, from_name, reply_to_email, email)
            for email in to_emails
        ]
        for future in as_completed(futures):
            email, status = future.result()
            # Successful sends, bounces and all other failures each go to
            # their own file; the option is looked up only when needed.
            option = _LOG_OPTION_BY_STATUS.get(status, 'failed_send')
            log_email(email, config.get('data', option))


def main():
    """Read the settings and message bodies from config, then send the batch."""
    if not _LOADED_CONFIG_FILES:
        logger.error(f"Configuration file not found or unreadable: {CONFIG_PATH}")
        raise SystemExit(1)

    to_emails = load_emails(config.get('data', 'emails_target'))

    smtp_server = config.get('smtp', 'host')  # e.g., smtp.gmail.com
    smtp_port = config.getint('smtp', 'port')
    username = config.get('smtp', 'username')
    password = config.get('smtp', 'password')
    from_email = config.get('smtp', 'from_email')
    reply_to_email = config.get('smtp', 'reply_to')

    # The section is spelled 'camping' in the existing config files; the
    # key is runtime data and is kept exactly as it is.
    from_name = config.get('camping', 'sender_name')
    subject = config.get('camping', 'subject')
    text_file = config.get('camping', 'txt_message_file')  # plain-text body
    html_file = config.get('camping', 'html_message_file')  # HTML body

    text_body = load_text_body(text_file)
    html_body = load_text_body(html_file)

    send_bulk_email(smtp_server, smtp_port, username, password, subject,
                    text_body, html_body, from_email, from_name,
                    reply_to_email, to_emails, max_workers=2)


# Main execution
if __name__ == "__main__":
    main()
Editor is loading...
Leave a Comment