Untitled

 avatar
unknown
plain_text
6 months ago
5.3 kB
10
Indexable
import smtplib
import ssl
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid

from loguru import logger

# Running counter used to number successfully sent emails in the log output.
SEND_COUNT = 1

# Ask once at start-up; only an answer of "y" (in any letter case) enables debug mode.
DEBUG = input("Debug mode? (y/n): ").lower() == "y"
#### CONFIG ####

import configparser

# Debug runs read the test configuration file; normal runs read the regular one.
config = configparser.ConfigParser(allow_no_value=True)
config.read('config.test.ini' if DEBUG else 'config.ini')
### END CONFIG #####


#### PROXY ####
import socket

# NOTE(review): the return value of this lookup is discarded, so nothing else
# in this file depends on it. It looks like a leftover from removed proxy
# configuration -- confirm before deleting.
socket.gethostbyname("")


#### END PROXY CONF ####

# Load email addresses from a text file
def load_emails(file_path):
    """Return the recipient addresses listed one per line in *file_path*.

    Leading/trailing whitespace is stripped from every line and blank lines
    are skipped, so a trailing empty line or an indented address never turns
    into an empty or malformed recipient.

    Args:
        file_path: Path of the text file holding one address per line.

    Returns:
        A list of non-empty address strings in file order.
    """
    with open(file_path, 'r') as file:
        stripped_lines = (line.strip() for line in file.read().splitlines())
        return [address for address in stripped_lines if address]


# Load email body from a text file
def load_text_body(file_path):
    """Read the file at *file_path* in text mode and return its full contents."""
    with open(file_path) as body_file:
        return body_file.read()


# Log email addresses to specified files
def log_email(email, log_file):
    """Append *email* followed by a newline to the file at *log_file*."""
    with open(log_file, mode='a') as sink:
        sink.writelines((email, '\n'))


# Send a single email
# send_email runs on several pool threads; this lock keeps the
# read-log-increment sequence on SEND_COUNT from interleaving.
_SEND_COUNT_LOCK = threading.Lock()


def send_email(smtp_server, smtp_port, username, password, subject, text_body, html_body, from_email, from_name,
               reply_to_email,
               email):
    """Send one multipart (plain text + HTML) message to a single recipient.

    Args:
        smtp_server: SMTP host name; also used as the Message-ID domain.
        smtp_port: SMTP port. Port 465 uses implicit TLS (SMTP_SSL); any other
            port connects in plain text and then upgrades with STARTTLS.
        username: Login name for SMTP authentication.
        password: Password for SMTP authentication.
        subject: Subject header value.
        text_body: Plain-text alternative of the message.
        html_body: HTML alternative of the message.
        from_email: Envelope sender and address part of the From header.
        from_name: Display name part of the From header.
        reply_to_email: Reply-To header value.
        email: Recipient address.

    Returns:
        A tuple ``(email, status)`` where status is ``'success'``,
        ``'bounce'`` (the server refused the recipient) or ``'failed'``
        (any other SMTP or network error).
    """
    global SEND_COUNT

    msg = MIMEMultipart('alternative')

    msg['From'] = f'{from_name} <{from_email}>'
    msg['To'] = email
    msg['Subject'] = subject
    msg['Reply-To'] = reply_to_email

    # Priority hints understood by common mail clients.
    msg['X-Priority'] = '1'  # 1 (Highest) to 5 (Lowest)
    msg['X-MSMail-Priority'] = 'High'  # High, Normal, or Low
    msg['Importance'] = 'High'  # High, Normal, or Low
    msg['Message-ID'] = make_msgid(domain=smtp_server)
    msg['Date'] = formatdate(localtime=True)

    # Attach the plain-text part first and the HTML part second.
    msg.attach(MIMEText(text_body, 'plain'))
    msg.attach(MIMEText(html_body, 'html'))

    try:
        # NOTE(review): no explicit SSLContext is supplied to SMTP_SSL or
        # starttls(); pass ssl.create_default_context() to both if server
        # certificate verification is required.
        if smtp_port == 465:
            server = smtplib.SMTP_SSL(smtp_server, smtp_port)
        else:
            server = smtplib.SMTP(smtp_server, smtp_port)
        try:
            if smtp_port != 465:
                server.starttls()  # Upgrade the plain connection to TLS
            server.login(username, password)
            server.sendmail(from_email, email, msg.as_string())
            server.quit()
        finally:
            # Always release the socket, even when login/sendmail raised.
            # close() is a no-op after a successful quit().
            server.close()
    except smtplib.SMTPRecipientsRefused:
        logger.error(f"Failed to send email to {email}: Recipients refused")
        return email, 'bounce'
    except (smtplib.SMTPException, OSError) as e:
        # OSError covers connection refused, DNS failures, timeouts and TLS
        # errors, so one unreachable server cannot crash the whole run.
        logger.error(f"Failed to send email to {email}: {str(e)}")
        return email, 'failed'

    with _SEND_COUNT_LOCK:
        logger.success(f" [{SEND_COUNT}] : Email sent to {email} with Message-ID: {msg['Message-ID']} on {msg['Date']}")
        SEND_COUNT = SEND_COUNT + 1
    return email, 'success'


