Untitled
unknown
plain_text
8 months ago
2.1 kB
2
Indexable
Never
import os
import time

from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class DirectoryModifiedSensor(BaseSensorOperator):
    """Succeed once any file under ``directory_path`` has a recent mtime.

    The sensor walks ``directory_path`` recursively and collects every file
    whose modification time is strictly newer than ``last_run_timestamp``.
    That threshold is computed once, when the operator object is constructed,
    as ``time.time() - buffer_time``. On success, the sorted list of matching
    paths is pushed to XCom under the key ``'changed_files'``.

    NOTE(review): despite its name, ``last_run_timestamp`` is never advanced
    after construction. In ``mode='reschedule'`` each poke presumably runs in
    a fresh task process that rebuilds the operator, so the window slides with
    each poke. In that case keep ``poke_interval`` below ``buffer_time``, or
    changes made between pokes can be missed. TODO confirm for the deployed
    Airflow version.
    """

    template_fields = ('directory_path',)

    # NOTE(review): in Airflow 2.x ``apply_defaults`` is deprecated and the
    # BaseOperator metaclass applies default_args automatically; this
    # decorator can most likely be dropped.
    @apply_defaults
    def __init__(self, directory_path, buffer_time=660, *args, **kwargs):
        """
        Initialize the sensor with a directory path and a buffer time.

        :param directory_path: Path of the directory to monitor (templated).
        :param buffer_time: Time buffer in seconds before construction time to
            consider for file changes. Defaults to 660 seconds
            (10 minutes + 1 minute buffer).
        """
        super().__init__(*args, **kwargs)
        self.directory_path = directory_path
        self.buffer_time = buffer_time
        # Epoch-seconds threshold: files whose mtime is strictly newer than
        # this count as changed. Set once here; later pokes do not update it.
        self.last_run_timestamp = time.time() - self.buffer_time

    def poke(self, context):
        """Check the directory once and report whether changes were found.

        :param context: Airflow task context; ``context['ti']`` is used to
            push the changed paths to XCom.
        :return: True when at least one changed file was found (after pushing
            the list to XCom under ``'changed_files'``), otherwise False.
        """
        changed_files = self._check_directory(self.directory_path)
        if changed_files:
            self.log.info('Changes detected, pushing paths to XCom.')
            context['ti'].xcom_push(key='changed_files', value=changed_files)
            return True
        self.log.info('No changes detected.')
        return False

    def _check_directory(self, directory_path):
        """Return sorted paths of files under ``directory_path`` changed since the threshold.

        Errors while listing a directory (for example, the path does not exist
        yet, is not a directory, or is unreadable) are logged as warnings
        rather than silently ignored. The walk continues with whatever
        remains readable, so the sensor keeps poking.

        :param directory_path: Root directory to walk recursively.
        :return: Sorted list of changed file paths (possibly empty).
        """
        def _log_walk_error(err):
            # os.walk drops listing errors by default, which made a missing
            # or mistyped directory indistinguishable from "no changes".
            self.log.warning('Cannot list %s: %s', err.filename, err)

        changed_files = []
        for root, _, files in os.walk(directory_path, onerror=_log_walk_error):
            for name in files:
                file_path = os.path.join(root, name)
                if self._file_modified_or_added(file_path):
                    changed_files.append(file_path)
        # os.walk order is filesystem-dependent; sort so the XCom value is stable.
        changed_files.sort()
        return changed_files

    def _file_modified_or_added(self, file_path):
        """Return True if ``file_path``'s mtime is newer than ``last_run_timestamp``.

        A file that vanishes between being listed and being stat'ed is treated
        as unchanged.

        :param file_path: Path of the file to check.
        :return: True if the file's mtime is strictly greater than the
            threshold, otherwise False.
        """
        try:
            mod_time = os.path.getmtime(file_path)
        except FileNotFoundError:
            # Deleted after os.walk listed it but before this check.
            return False
        return mod_time > self.last_run_timestamp
Leave a Comment