Untitled
unknown
plain_text
2 years ago
8.5 kB
10
Indexable
But while triggering the DAG in the UI, I am not getting the option for passing params. For example, with the code below I do get that option:
from airflow import DAG
from airflow import AirflowException
from airflow.models import Variable
from airflow.models.xcom import XCom
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.edgemodifier import Label
from datetime import datetime
from sys import exc_info as sys_exc_info
from sys import stderr as sys_stderr
#*********************User Defined Functions ********************************************************************************************#
def fn_failed_params_validation(ti, **kwargs):
    """Flag ``task_validateParams`` as failed and print its reported error.

    Callable for ``task_validationFailed``, the "Validation failed" branch.
    It reads whatever ``task_validateParams`` stored in XCom under
    ``<dag_id>_task_validateParams``, sets that task's instance state to
    FAILED for the current run, and prints the pulled value.

    Args:
        ti: Task instance of the running task; used only for ``xcom_pull``.
        **kwargs: Task context. ``kwargs['dag_run']`` is required and supplies
            ``dag_id``, ``run_id`` and the owning ``dag``.
    """
    from airflow.utils.state import TaskInstanceState

    current_run = kwargs['dag_run']
    error_key = current_run.dag_id + '_task_validateParams'

    # task_ids is passed as a list, so the pulled value is printed as
    # returned (presumably a list of values -- confirm against Airflow docs).
    pulled_error = ti.xcom_pull(task_ids=['task_validateParams'], key=error_key)

    current_run.dag.set_task_instance_state(
        task_id="task_validateParams",
        state=TaskInstanceState.FAILED,
        run_id=current_run.run_id,
    )

    for line in ("XOM_ERROR", pulled_error):
        print(line)