# Send email to a list of recipients using a thread pool
def send_bulk_email(smtp_server, smtp_port, username, password, subject, text_body, html_body, from_email, from_name,
                    reply_to_email, to_emails, max_workers=10):
    """Send the same message to every address in *to_emails* using a thread pool.

    Each recipient is handled by one ``send_email`` call. As calls complete,
    the recipient is appended to the log file named by the matching option of
    the ``[data]`` config section: ``emails_success`` for 'success',
    ``bounce_emails`` for 'bounce', and ``failed_send`` for anything else.

    Args:
        smtp_server, smtp_port, username, password, subject, text_body,
        html_body, from_email, from_name, reply_to_email: Passed through
            unchanged to ``send_email``.
        to_emails: Iterable of recipient addresses.
        max_workers: Maximum number of concurrent sender threads.
    """
    # Status -> [data] option holding the log file path; unknown statuses
    # fall back to 'failed_send'.
    log_option_by_status = {'success': 'emails_success', 'bounce': 'bounce_emails'}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember which recipient each future belongs to, so a worker that
        # raised can still be attributed to its address.
        recipient_by_future = {
            executor.submit(send_email, smtp_server, smtp_port, username, password, subject, text_body, html_body,
                            from_email, from_name, reply_to_email, email): email
            for email in to_emails
        }
        for future in as_completed(recipient_by_future):
            try:
                email, status = future.result()
            except Exception as exc:
                # An unexpected error in one worker must not stop the
                # remaining results from being recorded.
                email = recipient_by_future[future]
                status = 'failed'
                logger.error(f"Failed to send email to {email}: {exc}")
            log_option = log_option_by_status.get(status, 'failed_send')
            log_email(email, config.get('data', log_option))


# Main execution
# Script entry point: read settings from the config, load the recipient list
# and message bodies, then send the campaign.
if __name__ == "__main__":
    recipients = load_emails(config.get('data', 'emails_target'))

    # [smtp] section: server connection, credentials and sender addresses.
    host = config.get('smtp', 'host')  # e.g., smtp.gmail.com
    port = config.getint('smtp', 'port')
    login_user = config.get('smtp', 'username')
    login_password = config.get('smtp', 'password')
    sender_address = config.get('smtp', 'from_email')
    reply_address = config.get('smtp', 'reply_to')

    # [camping] section: sender display name, subject and body file paths.
    sender_name = config.get('camping', 'sender_name')
    mail_subject = config.get('camping', 'subject')
    plain_body_path = config.get('camping', 'txt_message_file')
    html_body_path = config.get('camping', 'html_message_file')

    send_bulk_email(
        smtp_server=host,
        smtp_port=port,
        username=login_user,
        password=login_password,
        subject=mail_subject,
        text_body=load_text_body(plain_body_path),
        html_body=load_text_body(html_body_path),
        from_email=sender_address,
        from_name=sender_name,
        reply_to_email=reply_address,
        to_emails=recipients,
        max_workers=2,
    )
Editor is loading...
Leave a Comment