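# Source: untitled plain-text paste (1.9 kB), shared about a year before capture.
# The code below defines an Airflow DAG that watches a directory for modified
# files and passes the changed paths to a parsing task.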
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import PythonOperator
#    filepath='/opt/airflow/test/test.txt',
from airflow.operators.python_operator import PythonOperator

from filesensorclass import DirectoryModifiedSensor

# Per-task defaults; supplied to the DAG through its ``default_args`` parameter.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2022, 1, 1),  # Adjust start date accordingly
    # No e-mail notifications on failure or retry.
    email_on_failure=False,
    email_on_retry=False,
    # One retry, attempted five minutes after a failure.
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# DAG object that every task in this file attaches to via ``dag=dag``.
dag = DAG(
    dag_id='directory_monitoring_dag',
    description='A DAG to monitor directory changes',
    default_args=default_args,
    schedule_interval=timedelta(minutes=5),  # One run every 5 minutes; adjust as needed
    catchup=False,  # Do not create runs for intervals missed since start_date
)

# No-op task marking the entry point of the pipeline.
start_task = DummyOperator(task_id='start', dag=dag)

# Sensor task watching a directory for modifications. DirectoryModifiedSensor is
# a project class (imported from filesensorclass), so the exact meaning of its
# arguments must be checked against that module.
# The parse task pulls the 'changed_files' XCom by this task_id, so the two
# must stay in sync; presumably this sensor is what pushes that key -- confirm.
check_directory_changes = DirectoryModifiedSensor(
    task_id='check_directory_changes',
    directory_path='/opt/airflow/test',  # Change to the directory you want to monitor
    buffer_time=600,  # Presumably seconds, i.e. a 10-minute buffer -- confirm against the sensor class
    # NOTE(review): if this follows Airflow's sensor convention, timeout is in
    # seconds, so this is 10 s -- not 5 minutes. Use 300 if a 5-minute timeout
    # is what is intended; adjust as needed.
    timeout=10,
    dag=dag,
)
def parse_changed_filess(ti, *args, **kwargs):
    """Print one line for each changed file published by the sensor task.

    The list is read from XCom: key ``changed_files`` of the task whose id is
    ``check_directory_changes``. A missing or empty value produces a single
    "nothing to do" message instead.

    Args:
        ti: Object exposing ``xcom_pull(task_ids=..., key=...)``; the task
            instance when invoked by Airflow.
        *args: Accepted and ignored.
        **kwargs: Accepted and ignored.

    Returns:
        None.
    """
    pulled_paths = ti.xcom_pull(task_ids='check_directory_changes', key='changed_files')

    if pulled_paths:
        for changed_path in pulled_paths:
            print(f"Processing changed file: {changed_path}")
            # Add your file parsing logic here
    else:
        print("No changed files to process.")

# Task that invokes parse_changed_filess as a Python callable within this DAG.
parse_files_task = PythonOperator(
    dag=dag,
    python_callable=parse_changed_filess,
    task_id='parse_changed_files',
)

# No-op task marking the end of the pipeline.
end_task = DummyOperator(task_id='end', dag=dag)

# Linear pipeline: start -> directory sensor -> parse changed files -> end.
(
    start_task
    >> check_directory_changes
    >> parse_files_task
    >> end_task
)
# End of pasted DAG definition (paste-site footer text is not part of the module).