Untitled
unknown
plain_text
2 years ago
2.7 kB
5
Indexable
# Airflow DAG that e-mails a daily summary of event reconciliation statistics.
#
# The single task reads rows of the ``hos_event_recon_summary`` Cassandra table
# for a reconciliation window, sums the event counters per day and event type
# with pandas, and mails the result as an HTML table.
import logging
import smtplib
from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import pandas as pd
from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.python import PythonOperator
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

logger = logging.getLogger(__name__)

CASSANDRA_CONN_ID = 'cassandrahost'
RECON_TABLE = 'hos_event_recon_summary'

# Columns are selected explicitly (instead of ``SELECT *``) so that the labels
# given to the DataFrame always match the order of the values in each row.
RECON_COLUMNS = [
    'recon_start_date',
    'recon_end_date',
    'event_type',
    'no_of_failed_events',
    'no_of_failed_events_published_to_kafka',
    'no_of_passed_events',
    'no_of_passed_events_via_recon',
    'total_no_of_events',
]

# Counters that are summed per (recon_start_date, event_type) in the report.
SUMMED_COLUMNS = [
    'total_no_of_events',
    'no_of_passed_events',
    'no_of_failed_events',
    'no_of_passed_events_via_recon',
]

MAIL_FROM = 'hobapp_tcs-hob-sir-env01@tcs.com'
MAIL_TO = ['veerendrakumar.meka@tcs.com', 'ragulp.r@tcs.com']
MAIL_SUBJECT = 'Recon stats'
SMTP_HOST = '10.16.16.90'
SMTP_PORT = 25


def _resolve_recon_window(context):
    """Return ``(recon_start_date, recon_end_date)`` for the current run.

    Each bound is looked up, in order, in the keyword arguments passed to the
    callable (e.g. via ``op_kwargs``), in ``dag_run.conf`` and in ``params``.
    A bound that is not supplied falls back to the run's logical date ``ds``
    (start) or the day after it (end).

    NOTE(review): the one-day fallback window is an assumption derived from
    the daily schedule -- confirm it matches how the recon table is populated.

    Raises:
        ValueError: if a bound is missing and the context carries no ``ds``.
    """
    dag_run = context.get('dag_run')
    run_conf = getattr(dag_run, 'conf', None) or {}
    params = context.get('params') or {}

    def lookup(key):
        for source in (context, run_conf, params):
            value = source.get(key)
            if value not in (None, ''):
                return value
        return None

    start = lookup('recon_start_date')
    end = lookup('recon_end_date')
    if start is not None and end is not None:
        return start, end

    ds = context.get('ds')
    if ds is None:
        raise ValueError(
            "recon_start_date/recon_end_date were not supplied and the task "
            "context has no 'ds' to derive them from"
        )
    logical_day = datetime.strptime(ds, '%Y-%m-%d').date()
    if start is None:
        start = logical_day.isoformat()
    if end is None:
        end = (logical_day + timedelta(days=1)).isoformat()
    return start, end


def _fetch_recon_rows(recon_start_date, recon_end_date):
    """Query Cassandra and return the matching rows as a DataFrame.

    Host, keyspace and credentials all come from the Airflow connection
    ``CASSANDRA_CONN_ID``; no secret is kept in this file. The connection must
    therefore carry the login/password if the cluster requires authentication.
    """
    conn = BaseHook.get_connection(CASSANDRA_CONN_ID)
    auth_provider = None
    if conn.login:
        auth_provider = PlainTextAuthProvider(
            username=conn.login, password=conn.password
        )

    # The driver encodes and quotes the %s parameters itself, so the window
    # bounds are never spliced into the CQL text by hand.
    query = (
        f"SELECT {', '.join(RECON_COLUMNS)} FROM {RECON_TABLE} "
        "WHERE recon_start_date >= %s AND recon_end_date <= %s "
        "ALLOW FILTERING"
    )

    cluster = Cluster([conn.host], auth_provider=auth_provider)
    try:
        session = cluster.connect(conn.schema)
        logger.info(
            "Executing %s with window (%s, %s)",
            query, recon_start_date, recon_end_date,
        )
        rows = list(session.execute(query, (recon_start_date, recon_end_date)))
    finally:
        # Always release the driver's sockets and threads, even on failure.
        cluster.shutdown()

    return pd.DataFrame(rows, columns=RECON_COLUMNS)


def _summarise(df):
    """Sum the event counters per calendar day and event type."""
    df = df.copy()
    df['recon_start_date'] = pd.to_datetime(df['recon_start_date']).dt.date
    return (
        df.groupby(['recon_start_date', 'event_type'])
        .agg({column: 'sum' for column in SUMMED_COLUMNS})
        .reset_index()
    )


def _send_report(result_df):
    """Mail ``result_df`` as an HTML table to every address in ``MAIL_TO``."""
    msg = MIMEMultipart()
    msg['From'] = MAIL_FROM
    msg['To'] = ', '.join(MAIL_TO)
    msg['Subject'] = MAIL_SUBJECT
    msg.attach(MIMEText(result_df.to_html(index=False), 'html'))

    with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
        # Pass the recipients as a list: smtplib treats a bare string as ONE
        # address, so the comma-joined header value must not be used here.
        server.sendmail(MAIL_FROM, MAIL_TO, msg.as_string())
    logger.info("Mail Sent")


def process_recon_results(**kwargs):
    """Build the recon statistics report for one window and e-mail it.

    Args:
        **kwargs: the Airflow task context, optionally extended with
            ``recon_start_date`` / ``recon_end_date``. See
            ``_resolve_recon_window`` for how the window is determined.

    Raises:
        ValueError: if the reconciliation window cannot be determined.
    """
    recon_start_date, recon_end_date = _resolve_recon_window(kwargs)
    raw_df = _fetch_recon_rows(recon_start_date, recon_end_date)
    _send_report(_summarise(raw_df))


# The former 'recon_start_date'/'recon_end_date' entries were dropped from
# default_args: they are not operator arguments, so they were never passed on
# to the callable. Supply the window through the trigger conf instead, e.g.
# {"recon_start_date": "2023-01-01", "recon_end_date": "2023-01-02"}.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'process_recon_results',
    default_args=default_args,
    description='DAG for executing Cassandra query which process the recon stats',
    schedule_interval=timedelta(days=1),
    catchup=False,
)

execute_query_task = PythonOperator(
    task_id='process_recon_results',
    python_callable=process_recon_results,
    dag=dag,
)
Editor is loading...
Leave a Comment