def fn_triggerShell(ti, **kwargs):
    """Execute the shell command prepared by ``task_validateParams``.

    The command string is read from XCom (key
    ``<dag_id>_task_validateParams``), run through the shell, and its pid,
    return code, stdout and stderr are printed to the task log.

    Args:
        ti: Task instance of the running task; used only for ``xcom_pull``.
        **kwargs: Task context. ``kwargs['dag_run']`` is required and supplies
            ``dag_id``.

    Raises:
        AirflowException: If no command is available in XCom, or if the
            command exits with a non-zero return code.
    """
    from subprocess import Popen, PIPE

    dag_run = kwargs['dag_run']
    xom_key = dag_run.dag_id + '_task_validateParams'
    pulled = ti.xcom_pull(task_ids=['task_validateParams'], key=xom_key)

    # task_ids is a list, so the result is indexed; guard against a missing
    # XCom (None / empty result) instead of failing with an opaque
    # IndexError/TypeError or handing an empty command to the shell.
    try:
        bashCommand = pulled[0]
    except (IndexError, TypeError):
        bashCommand = None
    if not bashCommand:
        raise AirflowException(
            "No shell command found in XCom key %s from task_validateParams" % xom_key
        )

    print("bashCommand : %s" % bashCommand)
    # shell=True is required because the command is a compound
    # "cd <dir>;./<script> <args>" string built by task_validateParams.
    shell = Popen(bashCommand, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
    output, err = shell.communicate()
    print("processId : %s" % str(shell.pid))
    print("returncode: %s" % shell.returncode)
    print("output : %s" % output)
    print("err : %s" % err)

    if shell.returncode != 0:
        raise AirflowException("Space check script failed")
    print("Space check script completed")
def fn_preCheckOperator(**kwargs):
    """Validate prerequisites and build the disk-space-check shell command.

    Branch callable for ``task_validateParams``. It checks that the shell
    script (same base name as this file, ``.sh`` extension, same directory)
    and the configuration path exist, resolves the two script arguments, and
    builds a ``cd <dir>;./<script> <threshold> <size>`` command.

    The arguments ``diskSpaceThreshold`` and ``fileSizeInMB`` are taken from
    the first source that provides a value:

    1. ``dag_run.conf`` (the run configuration),
    2. the Airflow Variable named after the DAG id (JSON text),
    3. option ``adminVariableValue`` in section ``shellConfiguration`` of the
       config file (JSON text),
    4. built-in defaults (``"90"`` / ``"1024"``).

    The result is pushed to XCom under ``<dag_id>_task_validateParams``: the
    command string on success, otherwise a pipe-delimited error line
    (timestamp, ``ERROR``, file name, line number, message).

    Args:
        **kwargs: Task context. ``dag``, ``task_instance`` and ``dag_run`` are
            required.

    Returns:
        ``"task_triggerShellScript"`` when validation succeeds, otherwise
        ``"task_validationFailed"``. Validation errors are caught and
        reported through XCom rather than raised.
    """
    import json
    from configparser import ConfigParser as conf_ConfigParser
    from os import path as os_path
    from shlex import quote as shlex_quote

    # Dag Name
    print("dag_id from dag:")
    print(kwargs['dag'].dag_id)
    print("dag_id from task instance:")
    ti = kwargs['task_instance']
    print(ti.dag_id)
    print("dag_id from dag_run:")
    dag_run = kwargs['dag_run']
    dag_name = dag_run.dag_id
    print(dag_name)

    xom_key = dag_name + '_task_validateParams'
    validation_flag = False
    adminVariableValue = {"diskSpaceThreshold": "90", "fileSizeInMB": "1024"}
    try:
        # Directory of this file, with a trailing separator. dirname() is
        # used rather than replacing the basename inside the full path, which
        # would also rewrite a directory component of the same name.
        file_path = os_path.join(os_path.dirname(os_path.realpath(__file__)), "")
        # NOTE(review): file_path is a directory, so the ".py" -> ".config"
        # replacement normally has nothing to act on and this resolves to the
        # sibling /config/ directory rather than a "<dag file>.config" file.
        # Left as-is to avoid failing deployments that currently pass this
        # check -- confirm the intended config file location.
        config_file = file_path.replace(".py", ".config").replace("/dags/", "/config/")
        shell_script_name = os_path.basename(__file__).replace(".py", ".sh")
        print("file_path : %s" % file_path)
        print("shell_script_path : %s" % shell_script_name)
        print("config_file : %s" % config_file)

        # Shell script details
        if not os_path.exists(file_path + shell_script_name):
            raise Exception("ERROR: SHELL SCRIPT MISSING.")
        # Config file
        if not os_path.exists(config_file):
            raise AirflowException("ERROR: CONFIGURATION FILE MISSING.")
        configProperties = conf_ConfigParser()
        configProperties.read(config_file)

        if dag_run.conf:
            adminVariableValue = dag_run.conf
            print('Airflow conf variable value assigned: %s, %s' % (adminVariableValue, type(adminVariableValue)))
        else:
            # Look the Variable up once; it is only consulted when the run
            # carries no configuration of its own.
            variable_value = Variable.get(dag_name, None)
            if variable_value:
                adminVariableValue = json.loads(variable_value)
                print('Airflow UI variable value assigned: %s %s' % (adminVariableValue, type(adminVariableValue)))
            elif configProperties.has_option("shellConfiguration", "adminVariableValue"):
                adminVariableValue = configProperties.get('shellConfiguration', 'adminVariableValue', raw=True, fallback=None)
                adminVariableValue = json.loads(adminVariableValue)
                print('Airflow configuration value assigned: %s %s' % (adminVariableValue, type(adminVariableValue)))
            else:
                print('Airflow default values assigned: %s %s' % (adminVariableValue, type(adminVariableValue)))

        if not (adminVariableValue and isinstance(adminVariableValue, dict)):
            raise AirflowException("ERROR: PARAMTER VALUES ERROR. %s, %s" % (adminVariableValue, type(adminVariableValue)))
        diskSpaceThreshold = adminVariableValue.get('diskSpaceThreshold')
        fileSizeInMB = adminVariableValue.get('fileSizeInMB')
        print("diskSpaceThreshold: %s" % diskSpaceThreshold)
        print("fileSizeInMB : %s" % fileSizeInMB)
        # Both arguments are mandatory for the script; fail validation
        # explicitly instead of passing the literal text "None" to it.
        if diskSpaceThreshold is None or fileSizeInMB is None:
            raise AirflowException("ERROR: PARAMTER VALUES ERROR. %s, %s" % (adminVariableValue, type(adminVariableValue)))

        # Values may arrive as numbers (e.g. from JSON run config), so they
        # are stringified; they are also shell-quoted because the command is
        # later executed with shell=True and run config is user-supplied.
        script_call = " ".join([
            shlex_quote(shell_script_name),
            shlex_quote(str(diskSpaceThreshold)),
            shlex_quote(str(fileSizeInMB)),
        ])
        bashCommand = "cd " + shlex_quote(file_path) + ";./" + script_call + " "
        print("bashCommand : %s" % bashCommand)
        xcom_value = bashCommand
        validation_flag = True
    except Exception as errrsdb:
        # Any failure above is converted into a "validation failed" branch
        # decision; the details travel to the failure task via XCom.
        validation_flag = False
        exception_traceback = errrsdb.__traceback__
        exception_file_name = exception_traceback.tb_frame.f_code.co_filename
        exception_line_number = exception_traceback.tb_lineno
        xcom_value = ("%s|ERROR|%s|Line No. %s|%s\n" % (datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], exception_file_name, exception_line_number, str(errrsdb)))
        print(xcom_value)

    ti.xcom_push(key=xom_key, value=xcom_value)
    if validation_flag:
        return "task_triggerShellScript"
    return "task_validationFailed"
