Untitled

 avatar
unknown
plain_text
2 years ago
4.8 kB
4
Indexable
import smtplib
from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import pandas as pd
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster


def _load_recon_settings(dag_run):
    """Return the run settings from the trigger conf or the Airflow Variable.

    Lookup order:
      1. ``dag_run.conf`` (values passed when the run was triggered);
      2. the Airflow Variable whose key is the DAG id, parsed as JSON;
      3. an empty dict, which makes the caller fall back to default dates.
    """
    if dag_run.conf:
        settings = dag_run.conf
        print('Airflow conf variable value assigned: %s, %s' % (settings, type(settings)))
        return settings

    # deserialize_json=True makes Airflow parse the stored JSON, so a single
    # lookup replaces the previous "get, then json.loads" pair.
    settings = Variable.get(dag_run.dag_id, default_var=None, deserialize_json=True)
    if settings:
        print('Airflow UI variable value assigned: %s %s' % (settings, type(settings)))
        return settings

    settings = {}
    print('Airflow default values assigned: %s %s' % (settings, type(settings)))
    return settings


def _resolve_date_range(settings, today=None):
    """Return ``(startDate, endDate)``, defaulting each to yesterday.

    Args:
        settings: dict that may carry ``startDate`` and/or ``endDate``.
        today: reference ``datetime``; defaults to ``datetime.now()`` (the
            worker's local clock). Exposed so the default can be pinned.

    Returns:
        Tuple of two values. A key that is missing, ``None`` or empty is
        replaced by ``today - 1 day`` formatted as ``YYYY-MM-DD``.
    """
    if today is None:
        today = datetime.now()
    yesterday = (today - timedelta(days=1)).strftime('%Y-%m-%d')
    start_date = settings.get('startDate') or yesterday
    end_date = settings.get('endDate') or yesterday
    return start_date, end_date


def _fetch_recon_summary(start_date, end_date):
    """Query ``hos_event_recon_summary`` and aggregate per day and event type."""
    cassandra_conn_id = 'cassandrahost'
    cassandra_conn = BaseHook.get_connection(cassandra_conn_id)
    # NOTE(review): credentials are hard-coded here; consider storing them on
    # the Airflow connection and reading cassandra_conn.login / .password.
    auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra@123')
    cluster = Cluster([cassandra_conn.host], auth_provider=auth_provider)
    try:
        session = cluster.connect(cassandra_conn.schema)

        # Bind the dates as parameters instead of formatting them into the
        # CQL text, so values coming from the trigger conf cannot alter the
        # statement.
        cassandra_query = (
            "SELECT * FROM hos_event_recon_summary "
            "where recon_start_date>=%s and recon_end_date<=%s allow filtering"
        )
        print(cassandra_query, (start_date, end_date))
        # Materialise the rows before the cluster is shut down.
        rows = list(session.execute(cassandra_query, (start_date, end_date)))
    finally:
        # Release the driver's connections and threads even when the query fails.
        cluster.shutdown()

    # NOTE(review): labels are applied positionally to the SELECT * result;
    # this assumes the table's column order matches this list -- confirm.
    df = pd.DataFrame(rows, columns=['recon_start_date', 'recon_end_date', 'event_type', 'no_of_failed_events',
                                     'no_of_failed_events_pubished_to_kafka', 'no_of_passed_events',
                                     'no_of_passed_events_via_recon', 'total_no_of_events'])

    df['recon_start_date'] = pd.to_datetime(df['recon_start_date']).dt.date
    return df.groupby(['recon_start_date', 'event_type']).agg({
        'total_no_of_events': 'sum',
        'no_of_passed_events': 'sum',
        'no_of_failed_events': 'sum',
        'no_of_passed_events_via_recon': 'sum'
    }).reset_index()


def _send_recon_report(result_df):
    """Mail the aggregated frame as an HTML table to the fixed recipient list."""
    msg = MIMEMultipart()
    msg['From'] = 'hobapp_tcs-hob-sir-env01@tcs.com'

    recipients = ['veerendrakumar.meka@tcs.com', 'ragulp.r@tcs.com']
    msg['To'] = ', '.join(recipients)
    msg['Subject'] = 'Recon stats'

    msg.attach(MIMEText(result_df.to_html(index=False), 'html'))

    smtp_server = '10.16.16.90'
    smtp_port = 25

    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.sendmail(msg['From'], recipients, msg.as_string())
        print("Mail Sent")


def process_recon_results(**kwargs):
    """Airflow callable: mail the recon summary for a date range.

    The range comes from ``startDate`` / ``endDate`` in the DAG-run conf or,
    failing that, from the Airflow Variable named after the DAG. Any date
    that is not supplied defaults to yesterday (current date minus one day).

    Args:
        **kwargs: Airflow task context; only ``dag_run`` is read.

    Raises:
        AirflowException: if the resolved settings are not a dict.
    """
    dag_run = kwargs['dag_run']
    print(dag_run.dag_id)

    settings = _load_recon_settings(dag_run)
    if not isinstance(settings, dict):
        raise AirflowException("ERROR: PARAMTER VALUES ERROR. %s, %s" % (settings, type(settings)))

    start_date, end_date = _resolve_date_range(settings)
    print("start_date: %s" % start_date)
    print("end_date      : %s" % end_date)

    _send_recon_report(_fetch_recon_summary(start_date, end_date))


# Arguments applied to every task of the DAG unless a task overrides them.
default_args = {
    'owner': 'devops',
    'depends_on_past': False,
    'start_date': days_ago(2),
}

# DAG definition for the recon statistics mail; "@once" schedules one run.
# NOTE(review): these params are fixed sample dates, while the task callable
# reads dag_run.conf -- confirm whether they are still wanted.
dag = DAG(
    dag_id="process_recon",
    default_args=default_args,
    description="recon stats",
    schedule_interval="@once",
    catchup=False,
    tags=["devops"],
    params={"startDate": "2023-05-05", "endDate": "2023-05-05"},
)


# The DAG's only task; opening the DAG as a context manager attaches the
# operator to it without passing dag= explicitly.
with dag:
    execute_query_task = PythonOperator(
        task_id='process_recon_results',
        python_callable=process_recon_results,
    )

# Single task, so there are no dependencies to declare.
execute_query_task



Can you please modify the above code so that startDate and endDate, instead of defaulting to None, are set dynamically to the current date minus one day? For example, if today's date is 2023-12-20, then startDate and endDate should both be 2023-12-19.
Editor is loading...
Leave a Comment