# Paste-site page metadata (not part of the program): "Untitled", plain_text,
# 4.4 kB, posted 13 days ago.
"""
System: A nightly ETL pipeline that ingests raw transaction records,
filters out invalid rows, enriches amounts with an FX conversion,
and aggregates by merchant category.

The pipeline is run by a scheduler that calls `pipeline.run()` once per
merchant segment. For some segments there are multiple passes (e.g., a
'dry run' to validate, then a real run to write results).

The pipeline has a result cache to avoid re-aggregating when inputs
haven't changed. The cache is keyed on a config hash.

Bug report (from the data engineering team): "On the second and subsequent
calls to `pipeline.run()` for the same pipeline instance, the aggregated
results are all zeros — even when we invalidate the cache by changing the
config. We thought it was a stale cache bug but disabling the cache entirely
doesn't fix it."

Your task: Find and fix the bug. Explain what the cache had to do with it
(if anything).
"""

from __future__ import annotations
import hashlib
import json
from typing import Iterator


# In-memory sample of raw transaction records; fetch_records() yields these
# in order. Per-record fields as used by the pipeline stages in this file:
#   "valid"             - filter_valid() keeps only records where this is truthy
#   "amount_usd"        - multiplied by the FX rate in enrich_with_fx()
#   "merchant_category" - grouping key in aggregate_by_category()
#   "id"                - not read by any stage in this file
RAW_RECORDS = [
    {"id": 1, "merchant_category": "food",   "amount_usd": 12.50,  "valid": True},
    {"id": 2, "merchant_category": "travel", "amount_usd": 340.00, "valid": True},
    {"id": 3, "merchant_category": "food",   "amount_usd": 8.75,   "valid": False},
    {"id": 4, "merchant_category": "food",   "amount_usd": 22.00,  "valid": True},
    {"id": 5, "merchant_category": "travel", "amount_usd": 15.00,  "valid": True},
    {"id": 6, "merchant_category": "retail", "amount_usd": 55.00,  "valid": True},
    {"id": 7, "merchant_category": "retail", "amount_usd": 30.00,  "valid": False},
    {"id": 8, "merchant_category": "retail", "amount_usd": 44.00,  "valid": True},
]


def fetch_records() -> Iterator[dict]:
    """Yield every entry of ``RAW_RECORDS`` in list order.

    Because this is a generator function, each call produces a new
    single-use iterator; once it has been consumed it yields nothing more.
    Records are yielded by reference, not copied.
    """
    yield from RAW_RECORDS


def filter_valid(records: Iterator[dict]) -> Iterator[dict]:
    """Lazily pass through only the records whose ``"valid"`` entry is truthy.

    Args:
        records: Iterable of record dicts, each carrying a ``"valid"`` key.

    Yields:
        The same record objects (not copies), in their original order,
        skipping any whose ``"valid"`` value is falsy.
    """
    for candidate in records:
        if not candidate["valid"]:
            continue
        yield candidate


def enrich_with_fx(records: Iterator[dict], fx_rate: float) -> Iterator[dict]:
    """Lazily add an ``"amount_local"`` field to a copy of each record.

    Args:
        records: Iterable of record dicts, each carrying ``"amount_usd"``.
        fx_rate: Multiplier applied to ``"amount_usd"``.

    Yields:
        A new dict per input record holding all of the original keys plus
        ``"amount_local"`` (``amount_usd * fx_rate`` rounded to 2 decimal
        places). The input records are left unmodified.
    """
    for source_row in records:
        converted = round(source_row["amount_usd"] * fx_rate, 2)
        yield {**source_row, "amount_local": converted}


def aggregate_by_category(records: Iterator[dict]) -> dict[str, float]:
    """Sum ``"amount_local"`` per ``"merchant_category"``.

    Args:
        records: Iterable of record dicts carrying ``"merchant_category"``
            and ``"amount_local"``. It is consumed completely.

    Returns:
        A dict mapping each category to the running sum of its local
        amounts, with categories in first-seen order. An empty input
        produces an empty dict.
    """
    sums: dict[str, float] = {}
    for row in records:
        category = row["merchant_category"]
        # Start each new category from 0.0 so the stored total is a float.
        running = sums[category] if category in sums else 0.0
        sums[category] = running + row["amount_local"]
    return sums


