Untitled
unknown
plain_text
2 years ago
2.7 kB
4
Indexable
import os
import smtplib
from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import pandas as pd
from airflow import DAG
# NOTE(review): this module path is deprecated in Airflow 2.x in favour of
# ``airflow.operators.python``; kept until the deployed version is confirmed.
from airflow.operators.python_operator import PythonOperator
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

# Airflow DAG: once a day, read the event-reconciliation summary from
# Cassandra, total its counters per start date and event type, and email the
# result as an HTML table.

CASSANDRA_KEYSPACE = 'hobs_ods_staging'
# NOTE(review): LIMIT 10 means the daily totals are computed over at most ten
# rows of the table -- confirm this is intended and not left over from testing.
RECON_QUERY = "SELECT * FROM hos_event_recon_summary limit 10"
# With the driver's default named-tuple rows, pandas applies these labels to
# each row by position when ``columns=`` is given, so this list must match the
# table's SELECT * column order and count exactly.
RECON_COLUMNS = [
    'recon_start_date',
    'recon_end_date',
    'event_type',
    'no_of_failed_events',
    'no_of_failed_events_pubished_to_kafka',
    'no_of_passed_events',
    'no_of_passed_events_via_recon',
    'total_no_of_events',
]
# Counters summed per (recon_start_date, event_type) in the emailed report.
AGGREGATIONS = {
    'total_no_of_events': 'sum',
    'no_of_passed_events': 'sum',
    'no_of_failed_events': 'sum',
    'no_of_passed_events_via_recon': 'sum',
}

REPORT_SENDER = 'hobapp_tcs-hob-sir-env01@tcs.com'
# Update the list of recipients.
# NOTE(review): the example.com entries look like placeholders -- replace them.
REPORT_RECIPIENTS = [
    'veerendrakumar.meka@tcs.com',
    'another.email@example.com',
    'yetanother.email@example.com',
]
SMTP_SERVER = '10.16.16.90'
SMTP_PORT = 25


def _fetch_recon_summary():
    """Run RECON_QUERY against Cassandra and return the rows as a DataFrame.

    Hosts and credentials come from the CASSANDRA_HOSTS (comma-separated),
    CASSANDRA_USERNAME and CASSANDRA_PASSWORD environment variables when they
    are set and non-empty; otherwise the values this DAG originally hard-coded
    are used. The cluster connection is always shut down before returning.
    """
    # NOTE(review): the password fallback is a credential committed to source
    # control; move it to an Airflow Connection / secrets backend and drop it.
    raw_hosts = os.environ.get('CASSANDRA_HOSTS') or '172.16.177.58'
    hosts = [host.strip() for host in raw_hosts.split(',') if host.strip()]
    auth_provider = PlainTextAuthProvider(
        username=os.environ.get('CASSANDRA_USERNAME') or 'cassandra',
        password=os.environ.get('CASSANDRA_PASSWORD') or 'cassandra@123',
    )
    cluster = Cluster(hosts, auth_provider=auth_provider)
    try:
        session = cluster.connect(CASSANDRA_KEYSPACE)
        result_set = session.execute(RECON_QUERY)
        # Build the frame while the session is still open so every row is read
        # before the cluster is torn down.
        return pd.DataFrame(result_set, columns=RECON_COLUMNS)
    finally:
        # The original never closed the cluster, leaking driver connections in
        # the worker on every run.
        cluster.shutdown()


def _aggregate_recon_summary(df):
    """Return per-(start date, event type) sums of the AGGREGATIONS counters.

    ``recon_start_date`` is truncated to a calendar date first so that rows
    from the same day are grouped together. The input frame is not modified.
    """
    by_day = df.assign(
        recon_start_date=pd.to_datetime(df['recon_start_date']).dt.date
    )
    return (
        by_day.groupby(['recon_start_date', 'event_type'])
        .agg(AGGREGATIONS)
        .reset_index()
    )


def _send_html_report(result_df):
    """Email ``result_df`` as an HTML table to REPORT_RECIPIENTS via SMTP_SERVER."""
    msg = MIMEMultipart()
    msg['From'] = REPORT_SENDER
    # Set multiple recipients in the 'To' field
    msg['To'] = ', '.join(REPORT_RECIPIENTS)
    msg['Subject'] = 'Cassandra Query Results'
    msg.attach(MIMEText(result_df.to_html(index=False), 'html'))
    with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
        server.sendmail(msg['From'], REPORT_RECIPIENTS, msg.as_string())


def execute_cassandra_query():
    """Airflow task callable: fetch the recon summary, aggregate it, email it.

    Returns None, so nothing is pushed to XCom.
    """
    _send_html_report(_aggregate_recon_summary(_fetch_recon_summary()))


# Default parameters applied to every task in the DAG.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'cassandra_query_dag',
    default_args=default_args,
    description='DAG for executing Cassandra query and sending results via email',
    schedule_interval=timedelta(days=1),  # Adjust the schedule interval as needed
    # The query ignores the run's logical date, so catching up every interval
    # since start_date would just send many copies of the same current
    # snapshot; only schedule the latest interval.
    catchup=False,
)

# Single task; there are no dependencies to wire up.
execute_query_task = PythonOperator(
    task_id='execute_cassandra_query_task',
    python_callable=execute_cassandra_query,
    dag=dag,
)

if __name__ == "__main__":
    dag.cli()
Editor is loading...
Leave a Comment