Untitled

mail@pastecode.io avatar
unknown
plain_text
7 months ago
2.7 kB
1
Indexable
Never
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from cassandra.cluster import Cluster
from airflow.hooks.base import BaseHook
import pandas as pd
from cassandra.auth import PlainTextAuthProvider
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

def process_recon_results(**kwargs):
    recon_start_date = kwargs['recon_start_date']
    recon_end_date = kwargs['recon_end_date']
    cassandra_conn_id = 'cassandrahost'
    cassandra_conn = BaseHook.get_connection(cassandra_conn_id)
    auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra@123')
    cluster = Cluster([cassandra_conn.host], auth_provider=auth_provider)
    session = cluster.connect(cassandra_conn.schema)

    cassandra_query = f"SELECT * FROM hos_event_recon_summary where  recon_start_date>='{recon_start_date}' and recon_end_date<='{recon_end_date}' allow filtering"
    print(cassandra_query)
    result_set = session.execute(cassandra_query)

    df = pd.DataFrame(result_set, columns=['recon_start_date', 'recon_end_date', 'event_type', 'no_of_failed_events', 'no_of_failed_events_published_to_kafka', 'no_of_passed_events', 'no_of_passed_events_via_recon', 'total_no_of_events'])
    df['recon_start_date'] = pd.to_datetime(df['recon_start_date']).dt.date
    result_df = df.groupby(['recon_start_date', 'event_type']).agg({
        'total_no_of_events': 'sum',
        'no_of_passed_events': 'sum',
        'no_of_failed_events': 'sum',
        'no_of_passed_events_via_recon': 'sum'
    }).reset_index()

    msg = MIMEMultipart()
    msg['From'] = 'hobapp_tcs-hob-sir-env01@tcs.com'
    msg['To'] = ', '.join(['veerendrakumar.meka@tcs.com', 'ragulp.r@tcs.com'])
    msg['Subject'] = 'Recon stats'

    html = result_df.to_html(index=False)
    body = MIMEText(html, 'html')
    msg.attach(body)

    smtp_server = '10.16.16.90'
    smtp_port = 25

    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.sendmail(msg['From'], msg['To'], msg.as_string())
        print("Mail Sent")

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'recon_start_date': None,
    'recon_end_date': None
}

dag = DAG(
    'process_recon_results',
    default_args=default_args,
    description='DAG for executing Cassandra query which process the recon stats',
    schedule_interval=timedelta(days=1),
    catchup=False,
)

execute_query_task = PythonOperator(
    task_id='process_recon_results',
    python_callable=process_recon_results,
    dag=dag,
)

execute_query_task
Leave a Comment