Untitled
unknown
plain_text
2 years ago
8.5 kB
5
Indexable
# NOTE (question carried over from the original paste): but while triggering
# the DAG in the UI I am not getting the option for passing params. For
# example, in the code below I am getting that option.
#
# DAG "devOps_diskSpaceChecker": validates its parameters, then runs the shell
# script that sits next to this file (same base name, ".sh" extension) with the
# disk-space threshold and file-size arguments.
import json

from airflow import DAG
from airflow import AirflowException
from airflow.models import Variable
from airflow.models.xcom import XCom
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.edgemodifier import Label
from datetime import datetime
from shlex import quote as shlex_quote
from sys import exc_info as sys_exc_info
from sys import stderr as sys_stderr


#*********************User Defined Functions ********************************************************************************************#
def fn_failed_params_validation(ti, **kwargs):
    """Mark ``task_validateParams`` as FAILED and log the validation error.

    Runs on the "validation failed" branch. The error text is read from the
    XCom that ``fn_preCheckOperator`` pushed under the key
    ``<dag_id>_task_validateParams``.

    Args:
        ti: Task instance of the running task (used for ``xcom_pull``).
        **kwargs: Airflow context; ``dag_run`` is required.
    """
    from airflow.utils.state import TaskInstanceState

    dag_run = kwargs['dag_run']
    xom_key = dag_run.dag_id + '_task_validateParams'
    # A single task id (string, not a list) makes xcom_pull return the value
    # itself instead of a one-element collection.
    xom_error = ti.xcom_pull(task_ids='task_validateParams', key=xom_key)
    dag_run.dag.set_task_instance_state(
        task_id="task_validateParams",
        state=TaskInstanceState.FAILED,
        run_id=dag_run.run_id,
    )
    print("XOM_ERROR")
    print(xom_error)


