Untitled

mail@pastecode.io avatar
unknown
plain_text
7 months ago
12 kB
1
Indexable
Never
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from cassandra.cluster import Cluster
from airflow.hooks.base import BaseHook
import pandas as pd
from cassandra.auth import PlainTextAuthProvider
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

def _resolve_recon_window(context):
    """Return the ``(recon_start_date, recon_end_date)`` pair for this run.

    Each value is looked up, in order, in ``dag_run.conf`` (the JSON entered
    when the DAG is triggered), in ``params`` and finally directly in the
    keyword arguments (e.g. values supplied through ``op_kwargs``).

    Raises:
        ValueError: if either date is missing or empty in every source.
    """
    dag_run = context.get('dag_run')
    run_conf = getattr(dag_run, 'conf', None) or {}
    params = context.get('params') or {}

    def _lookup(key):
        for source in (run_conf, params, context):
            value = source.get(key)
            if value not in (None, ''):
                return value
        raise ValueError(
            f"'{key}' was not supplied; pass it in the trigger configuration "
            f"JSON (dag_run.conf) or as a DAG param"
        )

    return _lookup('recon_start_date'), _lookup('recon_end_date')


def _summarise_recon_rows(rows):
    """Aggregate raw recon rows into per-day, per-event-type totals."""
    # Column labels are applied positionally to the rows of the SELECT *.
    df = pd.DataFrame(rows, columns=[
        'recon_start_date',
        'recon_end_date',
        'event_type',
        'no_of_failed_events',
        'no_of_failed_events_published_to_kafka',
        'no_of_passed_events',
        'no_of_passed_events_via_recon',
        'total_no_of_events',
    ])
    # Group by calendar day, not by the full timestamp.
    df['recon_start_date'] = pd.to_datetime(df['recon_start_date']).dt.date
    return df.groupby(['recon_start_date', 'event_type']).agg({
        'total_no_of_events': 'sum',
        'no_of_passed_events': 'sum',
        'no_of_failed_events': 'sum',
        'no_of_passed_events_via_recon': 'sum'
    }).reset_index()


def _send_recon_report(result_df):
    """Mail the aggregated recon table as an HTML body."""
    recipients = ['veerendrakumar.meka@tcs.com', 'ragulp.r@tcs.com']

    msg = MIMEMultipart()
    msg['From'] = 'hobapp_tcs-hob-sir-env01@tcs.com'
    msg['To'] = ', '.join(recipients)
    msg['Subject'] = 'Recon stats'
    msg.attach(MIMEText(result_df.to_html(index=False), 'html'))

    smtp_server = '10.16.16.90'
    smtp_port = 25

    with smtplib.SMTP(smtp_server, smtp_port) as server:
        # sendmail() needs the recipient *list*; a comma-joined string is
        # treated as one single address.
        server.sendmail(msg['From'], recipients, msg.as_string())
        print("Mail Sent")


def process_recon_results(**kwargs):
    """Query the recon summary for a date window and e-mail the totals.

    The window is taken from ``recon_start_date`` / ``recon_end_date``, read
    from ``dag_run.conf``, ``params`` or the keyword arguments (in that
    order). Matching rows of ``hos_event_recon_summary`` are summed per day
    and event type and mailed as an HTML table.

    Raises:
        ValueError: if either date was not supplied for this run.
    """
    # Validate the inputs before opening any connection.
    recon_start_date, recon_end_date = _resolve_recon_window(kwargs)

    cassandra_conn_id = 'cassandrahost'
    cassandra_conn = BaseHook.get_connection(cassandra_conn_id)
    # NOTE(review): credentials are hard-coded in source; they should be
    # moved to the Airflow connection (cassandra_conn.login / .password).
    auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra@123')
    cluster = Cluster([cassandra_conn.host], auth_provider=auth_provider)
    try:
        session = cluster.connect(cassandra_conn.schema)

        # Bound values instead of string formatting: the dates come from
        # trigger-time input and must not be spliced into the CQL text.
        cassandra_query = (
            "SELECT * FROM hos_event_recon_summary where  recon_start_date>=%s "
            "and recon_end_date<=%s allow filtering"
        )
        print(cassandra_query, (recon_start_date, recon_end_date))
        result_set = session.execute(
            cassandra_query, (recon_start_date, recon_end_date)
        )
        # Materialise every row before the cluster is shut down.
        rows = list(result_set)
    finally:
        cluster.shutdown()

    _send_recon_report(_summarise_recon_rows(rows))

# Defaults handed to every operator of this DAG. The recon dates do not
# belong here: default_args only supplies operator constructor arguments and
# is not what the trigger dialog exposes.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'process_recon_results',
    default_args=default_args,
    description='DAG for executing Cassandra query which process the recon stats',
    schedule_interval=timedelta(days=1),
    catchup=False,
    # DAG-level params are what the "Trigger DAG w/ config" dialog offers for
    # editing. The empty defaults are placeholders to be replaced with the
    # real dates when triggering, e.g.
    # {"recon_start_date": "2023-01-01", "recon_end_date": "2023-01-02"}.
    params={
        'recon_start_date': '',
        'recon_end_date': '',
    },
)

