Untitled

 avatar
unknown
plain_text
a month ago
1.9 kB
8
Indexable
def get_last_processed_folder():
    """Fetch the most recently recorded processed folder from ClickHouse.

    Selects ``last_processed_folder`` from the newest row of
    ``processing_metadata`` (ordered by ``created_at`` descending).

    Returns:
        The ``last_processed_folder`` value of the newest row, or ``None``
        when the query returns no rows.
    """
    query = """
        SELECT last_processed_folder
        FROM processing_metadata
        ORDER BY created_at DESC
        LIMIT 1
    """

    client = Client(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        user=CLICKHOUSE_USER,
        password=CLICKHOUSE_PASSWORD,
    )

    try:
        rows = client.execute(query)
    finally:
        # A fresh client is created on every call, so release its connection
        # even when the query raises instead of leaving it to the GC.
        # NOTE(review): assumes Client is clickhouse_driver.Client, which
        # exposes disconnect() - confirm against the file's imports.
        client.disconnect()

    # execute() yields a list of row tuples; an empty list means no metadata
    # row has been written yet.
    return rows[0][0] if rows else None


def update_last_processed_folder(folder_name):
    """Record ``folder_name`` as the last processed folder in ClickHouse.

    Inserts one new row into ``processing_metadata`` with only the
    ``last_processed_folder`` column supplied; every other column is left to
    the table's defaults (the original code notes that the id is generated
    automatically).

    Args:
        folder_name: Folder name to store. It is converted with ``str()``,
            matching the string formatting the previous implementation
            applied.
    """
    client = Client(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        user=CLICKHOUSE_USER,
        password=CLICKHOUSE_PASSWORD,
    )

    try:
        # Send the value as row data rather than interpolating it into the
        # SQL text: a folder name containing a quote would otherwise break
        # the statement or let it be rewritten (SQL injection).
        # NOTE(review): assumes Client is clickhouse_driver.Client, whose
        # execute() accepts insert rows as its second argument - confirm.
        client.execute(
            "INSERT INTO processing_metadata (last_processed_folder) VALUES",
            [(str(folder_name),)],
        )
    finally:
        # A fresh client is created on every call, so always release its
        # connection, including when the insert raises.
        client.disconnect()


@task
def fetch_batch_folders():
    """List the next batch of folder prefixes from the S3 bucket.

    Builds an S3 client against ``S3_CUSTOM_URL``, looks up the last
    processed folder via ``get_last_processed_folder()``, and issues a single
    ``list_objects_v2`` call on ``S3_BUCKET`` with ``/`` as the delimiter,
    starting after that folder and requesting at most ``BATCH_SIZE`` entries.

    Returns:
        The ``Prefix`` value of each entry under ``CommonPrefixes`` in the
        response; an empty list when the response has no ``CommonPrefixes``.
    """
    s3 = boto3.client(
        's3',
        endpoint_url=S3_CUSTOM_URL,  # non-default S3 endpoint
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )

    # Resume point: an empty string is sent when nothing has been recorded.
    start_after = get_last_processed_folder() or ''

    listing = s3.list_objects_v2(
        Bucket=S3_BUCKET,
        Delimiter='/',
        StartAfter=start_after,
        MaxKeys=BATCH_SIZE,  # fixed page size for every batch
    )

    # With a delimiter set, folder-like groupings are reported under
    # 'CommonPrefixes'; the key is missing when there are none.
    common_prefixes = listing.get('CommonPrefixes', [])
    all_folders = [entry['Prefix'] for entry in common_prefixes]

    print(f"Processing batch: {all_folders}")
    return all_folders
Leave a Comment