Untitled
unknown
plain_text
2 years ago
4.8 kB
4
Indexable
"""Airflow DAG that e-mails a summary of HOS event reconciliation results.

The single task reads rows from the ``hos_event_recon_summary`` Cassandra
table for a date window, aggregates the event counters per recon start date
and event type, and mails the result as an HTML table.

The window is taken from the ``startDate`` / ``endDate`` keys of the run
parameters. Whichever key is missing (or ``None``) defaults to the previous
calendar day, e.g. a run on 2023-12-20 reports on 2023-12-19.
"""
import json
import smtplib
from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import pandas as pd
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

CASSANDRA_CONN_ID = 'cassandrahost'

# Labels applied positionally to the columns returned by ``SELECT *``.
# NOTE(review): assumes the table returns its columns in exactly this
# order -- confirm against the table schema.
SUMMARY_COLUMNS = [
    'recon_start_date',
    'recon_end_date',
    'event_type',
    'no_of_failed_events',
    'no_of_failed_events_pubished_to_kafka',
    'no_of_passed_events',
    'no_of_passed_events_via_recon',
    'total_no_of_events',
]

MAIL_SENDER = 'hobapp_tcs-hob-sir-env01@tcs.com'
MAIL_RECIPIENTS = ['veerendrakumar.meka@tcs.com', 'ragulp.r@tcs.com']
SMTP_SERVER = '10.16.16.90'
SMTP_PORT = 25


def _yesterday():
    """Return the previous calendar day (worker local time) as YYYY-MM-DD."""
    return (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")


def _load_admin_values(dag_run):
    """Return the raw run parameters from the first source that has them.

    Sources are tried in this order: the DAG run ``conf``, an Airflow
    Variable named after the DAG id (JSON text), the ``adminVariableValue``
    option of an optional ``configProperties`` object (JSON text), and
    finally an empty dict so that both dates fall back to yesterday.
    """
    if dag_run.conf:
        values = dag_run.conf
        print('Airflow conf variable value assigned: %s, %s' % (values, type(values)))
        return values

    # Look the Variable up once and reuse the value; the original re-read it
    # through an undefined name (``dagIdVar``), which raised NameError.
    stored = Variable.get(dag_run.dag_id, None)
    if stored:
        values = json.loads(stored)
        print('Airflow UI variable value assigned: %s %s' % (values, type(values)))
        return values

    # ``configProperties`` is neither defined nor imported in this file, so
    # it is only consulted when something else has put it in module scope.
    config = globals().get("configProperties")
    if config is not None and config.has_option("shellConfiguration", "adminVariableValue"):
        raw = config.get('shellConfiguration', 'adminVariableValue', raw=True, fallback=None)
        values = json.loads(raw)
        print('Airflow configuration value assigned: %s %s' % (values, type(values)))
        return values

    values = {}
    print('Airflow default values assigned: %s %s' % (values, type(values)))
    return values


def _resolve_date_range(values):
    """Return ``(start, end)`` date strings, defaulting each to yesterday.

    Raises:
        AirflowException: if ``values`` is not a dict.
    """
    if not isinstance(values, dict):
        raise AirflowException(
            "ERROR: PARAMTER VALUES ERROR. %s, %s" % (values, type(values))
        )
    fallback = _yesterday()
    start = values.get('startDate') or fallback
    end = values.get('endDate') or fallback
    return str(start), str(end)


def _fetch_summary(start_date, end_date):
    """Query Cassandra for the window and return the rows as a DataFrame."""
    conn = BaseHook.get_connection(CASSANDRA_CONN_ID)
    # NOTE(review): credentials from the Airflow connection take precedence;
    # the literals are the legacy hard-coded values, kept only as a fallback.
    # TODO: store them on the connection and remove the literals.
    auth_provider = PlainTextAuthProvider(
        username=conn.login or 'cassandra',
        password=conn.password or 'cassandra@123',
    )
    cluster = Cluster([conn.host], auth_provider=auth_provider)
    try:
        session = cluster.connect(conn.schema)
        # Bind the dates as parameters instead of formatting them into the
        # CQL text, since they can come from user-supplied run configuration.
        query = (
            "SELECT * FROM hos_event_recon_summary "
            "where recon_start_date>=%s and recon_end_date<=%s allow filtering"
        )
        print(query, (start_date, end_date))
        # Materialise every page before the cluster is shut down.
        rows = list(session.execute(query, (start_date, end_date)))
    finally:
        cluster.shutdown()
    return pd.DataFrame(rows, columns=SUMMARY_COLUMNS)


def _send_report(report_df):
    """Mail ``report_df`` as an HTML table to the fixed recipient list."""
    msg = MIMEMultipart()
    msg['From'] = MAIL_SENDER
    msg['To'] = ', '.join(MAIL_RECIPIENTS)
    msg['Subject'] = 'Recon stats'
    msg.attach(MIMEText(report_df.to_html(index=False), 'html'))

    with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
        server.sendmail(msg['From'], MAIL_RECIPIENTS, msg.as_string())
    print("Mail Sent")


def process_recon_results(**kwargs):
    """Aggregate the recon summary for the requested window and e-mail it.

    Args:
        **kwargs: Airflow task context; only ``dag_run`` is read.

    Raises:
        AirflowException: if the resolved run parameters are not a dict.
    """
    dag_run = kwargs['dag_run']
    print(dag_run.dag_id)

    start_date, end_date = _resolve_date_range(_load_admin_values(dag_run))
    print("start_date: %s" % start_date)
    print("end_date : %s" % end_date)

    df = _fetch_summary(start_date, end_date)
    df['recon_start_date'] = pd.to_datetime(df['recon_start_date']).dt.date
    result_df = df.groupby(['recon_start_date', 'event_type']).agg({
        'total_no_of_events': 'sum',
        'no_of_passed_events': 'sum',
        'no_of_failed_events': 'sum',
        'no_of_passed_events_via_recon': 'sum',
    }).reset_index()

    _send_report(result_df)


default_args = {
    'owner': 'devops',
    'depends_on_past': False,
    'start_date': days_ago(2),
}

# The ``params`` below are not read by the task, which takes its dates from
# the run conf / Variable and otherwise defaults to yesterday.
dag = DAG(
    "process_recon",
    default_args=default_args,
    description="recon stats",
    schedule_interval="@once",
    catchup=False,
    tags=["devops"],
    params={"startDate": "2023-05-05", "endDate": "2023-05-05"},
)

execute_query_task = PythonOperator(
    task_id='process_recon_results',
    python_callable=process_recon_results,
    dag=dag,
)

execute_query_task
Editor is loading...
Leave a Comment