execute_query_task = PythonOperator(
    task_id='process_recon_results',
    python_callable=process_recon_results,
    dag=dag,
)

In the above code I am not getting an option in the Airflow UI to pass params while triggering the DAG, but with the code below I do get that option. Can you please modify the above code, taking the code below as a reference?

from airflow import DAG
from airflow import AirflowException
from airflow.models import Variable
from airflow.models.xcom import XCom
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.edgemodifier import Label

from datetime import datetime
from sys import exc_info as sys_exc_info
from sys import stderr as sys_stderr

#*********************User Defined Functions ********************************************************************************************#

def fn_failed_params_validation(ti, **kwargs):
    """Flag ``task_validateParams`` as failed for this run and log its XCom.

    Args:
        ti: task instance used to pull the XCom value.
        **kwargs: task context; ``dag_run`` is required.
    """
    from airflow.utils.state import TaskInstanceState

    current_run = kwargs['dag_run']
    validation_task_id = "task_validateParams"

    # The XCom key is the DAG id followed by the validation task's suffix.
    error_key = current_run.dag_id + '_task_validateParams'
    pulled_error = ti.xcom_pull(task_ids=[validation_task_id], key=error_key)

    # Mark the validation task itself as FAILED in this DAG run.
    current_run.dag.set_task_instance_state(
        task_id=validation_task_id,
        state=TaskInstanceState.FAILED,
        run_id=current_run.run_id,
    )

    print("XOM_ERROR")
    print(pulled_error)



def fn_triggerShell(ti, **kwargs):
    """Run the shell command pulled from ``task_validateParams``' XCom.

    Args:
        ti: task instance used to pull the XCom value.
        **kwargs: task context; ``dag_run`` is required.

    Raises:
        AirflowException: if the command exits with a non-zero return code.
    """
    from subprocess import PIPE, Popen

    current_run = kwargs['dag_run']
    command_key = current_run.dag_id + '_task_validateParams'

    # xcom_pull is called with a list of task ids; the command is the first
    # element of what it returns.
    pulled = ti.xcom_pull(task_ids=['task_validateParams'], key=command_key)
    command = pulled[0]
    print("bashCommand       : %s" % command)

    process = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
    stdout_data, stderr_data = process.communicate()
    print("processId : %s" % str(process.pid))
    print("returncode: %s" % process.returncode)
    print("output    : %s" % stdout_data)
    print("err       : %s" % stderr_data)

    # Guard clause: anything but a clean exit fails the task.
    if process.returncode != 0:
        raise AirflowException("Space check script failed")
    print("Space check script completed")



