Untitled
unknown
plain_text
2 years ago
4.8 kB
9
Indexable
import json
import smtplib
from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import pandas as pd
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
def _load_recon_params(dag_run):
    """Return the run parameters for this DAG run from the first source that has them.

    Sources are tried in order: the run's ``conf``, an Airflow Variable named
    after the DAG id (a JSON string), a module-level ``configProperties``
    parser if the module provides one, and finally an empty dict.
    """
    dag_name = dag_run.dag_id

    if dag_run.conf:
        params = dag_run.conf
        print('Airflow conf variable value assigned: %s, %s' % (params, type(params)))
        return params

    # The original looked the Variable up under an undefined name (dagIdVar);
    # the key that was tested for existence is the DAG id.
    raw_variable = Variable.get(dag_name, None)
    if raw_variable:
        params = json.loads(raw_variable)
        print('Airflow UI variable value assigned: %s %s' % (params, type(params)))
        return params

    # configProperties is not defined anywhere in this file as visible, so it
    # is only consulted when the module actually provides it.
    config = globals().get('configProperties')
    if config is not None and config.has_option("shellConfiguration", "adminVariableValue"):
        raw_config = config.get('shellConfiguration', 'adminVariableValue', raw=True, fallback=None)
        params = json.loads(raw_config)
        print('Airflow configuration value assigned: %s %s' % (params, type(params)))
        return params

    # No source supplied anything: the dates are defaulted by the caller.
    params = {}
    print('Airflow default values assigned: %s %s' % (params, type(params)))
    return params


def _resolve_recon_dates(params):
    """Return ``(startDate, endDate)`` from *params*, defaulting each to yesterday.

    A missing, ``None`` or empty value is replaced by the current date minus
    one day, formatted as ``YYYY-MM-DD``.

    Raises:
        AirflowException: if *params* is not a dict.
    """
    if not isinstance(params, dict):
        raise AirflowException("ERROR: PARAMTER VALUES ERROR. %s, %s" % (params, type(params)))

    yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
    start_date = params.get('startDate') or yesterday
    end_date = params.get('endDate') or yesterday
    return start_date, end_date


def process_recon_results(**kwargs):
    """Query the recon summary for a date range and e-mail the aggregated stats.

    The date range comes from the run parameters (keys ``startDate`` and
    ``endDate``); when a date is not supplied it defaults to the current date
    minus one day. The matching rows of ``hos_event_recon_summary`` are summed
    per ``recon_start_date`` and ``event_type`` and sent as an HTML table.

    Args:
        **kwargs: Airflow task context; ``dag_run`` is required.

    Raises:
        AirflowException: if the resolved run parameters are not a dict.
    """
    dag_run = kwargs['dag_run']
    print(dag_run.dag_id)

    startDate, endDate = _resolve_recon_dates(_load_recon_params(dag_run))
    print("start_date: %s" % startDate)
    print("end_date : %s" % endDate)

    cassandra_conn = BaseHook.get_connection('cassandrahost')
    # Prefer the credentials stored on the Airflow connection. The literals are
    # the previously hard-coded values, kept only as a fallback.
    # TODO: store the credentials on the connection and remove these literals.
    auth_provider = PlainTextAuthProvider(
        username=cassandra_conn.login or 'cassandra',
        password=cassandra_conn.password or 'cassandra@123',
    )
    cluster = Cluster([cassandra_conn.host], auth_provider=auth_provider)
    try:
        session = cluster.connect(cassandra_conn.schema)
        # Values are passed as parameters so the driver quotes them, instead of
        # interpolating run-supplied text into the CQL string.
        cassandra_query = (
            "SELECT * FROM hos_event_recon_summary "
            "where recon_start_date>=%s and recon_end_date<=%s allow filtering"
        )
        print(cassandra_query, (startDate, endDate))
        rows = list(session.execute(cassandra_query, (startDate, endDate)))
    finally:
        # Release the driver's connections even when the query fails.
        cluster.shutdown()

    df = pd.DataFrame(rows, columns=['recon_start_date', 'recon_end_date', 'event_type', 'no_of_failed_events',
                                     'no_of_failed_events_pubished_to_kafka', 'no_of_passed_events',
                                     'no_of_passed_events_via_recon', 'total_no_of_events'])
    df['recon_start_date'] = pd.to_datetime(df['recon_start_date']).dt.date
    result_df = df.groupby(['recon_start_date', 'event_type']).agg({
        'total_no_of_events': 'sum',
        'no_of_passed_events': 'sum',
        'no_of_failed_events': 'sum',
        'no_of_passed_events_via_recon': 'sum',
    }).reset_index()

    recipients = ['veerendrakumar.meka@tcs.com', 'ragulp.r@tcs.com']
    msg = MIMEMultipart()
    msg['From'] = 'hobapp_tcs-hob-sir-env01@tcs.com'
    msg['To'] = ', '.join(recipients)
    msg['Subject'] = 'Recon stats'
    msg.attach(MIMEText(result_df.to_html(index=False), 'html'))

    smtp_server = '10.16.16.90'
    smtp_port = 25
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.sendmail(msg['From'], recipients, msg.as_string())
    print("Mail Sent")
# Arguments applied to every task created in this DAG.
default_args = {
    'owner': 'devops',
    'depends_on_past': False,
    'start_date': days_ago(2),
}

# DAG definition; "@once" is the schedule and catchup is disabled.
# NOTE(review): the dates in params are fixed literals - confirm they are
# still wanted as the DAG-level parameter defaults.
dag = DAG(
    dag_id="process_recon",
    default_args=default_args,
    description="recon stats",
    schedule_interval="@once",
    catchup=False,
    tags=["devops"],
    params={"startDate": "2023-05-05", "endDate": "2023-05-05"},
)

# The DAG's only task: runs process_recon_results.
execute_query_task = PythonOperator(
    task_id='process_recon_results',
    python_callable=process_recon_results,
    dag=dag,
)

execute_query_task
Can you please modify the code above? Currently startDate and endDate default to None; instead, please set them dynamically to the current date minus one day. For example, if today's date is 2023-12-20, then both startDate and endDate should be 2023-12-19.
Editor is loading...
Leave a Comment