Untitled

 avatar
unknown
plain_text
2 years ago
2.6 kB
6
Indexable
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from cassandra.cluster import Cluster
import pandas as pd
from cassandra.auth import PlainTextAuthProvider
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

def execute_cassandra_query():
    auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra@123')
    cluster = Cluster(['172.16.177.58'], auth_provider=auth_provider)
    session = cluster.connect('hobs_ods_staging')

    cassandra_query = "SELECT * FROM hos_event_recon_summary limit 10"
    result_set = session.execute(cassandra_query)

    df = pd.DataFrame(result_set, columns=['recon_start_date', 'recon_end_date', 'event_type', 'no_of_failed_events',
                                           'no_of_failed_events_pubished_to_kafka', 'no_of_passed_events',
                                           'no_of_passed_events_via_recon', 'total_no_of_events'])

    df['recon_start_date'] = pd.to_datetime(df['recon_start_date']).dt.date
    result_df = df.groupby(['recon_start_date', 'event_type']).agg({
        'total_no_of_events': 'sum',
        'no_of_passed_events': 'sum',
        'no_of_failed_events': 'sum',
        'no_of_passed_events_via_recon': 'sum'
    }).reset_index()

    msg = MIMEMultipart()
    msg['From'] = 'hobapp_tcs-hob-sir-env01@tcs.com'
    msg['To'] = 'veerendrakumar.meka@tcs.com'
    msg['Subject'] = 'Cassandra Query Results'

    html = result_df.to_html(index=False)
    body = MIMEText(html, 'html')
    msg.attach(body)

    smtp_server = '10.16.16.90'
    smtp_port = 25

    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.sendmail(msg['From'], msg['To'], msg.as_string())

# Define the default_args dictionary to specify the default parameters of the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Instantiate the DAG
dag = DAG(
    'cassandra_query_dag',
    default_args=default_args,
    description='DAG for executing Cassandra query and sending results via email',
    schedule_interval=timedelta(days=1),  # Adjust the schedule interval as needed
)

# Define the task using PythonOperator
execute_query_task = PythonOperator(
    task_id='execute_cassandra_query_task',
    python_callable=execute_cassandra_query,
    dag=dag,
)

# Set the task dependencies
execute_query_task

if __name__ == "__main__":
    dag.cli()
Editor is loading...
Leave a Comment