#*********************DAG Definition ********************************************************************************************#
# Arguments applied to every task of the DAG unless a task overrides them.
default_args = dict(
    owner='devops',
    depends_on_past=False,
    start_date=days_ago(2),
)

# Disk-space checker DAG. The cron expression fires at minute 30 of hours
# 3, 11 and 19 every day; catchup is disabled. `params` declares the default
# values for the two script arguments (presumably what makes the trigger
# form offer them in the UI -- confirm against the Airflow version in use).
dag = DAG(
    dag_id="devOps_diskSpaceChecker",
    default_args=default_args,
    description="devOps_diskSpaceChecker",
    schedule_interval="30 3,11,19 * * *",
    catchup=False,
    tags=["devops"],
    params={"diskSpaceThreshold": "72", "fileSizeInMB": "500"},
)
#*********************DAG Tasks********************************************************************************************#
# Start marker for the DAG.
beginProcess = DummyOperator(
    task_id='task_beginProcess',
    dag=dag,
)

# Runs on the "Validation failed" branch: marks task_validateParams as failed
# and prints the error it reported.
validationFailedOperator = PythonOperator(
    task_id='task_validationFailed',
    python_callable=fn_failed_params_validation,
    dag=dag,
    do_xcom_push=True,
)

# Branching task: fn_preCheckOperator returns the task_id to follow.
# provide_context is not passed: this file already depends on Airflow 2.x
# (Label, TaskInstanceState), where the context is injected into the callable
# automatically and the flag is deprecated.
preCheckOperator = BranchPythonOperator(
    task_id='task_validateParams',
    python_callable=fn_preCheckOperator,
    dag=dag,
)

# Runs on the "Validation success" branch: executes the prepared command.
triggerShellOperator = PythonOperator(
    task_id='task_triggerShellScript',
    python_callable=fn_triggerShell,
    dag=dag,
    do_xcom_push=True,
)
#*********************DAG Sequence*******************************************************************************************#
# Edge labels shown on the graph between the tasks.
validate_edge = Label("Validate for configuration file, shell script, arguments")
failure_edge = Label("Validation failed")
success_edge = Label("Validation success")

# begin -> validate, then branch to exactly one of the two follow-up tasks.
beginProcess >> validate_edge >> preCheckOperator
preCheckOperator >> failure_edge >> validationFailedOperator
preCheckOperator >> success_edge >> triggerShellOperator
Editor is loading...
Leave a Comment