# Untitled
def _get_clickhouse_client():
    """Return a new ClickHouse client built from the module-level settings.

    Shared by the metadata helpers so the connection parameters live in one
    place. The caller must call ``disconnect()`` on the returned client.
    """
    return Client(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        user=CLICKHOUSE_USER,
        password=CLICKHOUSE_PASSWORD,
    )


def get_last_processed_folder():
    """Fetch the last processed folder from ClickHouse.

    Reads ``processing_metadata`` and returns the ``last_processed_folder``
    value of the row with the newest ``created_at``.

    Returns:
        The stored folder name, or ``None`` if the table has no rows.
    """
    client = _get_clickhouse_client()
    try:
        result = client.execute(
            """
            SELECT last_processed_folder
            FROM processing_metadata
            ORDER BY created_at DESC
            LIMIT 1
            """
        )
    finally:
        # Always release the connection, even if the query fails.
        client.disconnect()
    return result[0][0] if result else None


def update_last_processed_folder(folder_name):
    """Record *folder_name* as the most recently processed folder.

    Inserts a new row into ``processing_metadata`` containing only the folder
    name; ``id`` is generated automatically. ``created_at`` is presumably
    filled by a table default (it is what ``get_last_processed_folder``
    orders by) -- confirm against the table schema.

    Args:
        folder_name: Folder/prefix name to store.
    """
    client = _get_clickhouse_client()
    try:
        # Send the value as row data instead of formatting it into the SQL
        # text: folder names come from S3, and a quote character in one would
        # otherwise break (or inject into) the statement.
        client.execute(
            "INSERT INTO processing_metadata (last_processed_folder) VALUES",
            [(folder_name,)],
        )
    finally:
        client.disconnect()


@task
def fetch_batch_folders():
    """Fetch the next batch of folder names from S3.

    Lists ``S3_BUCKET`` with ``Delimiter='/'``, starting after the last
    folder recorded in ClickHouse, and returns up to ``BATCH_SIZE`` common
    prefixes.

    Returns:
        A list of folder prefixes (possibly empty).
    """
    s3 = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        endpoint_url=S3_CUSTOM_URL,  # custom S3 endpoint URL
    )

    last_processed = get_last_processed_folder()

    # MaxKeys caps the listing at BATCH_SIZE entries per request.
    response = s3.list_objects_v2(
        Bucket=S3_BUCKET,
        Delimiter='/',
        StartAfter=last_processed or '',
        MaxKeys=BATCH_SIZE,
    )

    # StartAfter is compared against object keys, and keys inside the last
    # processed folder ("folder/file") sort after "folder/" itself. S3 can
    # therefore roll them up and return that same prefix again; drop it so a
    # folder is not processed twice.
    all_folders = [
        entry['Prefix']
        for entry in response.get('CommonPrefixes', [])
        if entry['Prefix'] != last_processed
    ]
    print(f"Processing batch: {all_folders}")
    return all_folders
# Leave a Comment