Untitled
unknown
plain_text
2 years ago
2.1 kB
16
Indexable
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
import os
import time
class DirectoryModifiedSensor(BaseSensorOperator):
    """Succeed once any file under a directory tree has a recent modification time.

    A file counts as changed when its mtime is strictly later than
    ``last_run_timestamp``, which is set once, when this sensor object is
    constructed, to ``time.time() - buffer_time``. On success the sorted list
    of changed file paths is pushed to XCom under the key ``changed_files``.
    """

    # Attributes Airflow renders as Jinja templates before execution.
    template_fields = ('directory_path',)

    # NOTE(review): from Airflow 2.1 the operator metaclass applies defaults
    # automatically and this decorator is deprecated; it is kept so the sensor
    # still honours ``default_args`` on Airflow 2.0.x.
    @apply_defaults
    def __init__(self, directory_path, buffer_time=660, *args, **kwargs):
        """
        Initialize the sensor with a directory path and a buffer time.

        :param directory_path: Root of the directory tree to monitor
            (walked recursively). Templated.
        :param buffer_time: How many seconds before this sensor is constructed
            a file's modification time may be and still count as a change.
            Defaults to 660 seconds (10 minutes + 1 minute buffer).
        """
        super().__init__(*args, **kwargs)
        self.directory_path = directory_path
        self.buffer_time = buffer_time
        # NOTE(review): this cutoff is fixed at construction time and never
        # advanced, so the window is "buffer_time seconds before construction",
        # not literally "since the last run" - confirm that is intended.
        self.last_run_timestamp = time.time() - self.buffer_time

    def poke(self, context) -> bool:
        """Check the directory once.

        :param context: Airflow task context; ``context['ti']`` is used to
            push the changed paths to XCom.
        :return: True (after pushing ``changed_files`` to XCom) if any file
            changed, otherwise False so the sensor pokes again.
        """
        changed_files = self._check_directory(self.directory_path)
        if not changed_files:
            self.log.info('No changes detected.')
            return False
        self.log.info('Changes detected, pushing paths to XCom.')
        context['ti'].xcom_push(key='changed_files', value=changed_files)
        return True

    def _check_directory(self, directory_path) -> list:
        """Return the paths of files under ``directory_path`` changed since the cutoff.

        :param directory_path: Root directory to walk recursively.
        :return: Sorted list of changed file paths; empty if none changed or
            the directory could not be listed (such errors are logged).
        """
        # Without onerror, os.walk silently skips any directory it cannot list
        # (including a missing or misspelled root), which would make a bad path
        # look exactly like "no changes" until the sensor times out.
        candidate_paths = (
            os.path.join(root, name)
            for root, _, files in os.walk(directory_path, onerror=self._log_walk_error)
            for name in files
        )
        # os.walk order depends on the filesystem; sort so the XCom value is stable.
        return sorted(
            path for path in candidate_paths if self._file_modified_or_added(path)
        )

    def _log_walk_error(self, error) -> None:
        """Log an OSError raised while os.walk lists a directory, instead of ignoring it."""
        self.log.warning('Unable to scan directory: %s', error)

    def _file_modified_or_added(self, file_path) -> bool:
        """Return True if ``file_path``'s mtime is later than ``last_run_timestamp``.

        :param file_path: Path of a file found by the directory walk.
        :return: False if the file vanished before it could be stat'ed.
        """
        try:
            mod_time = os.path.getmtime(file_path)
        except FileNotFoundError:
            # Deleted after os.walk listed it (or a dangling symlink, since
            # getmtime follows links); treat it as unchanged.
            return False
        return mod_time > self.last_run_timestamp
Editor is loading...
Leave a Comment