Untitled
unknown
plain_text
a year ago
1.9 kB
12
Indexable
def get_last_processed_folder():
    """Fetch the last processed folder from ClickHouse.

    Reads the ``last_processed_folder`` value of the newest row (by
    ``created_at``) in the ``processing_metadata`` table.

    Returns:
        The stored folder value, or ``None`` when the table has no rows.
    """
    client = Client(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        user=CLICKHOUSE_USER,
        password=CLICKHOUSE_PASSWORD,
    )
    try:
        # Newest row wins: order by created_at and keep a single row.
        result = client.execute("""
            SELECT last_processed_folder
            FROM processing_metadata
            ORDER BY created_at DESC
            LIMIT 1
        """)
    finally:
        # Release the connection even if the query raises.
        # NOTE(review): assumes Client is clickhouse_driver.Client, which
        # exposes disconnect() - confirm against the file's imports.
        client.disconnect()

    # An empty result means nothing has been recorded yet.
    return result[0][0] if result else None
def update_last_processed_folder(folder_name):
    """Update the last processed folder in ClickHouse.

    Appends a new row to ``processing_metadata`` holding ``folder_name`` in
    the ``last_processed_folder`` column. No other column is supplied, so
    any remaining columns are left to whatever defaults the table defines.

    Args:
        folder_name: Folder name to record as the latest processed one.
    """
    client = Client(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        user=CLICKHOUSE_USER,
        password=CLICKHOUSE_PASSWORD,
    )
    try:
        # Send the value as row data rather than formatting it into the SQL
        # text: a folder name containing a quote would otherwise break the
        # statement or allow SQL injection.
        client.execute(
            "INSERT INTO processing_metadata (last_processed_folder) VALUES",
            [(folder_name,)],
        )
    finally:
        # Release the connection even if the insert raises.
        # NOTE(review): assumes Client is clickhouse_driver.Client, which
        # exposes disconnect() - confirm against the file's imports.
        client.disconnect()
@task
def fetch_batch_folders():
    """Fetch a batch of folder names from S3.

    Lists the bucket with ``/`` as delimiter so that top-level "folders"
    come back as common prefixes, starting after the last processed folder
    recorded in ClickHouse (or from the beginning when none is recorded).

    Returns:
        A list of prefix strings taken from the response's
        ``CommonPrefixes``; empty when the response contains none.
    """
    s3 = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        endpoint_url=S3_CUSTOM_URL,  # custom S3-compatible endpoint
    )

    # Resume point persisted by a previous run, if any.
    last_processed = get_last_processed_folder()

    list_kwargs = {
        'Bucket': S3_BUCKET,
        'Delimiter': '/',
        'MaxKeys': BATCH_SIZE,  # always request up to BATCH_SIZE entries
    }
    if last_processed:
        # Only send StartAfter when there is a real checkpoint; omitting it
        # lists from the start instead of passing an empty string.
        # NOTE(review): confirm the endpoint does not return the checkpoint
        # prefix itself again when StartAfter equals a delimiter prefix.
        list_kwargs['StartAfter'] = last_processed

    response = s3.list_objects_v2(**list_kwargs)

    # Extract folder prefixes; the key is absent when nothing matched.
    all_folders = [obj['Prefix'] for obj in response.get('CommonPrefixes', [])]
    print(f"Processing batch: {all_folders}")
    return all_folders
Leave a Comment