Untitled

mail@pastecode.io avatar
unknown
plain_text
8 months ago
2.1 kB
2
Indexable
Never
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
import os
import time

class DirectoryModifiedSensor(BaseSensorOperator):
    """Succeed once any file under a directory tree has a recent modification time.

    On each poke the sensor walks ``directory_path`` recursively and collects
    every file whose mtime is strictly later than ``last_run_timestamp``. If at
    least one such file exists, the list of their paths is pushed to XCom under
    the key ``'changed_files'`` and the sensor succeeds.

    ``last_run_timestamp`` is computed once, when the operator object is
    constructed (``time.time() - buffer_time``), and is never advanced by this
    class. "Changed" therefore means "modified after that baseline", not
    "modified since the previous poke". Because detection relies purely on
    mtime, files whose mtime is at or before the baseline are not reported.
    For example, this includes files copied with preserved timestamps.
    """

    template_fields = ('directory_path',)

    # ``@apply_defaults`` intentionally not used. This module imports from
    # ``airflow.sensors.base``, i.e. it targets Airflow 2, where BaseOperator's
    # metaclass applies ``default_args`` automatically. There the decorator is a
    # deprecated no-op that only emits a warning.
    def __init__(self, directory_path, buffer_time=660, *args, **kwargs):
        """
        Initialize the sensor with a directory path and a buffer time.

        :param directory_path: Path of the directory to monitor (templated).
        :param buffer_time: Number of seconds before construction time at which
            the modification window starts. Defaults to 660 seconds
            (10 minutes + 1 minute buffer).
        """
        super().__init__(*args, **kwargs)
        self.directory_path = directory_path
        self.buffer_time = buffer_time
        # Baseline captured once, here, and never updated afterwards.
        # NOTE(review): in ``mode='reschedule'`` Airflow re-creates the operator
        # for every poke, so the window slides forward on each poke. Keep
        # buffer_time larger than poke_interval so no gap opens between
        # consecutive windows. Confirm this for your deployment.
        self.last_run_timestamp = time.time() - self.buffer_time

    def poke(self, context) -> bool:
        """Scan the directory once and report whether anything changed.

        :param context: Airflow task context; only ``context['ti']`` is used,
            to push the XCom.
        :return: True (after pushing the changed paths to XCom under key
            ``'changed_files'``) if at least one changed file was found,
            otherwise False.
        """
        changed_files = self._check_directory(self.directory_path)
        if changed_files:
            self.log.info('Changes detected, pushing paths to XCom.')
            context['ti'].xcom_push(key='changed_files', value=changed_files)
            return True
        self.log.info('No changes detected.')
        return False

    def _check_directory(self, directory_path):
        """Return paths of files under ``directory_path`` modified after the baseline.

        The tree is walked recursively. ``os.walk`` silently drops directory
        listing errors by default. That includes ``directory_path`` itself being
        missing or unreadable, which would leave the sensor poking until timeout
        with no clue in the log. Such errors are therefore routed to
        ``_log_walk_error`` instead.

        :param directory_path: Root of the tree to scan.
        :return: List of changed file paths (empty if none).
        """
        changed_files = []
        for root, _, files in os.walk(directory_path, onerror=self._log_walk_error):
            for name in files:
                file_path = os.path.join(root, name)
                if self._file_modified_or_added(file_path):
                    changed_files.append(file_path)
        return changed_files

    def _log_walk_error(self, error):
        """Serve as the ``os.walk`` error callback: log directories that could not be listed.

        :param error: The ``OSError`` raised while listing a directory.
        """
        self.log.warning('Could not scan %s: %s', error.filename, error)

    def _file_modified_or_added(self, file_path) -> bool:
        """Return True if ``file_path``'s mtime is later than ``last_run_timestamp``.

        A file that disappears between the ``os.walk`` listing and the ``stat``
        call here is treated as unchanged.

        :param file_path: Path of the file to check.
        """
        try:
            mod_time = os.path.getmtime(file_path)
        except FileNotFoundError:
            # Deleted after os.walk listed it but before we could stat it.
            return False
        return mod_time > self.last_run_timestamp
Leave a Comment