Untitled
unknown
plain_text
a year ago
1.9 kB
6
Indexable
"""Airflow DAG that watches a directory and processes files reported as changed.

Task chain: ``start`` -> ``check_directory_changes`` -> ``parse_changed_files`` -> ``end``.

``check_directory_changes`` is a ``DirectoryModifiedSensor`` from the project-local
``filesensorclass`` module. The ``parse_changed_files`` task pulls the XCom key
``changed_files`` from that sensor and handles each returned path.
NOTE(review): this assumes the sensor pushes a list of paths under that key —
confirm against filesensorclass.
"""
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import PythonOperator

from filesensorclass import DirectoryModifiedSensor

log = logging.getLogger(__name__)

# Directory handed to the sensor; change to the directory you want to monitor.
MONITORED_DIRECTORY = '/opt/airflow/test'
# Shared between the sensor definition and the XCom pull so the two cannot drift apart.
SENSOR_TASK_ID = 'check_directory_changes'
CHANGED_FILES_XCOM_KEY = 'changed_files'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2022, 1, 1),  # Adjust start date accordingly
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'directory_monitoring_dag',
    default_args=default_args,
    description='A DAG to monitor directory changes',
    schedule_interval=timedelta(minutes=5),  # Run every 5 minutes; adjust as needed
    catchup=False,
)

start_task = DummyOperator(
    task_id='start',
    dag=dag,
)

check_directory_changes = DirectoryModifiedSensor(
    task_id=SENSOR_TASK_ID,
    directory_path=MONITORED_DIRECTORY,
    buffer_time=600,  # Buffer time of 10 minutes (600 presumably in seconds — confirm in filesensorclass)
    # NOTE(review): a previous comment claimed "Timeout after 5 minutes", but the value
    # is 10. If this is forwarded to Airflow's BaseSensorOperator it means 10 seconds;
    # 5 minutes would be timeout=300. Confirm which is intended.
    timeout=10,
    dag=dag,
)


def parse_changed_files(ti, *args, **kwargs):
    """Process every file path the sensor reported as changed.

    Args:
        ti: The Airflow TaskInstance, supplied by PythonOperator from the task
            context because the parameter is named ``ti``.
        *args: Unused; kept so the signature matches the original callable.
        **kwargs: Remaining task-context entries; unused.

    Returns:
        None. Each changed path is currently only logged.
    """
    changed_files = ti.xcom_pull(task_ids=SENSOR_TASK_ID, key=CHANGED_FILES_XCOM_KEY)
    # xcom_pull yields None when nothing was pushed; an empty list is treated the same way.
    if not changed_files:
        log.info("No changed files to process.")
        return

    for file_path in changed_files:
        log.info("Processing changed file: %s", file_path)
        # TODO: add the actual file-parsing logic here.


# Backward-compatible alias for the original (misspelled) callable name.
parse_changed_filess = parse_changed_files

parse_files_task = PythonOperator(
    task_id='parse_changed_files',
    python_callable=parse_changed_files,
    dag=dag,
)

end_task = DummyOperator(
    task_id='end',
    dag=dag,
)

start_task >> check_directory_changes >> parse_files_task >> end_task
Editor is loading...
Leave a Comment