class ETLPipeline:
    """Filter, FX-enrich, and aggregate transaction records, with a result cache.

    Each cache-miss ``run()`` pulls a *fresh* record iterator from the source
    factory. Previously the generator returned by ``fetch_records()`` was
    created once in ``__init__`` and stored on the instance; generators are
    single-use, so the first ``run()`` exhausted it and every later cache-miss
    run aggregated an empty stream and returned ``{}``.

    The cache was not the cause of that bug. It only masked it (a cache hit
    with an unchanged config returned the good first result without touching
    the exhausted generator) and then amplified it (after a config change the
    empty result was stored under the new key and served from then on).
    """

    def __init__(self, fx_rate: float, config: dict):
        """Create a pipeline.

        Args:
            fx_rate: Multiplier passed to ``enrich_with_fx`` on every run.
            config: JSON-serializable dict; its hash is the cache key. The
                dict is stored by reference, so later mutations by the
                caller change the key used by subsequent runs.
        """
        self.fx_rate = fx_rate
        self.config = config
        # Keep the factory, not the iterator it returns: a generator can be
        # consumed only once, so run() must call this anew on every pass.
        self._source_factory = fetch_records
        self._cache: dict[str, dict] = {}

    def _config_hash(self) -> str:
        """Return the MD5 hex digest of ``self.config`` serialized as sorted-key JSON.

        NOTE(review): ``fx_rate`` is not part of the key, so changing
        ``fx_rate`` on an instance without changing ``config`` will serve
        results computed with the old rate on a cache hit.
        """
        serialized = json.dumps(self.config, sort_keys=True)
        return hashlib.md5(serialized.encode()).hexdigest()

    def run(self, use_cache: bool = True) -> dict[str, float]:
        """Run filter -> FX enrichment -> aggregation and return category totals.

        Args:
            use_cache: When true and a result for the current config hash is
                cached, return it without re-running the pipeline. The result
                of a pipeline run is stored in the cache regardless of this
                flag.

        Returns:
            A dict mapping merchant category to the summed local amount. The
            returned dict is independent of the cached copy, so callers may
            mutate it freely.
        """
        cache_key = self._config_hash()

        if use_cache and cache_key in self._cache:
            print(f"  [cache hit]  key={cache_key[:8]}")
            # Hand back a copy so caller-side mutation cannot corrupt the cache.
            return dict(self._cache[cache_key])

        print(f"  [cache miss] key={cache_key[:8]}, running pipeline...")

        # Fresh iterator per run; reusing one across runs yields nothing
        # after the first pass.
        filtered = filter_valid(self._source_factory())
        enriched = enrich_with_fx(filtered, self.fx_rate)
        result = aggregate_by_category(enriched)

        # Cache a private copy for the same aliasing reason as above.
        self._cache[cache_key] = dict(result)
        return result


def run_scheduler():
    """Drive one ``ETLPipeline`` instance through four runs and print the outcome.

    Run 1 and Run 2 are made with the cache disabled, Run 3 with the cache
    enabled and the same config, and Run 4 with the cache enabled after the
    config's ``"version"`` is changed to 2. Runs 2 and 4 are compared against
    the Run 1 result and reported as PASS or FAIL.
    """

    def verdict(expected, actual, pass_message):
        # Print the PASS line when the run reproduced the baseline, else FAIL.
        print(
            pass_message
            if actual == expected
            else f"  FAIL: expected {expected}, got {actual}"
        )

    pipeline = ETLPipeline(
        fx_rate=1.25,
        config={"region": "US", "segment": "premium", "version": 1},
    )

    print("=== Run 1 (cold, no cache) ===")
    baseline = pipeline.run(use_cache=False)
    print(f"  Result: {baseline}")

    print("\n=== Run 2 (cache disabled, config unchanged) ===")
    uncached_again = pipeline.run(use_cache=False)
    print(f"  Result: {uncached_again}")
    verdict(baseline, uncached_again, "  PASS: results match")

    print("\n=== Run 3 (cache enabled, same config — should hit cache) ===")
    cached = pipeline.run(use_cache=True)
    print(f"  Result: {cached}")

    print("\n=== Run 4 (cache invalidated via config change) ===")
    # Changing the config changes its hash, so this run uses a new cache key.
    pipeline.config["version"] = 2
    after_config_change = pipeline.run(use_cache=True)
    print(f"  Result: {after_config_change}")
    verdict(baseline, after_config_change, "  PASS: same data, same result")


# Run the demonstration scenario only when executed as a script, not on import.
if __name__ == "__main__":
    run_scheduler()
# Paste-site footer text (not part of the program):
# Editor is loading...
# Leave a Comment