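# NOTE: the lines below appear to be metadata captured from the paste site
# this snippet was copied from; they are kept as comments, not code.
# Untitled
# unknown
# plain_text
# 2 years ago
# 1.9 kB
# 10
# Indexable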
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import PythonOperator
# filepath='/opt/airflow/test/test.txt',
from airflow.operators.python_operator import PythonOperator
from filesensorclass import DirectoryModifiedSensor
# Default keyword arguments passed to the DAG below, which hands them to each
# of its tasks.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2022, 1, 1),  # Adjust start date accordingly
    email_on_failure=False,
    email_on_retry=False,
    retries=1,  # a failed task is retried once ...
    retry_delay=timedelta(minutes=5),  # ... after waiting this long
)
# The DAG object that every task below attaches to via ``dag=dag``.
dag = DAG(
    'directory_monitoring_dag',
    description='A DAG to monitor directory changes',
    default_args=default_args,
    catchup=False,  # do not backfill runs missed since start_date
    schedule_interval=timedelta(minutes=5),  # Run every 5 minutes, adjust as needed
)
# No-op marker task at the head of the pipeline.
start_task = DummyOperator(dag=dag, task_id='start')
# Watches a directory using the project's custom DirectoryModifiedSensor
# (imported from the local ``filesensorclass`` module).
# NOTE(review): presumably this sensor pushes the ``changed_files`` XCom that
# parse_changed_filess pulls; confirm in filesensorclass.
check_directory_changes = DirectoryModifiedSensor(
    task_id='check_directory_changes',
    directory_path='/opt/airflow/test',  # Change to the directory you want to monitor
    # Assumed to be in seconds (600 s = 10 minutes, per the original comment);
    # TODO confirm how filesensorclass interprets buffer_time.
    buffer_time=600,  # Buffer time of 10 minutes
    # Sensor timeouts are expressed in seconds. The previous value of 10 gave
    # the sensor only 10 seconds, not the 5 minutes this comment promises;
    # 5 minutes also matches the DAG's 5-minute schedule.
    timeout=5 * 60,  # Timeout after 5 minutes; adjust as needed
    dag=dag,
)
def parse_changed_filess(ti, *args, **kwargs):
    """Report every file the directory sensor flagged as changed.

    Pulls the ``changed_files`` XCom value from the ``check_directory_changes``
    task and prints one line per entry. When that value is empty or missing,
    prints a single "nothing to do" message instead.

    Args:
        ti: Task instance supplied by Airflow; only ``xcom_pull`` is used.
        *args: Accepted and ignored.
        **kwargs: Accepted and ignored (e.g. the rest of the task context).

    Returns:
        None.
    """
    paths = ti.xcom_pull(task_ids='check_directory_changes', key='changed_files')
    if paths:
        for path in paths:
            print(f"Processing changed file: {path}")
            # TODO: add the real per-file parsing logic here.
    else:
        print("No changed files to process.")
# Runs parse_changed_filess, which reads the sensor's ``changed_files`` XCom
# through the task instance Airflow passes in as ``ti``.
parse_files_task = PythonOperator(
    dag=dag,
    python_callable=parse_changed_filess,
    task_id='parse_changed_files',
)
# No-op marker task at the tail of the pipeline.
end_task = DummyOperator(dag=dag, task_id='end')
# Linear pipeline: start -> directory sensor -> parse changed files -> end.
# (Removed the scraped page residue "Editor is loading..." / "Leave a Comment",
# which made this module a syntax error.)
start_task >> check_directory_changes >> parse_files_task >> end_task