def fn_triggerShell(ti, **kwargs):
    """Run the shell command prepared by ``task_validateParams``.

    Args:
        ti: Task instance of the running task (used for ``xcom_pull``).
        **kwargs: Airflow context; ``dag_run`` is required.

    Raises:
        AirflowException: If no command was found in XCom or the command
            exits with a non-zero return code.
    """
    from subprocess import Popen, PIPE

    dag_run = kwargs['dag_run']
    xom_key = dag_run.dag_id + '_task_validateParams'
    bashCommand = ti.xcom_pull(task_ids='task_validateParams', key=xom_key)
    if not bashCommand:
        # Fail with a clear message instead of an IndexError/TypeError when
        # the validation task pushed nothing.
        raise AirflowException(
            "Space check script failed: no command found in XCom key %s" % xom_key
        )
    print("bashCommand : %s" % bashCommand)

    # The command string is built (and shell-quoted) by fn_preCheckOperator.
    shell = Popen(bashCommand, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
    output, err = shell.communicate()
    print("processId : %s" % str(shell.pid))
    print("returncode: %s" % shell.returncode)
    # Decode so the task log shows readable text rather than a bytes repr.
    print("output : %s" % output.decode(errors="replace"))
    print("err : %s" % err.decode(errors="replace"))

    if shell.returncode != 0:
        raise AirflowException("Space check script failed")
    print("Space check script completed")


def fn_preCheckOperator(**kwargs):
    """Validate script, config file and parameters; choose the next task.

    Parameter values are taken from the first source that provides them:
    ``dag_run.conf``, the Airflow Variable named after the DAG id (JSON),
    the ``adminVariableValue`` option in section ``shellConfiguration`` of
    the config file (JSON), and finally the built-in defaults.

    The outcome is pushed to XCom under ``<dag_id>_task_validateParams``:
    the shell command on success, a formatted error line on failure.

    Args:
        **kwargs: Airflow context; ``dag``, ``task_instance`` and
            ``dag_run`` are required.

    Returns:
        ``"task_triggerShellScript"`` when validation succeeds, otherwise
        ``"task_validationFailed"``.
    """
    from configparser import ConfigParser as conf_ConfigParser
    from os import path as os_path

    # Dag Name
    print("dag_id from dag:")
    print(kwargs['dag'].dag_id)

    print("dag_id from task instance:")
    ti = kwargs['task_instance']
    print(ti.dag_id)

    print("dag_id from dag_run:")
    dag_run = kwargs['dag_run']
    dag_name = dag_run.dag_id
    print(dag_name)

    xom_key = dag_name + '_task_validateParams'
    xcom_value = "Success"
    validation_flag = False
    adminVariableValue = {"diskSpaceThreshold": "90", "fileSizeInMB": "1024"}

    try:
        # Directory holding this DAG file, with a trailing separator.
        file_path = os_path.join(os_path.dirname(os_path.realpath(__file__)), "")
        base_name = os_path.splitext(os_path.basename(__file__))[0]
        shell_script_name = base_name + ".sh"
        # The config file mirrors the DAG file under ".../config/". It must be
        # derived from the full file name: deriving it from the directory alone
        # yields the config directory, never the file.
        config_file = (file_path + base_name + ".config").replace("/dags/", "/config/")

        print("file_path : %s" % file_path)
        print("shell_script_path : %s" % shell_script_name)
        print("config_file : %s" % config_file)

        # Shell script details
        if not os_path.exists(file_path + shell_script_name):
            raise AirflowException("ERROR: SHELL SCRIPT MISSING.")

        # Config file
        if not os_path.exists(config_file):
            raise AirflowException("ERROR: CONFIGURATION FILE MISSING.")

        configProperties = conf_ConfigParser()
        configProperties.read(config_file)

        if dag_run.conf:
            adminVariableValue = dag_run.conf
            print('Airflow conf variable value assigned: %s, %s' % (adminVariableValue, type(adminVariableValue)))
        else:
            # Look the Variable up once, under the DAG id.
            variable_value = Variable.get(dag_name, None)
            if variable_value:
                adminVariableValue = json.loads(variable_value)
                print('Airflow UI variable value assigned: %s %s' % (adminVariableValue, type(adminVariableValue)))
            elif configProperties.has_option("shellConfiguration", "adminVariableValue"):
                adminVariableValue = configProperties.get(
                    'shellConfiguration', 'adminVariableValue', raw=True, fallback=None
                )
                adminVariableValue = json.loads(adminVariableValue)
                print('Airflow configuration value assigned: %s %s' % (adminVariableValue, type(adminVariableValue)))
            else:
                print('Airflow default values assigned: %s %s' % (adminVariableValue, type(adminVariableValue)))

        if not (adminVariableValue and isinstance(adminVariableValue, dict)):
            raise AirflowException(
                "ERROR: PARAMTER VALUES ERROR. %s, %s" % (adminVariableValue, type(adminVariableValue))
            )
        diskSpaceThreshold = adminVariableValue.get('diskSpaceThreshold')
        fileSizeInMB = adminVariableValue.get('fileSizeInMB')

        print("diskSpaceThreshold: %s" % diskSpaceThreshold)
        print("fileSizeInMB : %s" % fileSizeInMB)

        if diskSpaceThreshold is None or fileSizeInMB is None:
            # Report missing keys explicitly rather than failing later on
            # string concatenation with None.
            raise AirflowException(
                "ERROR: MISSING PARAMETER VALUE(S). %s" % (adminVariableValue,)
            )

        # The values may come from user-supplied dag_run.conf and the command
        # is executed with shell=True, so every piece is shell-quoted.
        # str() also accepts numeric values (e.g. {"diskSpaceThreshold": 90}).
        # "&&" keeps the script from running if the "cd" fails.
        bashCommand = "cd %s && ./%s %s %s" % (
            shlex_quote(file_path),
            shlex_quote(shell_script_name),
            shlex_quote(str(diskSpaceThreshold)),
            shlex_quote(str(fileSizeInMB)),
        )
        print("bashCommand : %s" % bashCommand)

        xcom_value = bashCommand
        validation_flag = True
    except Exception as errrsdb:
        # Any failure above routes the run to the "validation failed" branch;
        # the formatted error line travels to that task through XCom.
        validation_flag = False
        _, _, exception_traceback = sys_exc_info()
        exception_file_name = exception_traceback.tb_frame.f_code.co_filename
        exception_line_number = exception_traceback.tb_lineno
        xcom_value = ("%s|ERROR|%s|Line No. %s|%s\n"
                      % (datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
                         exception_file_name,
                         exception_line_number,
                         str(errrsdb)))
        print(xcom_value)

    ti.xcom_push(key=xom_key, value=xcom_value)

    if validation_flag:
        return "task_triggerShellScript"
    return "task_validationFailed"


#*********************DAG Defnition ********************************************************************************************#
default_args = {
    'owner': 'devops',
    'depends_on_past': False,
    'start_date': days_ago(2),
}

# NOTE: the DAG-level ``params`` below are not read by fn_preCheckOperator;
# that function takes its values from dag_run.conf, the Airflow Variable, the
# config file, or its own built-in defaults.
dag = DAG(
    "devOps_diskSpaceChecker",
    default_args=default_args,
    description="devOps_diskSpaceChecker",
    schedule_interval="30 3,11,19 * * *",
    catchup=False,
    tags=["devops"],
    params={"diskSpaceThreshold": "72", "fileSizeInMB": "500"},
)

#*********************DAG Tasks********************************************************************************************#
beginProcess = DummyOperator(
    task_id='task_beginProcess',
    dag=dag,
)

validationFailedOperator = PythonOperator(
    task_id='task_validationFailed',
    python_callable=fn_failed_params_validation,
    dag=dag,
    do_xcom_push=True,
)

# ``provide_context`` is omitted: the callables in this file already depend on
# the context being passed automatically (see the PythonOperator tasks).
preCheckOperator = BranchPythonOperator(
    task_id='task_validateParams',
    python_callable=fn_preCheckOperator,
    dag=dag,
)

triggerShellOperator = PythonOperator(
    task_id='task_triggerShellScript',
    python_callable=fn_triggerShell,
    dag=dag,
    do_xcom_push=True,
)

#*********************DAG Sequence*******************************************************************************************#
beginProcess >> Label("Validate for configuration file, shell script, arguments") >> preCheckOperator
preCheckOperator >> Label("Validation failed") >> validationFailedOperator
preCheckOperator >> Label("Validation success") >> triggerShellOperator
Editor is loading...
Leave a Comment