Untitled
unknown
plain_text
21 days ago
4.4 kB
8
Indexable
"""
System: A nightly ETL pipeline that ingests raw transaction records,
filters out invalid rows, enriches amounts with an FX conversion,
and aggregates by merchant category.
The pipeline is run by a scheduler that calls `pipeline.run()` once per
merchant segment. For some segments there are multiple passes (e.g., a
'dry run' to validate, then a real run to write results).
The pipeline has a result cache to avoid re-aggregating when inputs
haven't changed. The cache is keyed on a config hash.
Bug report (from the data engineering team): "On the second and subsequent
calls to `pipeline.run()` for the same pipeline instance, the aggregated
results are all zeros — even when we invalidate the cache by changing the
config. We thought it was a stale cache bug but disabling the cache entirely
doesn't fix it."
Your task: Find and fix the bug. Explain what the cache had to do with it
(if anything).
"""
from __future__ import annotations
import hashlib
import json
from typing import Iterator
RAW_RECORDS = [
{"id": 1, "merchant_category": "food", "amount_usd": 12.50, "valid": True},
{"id": 2, "merchant_category": "travel", "amount_usd": 340.00, "valid": True},
{"id": 3, "merchant_category": "food", "amount_usd": 8.75, "valid": False},
{"id": 4, "merchant_category": "food", "amount_usd": 22.00, "valid": True},
{"id": 5, "merchant_category": "travel", "amount_usd": 15.00, "valid": True},
{"id": 6, "merchant_category": "retail", "amount_usd": 55.00, "valid": True},
{"id": 7, "merchant_category": "retail", "amount_usd": 30.00, "valid": False},
{"id": 8, "merchant_category": "retail", "amount_usd": 44.00, "valid": True},
]
def fetch_records() -> Iterator[dict]:
for record in RAW_RECORDS:
yield record
def filter_valid(records: Iterator[dict]) -> Iterator[dict]:
for record in records:
if record["valid"]:
yield record
def enrich_with_fx(records: Iterator[dict], fx_rate: float) -> Iterator[dict]:
for record in records:
enriched = dict(record)
enriched["amount_local"] = round(record["amount_usd"] * fx_rate, 2)
yield enriched
def aggregate_by_category(records: Iterator[dict]) -> dict[str, float]:
totals: dict[str, float] = {}
for record in records:
cat = record["merchant_category"]
totals[cat] = totals.get(cat, 0.0) + record["amount_local"]
return totals
class ETLPipeline:
def __init__(self, fx_rate: float, config: dict):
self.fx_rate = fx_rate
self.config = config
self._source = fetch_records()
self._cache: dict[str, dict] = {}
def _config_hash(self) -> str:
return hashlib.md5(json.dumps(self.config, sort_keys=True).encode()).hexdigest()
def run(self, use_cache: bool = True) -> dict[str, float]:
cache_key = self._config_hash()
if use_cache and cache_key in self._cache:
print(f" [cache hit] key={cache_key[:8]}")
return self._cache[cache_key]
print(f" [cache miss] key={cache_key[:8]}, running pipeline...")
filtered = filter_valid(self._source)
enriched = enrich_with_fx(filtered, self.fx_rate)
result = aggregate_by_category(enriched)
self._cache[cache_key] = result
return result
def run_scheduler():
config = {"region": "US", "segment": "premium", "version": 1}
pipeline = ETLPipeline(fx_rate=1.25, config=config)
print("=== Run 1 (cold, no cache) ===")
result1 = pipeline.run(use_cache=False)
print(f" Result: {result1}")
print("\n=== Run 2 (cache disabled, config unchanged) ===")
result2 = pipeline.run(use_cache=False)
print(f" Result: {result2}")
if result2 == result1:
print(" PASS: results match")
else:
print(f" FAIL: expected {result1}, got {result2}")
print("\n=== Run 3 (cache enabled, same config — should hit cache) ===")
result3 = pipeline.run(use_cache=True)
print(f" Result: {result3}")
print("\n=== Run 4 (cache invalidated via config change) ===")
pipeline.config["version"] = 2
result4 = pipeline.run(use_cache=True)
print(f" Result: {result4}")
if result4 == result1:
print(" PASS: same data, same result")
else:
print(f" FAIL: expected {result1}, got {result4}")
if __name__ == "__main__":
run_scheduler()Editor is loading...
Leave a Comment