def fn_preCheckOperator(**kwargs):
    """Validate prerequisites, build the shell command and pick the branch.

    Checks that the companion ``.sh`` script and the configuration path
    exist, then resolves ``diskSpaceThreshold`` / ``fileSizeInMB`` from, in
    order: ``dag_run.conf``, the Airflow Variable named after the DAG, the
    ``shellConfiguration.adminVariableValue`` config option, or the built-in
    defaults. The resulting command (or, on failure, an error line) is pushed
    to XCom under ``<dag_id>_task_validateParams``.

    Args:
        **kwargs: task context; ``dag``, ``task_instance`` and ``dag_run``
            are required.

    Returns:
        ``"task_triggerShellScript"`` when validation succeeded, otherwise
        ``"task_validationFailed"``.
    """
    # json was used below without ever being imported in this file.
    import json
    from configparser import ConfigParser as conf_ConfigParser
    from os import path as os_path
    from shlex import quote as shlex_quote

    # Dag Name
    print("dag_id from dag:")
    dag = kwargs['dag']
    dag_name = dag.dag_id
    print(dag_name)

    print("dag_id from task instance:")
    ti = kwargs['task_instance']
    dag_name = ti.dag_id
    print(dag_name)

    print("dag_id from dag_run:")
    dag_run = kwargs['dag_run']
    dag_name = dag_run.dag_id
    print(dag_name)

    xom_key = dag_name + '_task_validateParams'
    xcom_value = "Success"
    validation_flag = False
    adminVariableValue = {"diskSpaceThreshold": "90", "fileSizeInMB": "1024"}
    try:
        file_path = os_path.realpath(__file__).replace(os_path.basename(__file__), "")
        # NOTE(review): file_path is the directory (the file name was stripped
        # above), so the ".py" -> ".config" replacement normally has no
        # effect and this points at a directory, which ConfigParser.read()
        # silently skips. Presumably the script's own path was intended -
        # confirm before changing, since it alters which file must exist.
        config_file = file_path.replace(".py", ".config").replace("/dags/", "/config/")
        shell_script_name = os_path.basename(__file__).replace(".py", ".sh")

        print("file_path         : %s" % file_path)
        print("shell_script_path : %s" % shell_script_name)
        print("config_file       : %s" % config_file)

        # Shell script details
        if not os_path.exists(file_path + shell_script_name):
            raise Exception("ERROR: SHELL SCRIPT MISSING.")

        # Config file
        if not os_path.exists(config_file):
            raise AirflowException("ERROR: CONFIGURATION FILE MISSING.")

        configProperties = conf_ConfigParser()
        configProperties.read(config_file)

        run_conf = dag_run.conf
        if run_conf:
            adminVariableValue = run_conf
            print('Airflow conf variable value assigned: %s, %s' % (adminVariableValue, type(adminVariableValue)))
        else:
            # Look the Variable up once, under the DAG's own id (the original
            # referenced an undefined name here).
            stored_value = Variable.get(dag_name, None)
            if stored_value:
                adminVariableValue = json.loads(stored_value)
                print('Airflow UI variable value assigned: %s %s' % (adminVariableValue, type(adminVariableValue)))
            elif configProperties.has_option("shellConfiguration", "adminVariableValue"):
                adminVariableValue = configProperties.get('shellConfiguration', 'adminVariableValue', raw=True, fallback=None)
                adminVariableValue = json.loads(adminVariableValue)
                print('Airflow configuration value assigned: %s %s' % (adminVariableValue, type(adminVariableValue)))
            else:
                print('Airflow default values assigned: %s %s' % (adminVariableValue, type(adminVariableValue)))

        if not (adminVariableValue and isinstance(adminVariableValue, dict)):
            raise AirflowException("ERROR: PARAMTER VALUES ERROR. %s, %s" % (adminVariableValue, type(adminVariableValue)))

        diskSpaceThreshold = adminVariableValue.get('diskSpaceThreshold')
        fileSizeInMB = adminVariableValue.get('fileSizeInMB')

        print("diskSpaceThreshold: %s" % diskSpaceThreshold)
        print("fileSizeInMB      : %s" % fileSizeInMB)

        # A missing value must fail validation rather than reach the script
        # as the literal text "None".
        if diskSpaceThreshold is None or fileSizeInMB is None:
            raise AirflowException("ERROR: PARAMTER VALUES ERROR. %s, %s" % (adminVariableValue, type(adminVariableValue)))

        # Values may arrive as numbers (JSON conf), so stringify them; quote
        # them because the command is later executed through a shell.
        bashCommand = (shell_script_name + " " + shlex_quote(str(diskSpaceThreshold)) + " " + shlex_quote(str(fileSizeInMB)) + " ")
        bashCommand = "cd " + shlex_quote(file_path) + ";./" + bashCommand
        print("bashCommand       : %s" % bashCommand)

        xcom_value = bashCommand
        validation_flag = True
    except Exception as errrsdb:
        # Any failure becomes an error line in XCom and selects the failure
        # branch instead of crashing this task.
        validation_flag = False
        exception_type, exception_object, exception_traceback = sys_exc_info()
        exception_file_name = exception_traceback.tb_frame.f_code.co_filename
        exception_line_number = exception_traceback.tb_lineno
        xcom_value = ("%s|ERROR|%s|Line No. %s|%s\n" % (datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], exception_file_name, exception_line_number, str(errrsdb)))
        print(xcom_value)

    ti.xcom_push(key=xom_key, value=xcom_value)

    if validation_flag:
        return "task_triggerShellScript"
    return "task_validationFailed"



#*********************DAG Defnition ********************************************************************************************#

# Defaults applied to the operators of this DAG.
default_args = {
    'owner': 'devops',
    'depends_on_past': False,
    'start_date': days_ago(2),
}

# The cron expression schedules runs at minute 30 of hours 3, 11 and 19.
# `params` holds the default threshold values declared at DAG level.
dag = DAG(
    "devOps_diskSpaceChecker",
    default_args=default_args,
    description="devOps_diskSpaceChecker",
    schedule_interval="30 3,11,19 * * *",
    catchup=False,
    tags=["devops"],
    params={"diskSpaceThreshold": "72", "fileSizeInMB": "500"},
)

#*********************DAG Tasks********************************************************************************************#

# Entry point of the DAG; performs no work itself.
beginProcess = DummyOperator(
    task_id='task_beginProcess',
    dag=dag,
)

# Runs when validation chose the failure branch.
validationFailedOperator = PythonOperator(
    task_id='task_validationFailed',
    python_callable=fn_failed_params_validation,
    dag=dag,
    do_xcom_push=True,
)

# Branching task: its callable returns the task_id to follow.
preCheckOperator = BranchPythonOperator(
    task_id='task_validateParams',
    python_callable=fn_preCheckOperator,
    provide_context=True,
    dag=dag,
)

# Runs when validation chose the success branch.
triggerShellOperator = PythonOperator(
    task_id='task_triggerShellScript',
    python_callable=fn_triggerShell,
    dag=dag,
    do_xcom_push=True,
)

#*********************DAG Sequence*******************************************************************************************#

# begin -> validate, then exactly one of the two labelled branches.
beginProcess >> Label("Validate  for configuration file, shell script, arguments") >> preCheckOperator
preCheckOperator >> Label("Validation failed") >> validationFailedOperator
preCheckOperator >> Label("Validation success") >> triggerShellOperator
Leave a Comment