# Module-level imports this test relies on. Project-specific names
# (run_cdc_merge, SEED_DATA, TABLE_SCHEMA, CDC_SCHEMA, CDC_INSERT_SCHEMA_COLS,
# NEW_PARQUET_FILENAME, ONE_HOUR_LATER and the spark_fixture) are defined
# elsewhere in the test suite.
import os
from tempfile import TemporaryDirectory

from delta.tables import DeltaTable
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F
from pyspark.testing import assertDataFrameEqual


def test_multiple_updates_for_single_row(self, spark_fixture: SparkSession) -> None:
    seed_df = spark_fixture.createDataFrame(SEED_DATA, TABLE_SCHEMA)

    # Two CDC update records arrive for the same natural key
    # (artist_id, artist_name); they differ only in track_number.
    update_df = spark_fixture.createDataFrame([
        Row(**{
            "artist_id": 1234,
            "artist_name": "Rufus Du Sol",
            "track_name": "Pressure v2",
            "track_number": 1,
            "_source_file_name": NEW_PARQUET_FILENAME,
            "_dms_synced_at": ONE_HOUR_LATER,
            "Op": "U",
        }),
        Row(**{
            "artist_id": 1234,
            "artist_name": "Rufus Du Sol",
            "track_name": "Pressure v2",
            "track_number": 4,
            "_source_file_name": NEW_PARQUET_FILENAME,
            "_dms_synced_at": ONE_HOUR_LATER,
            "Op": "U",
        }),
    ], CDC_SCHEMA)

    # Expected state after the merge: the seeded Rufus Du Sol row carries the
    # final track_name and track_number; all other rows are untouched.
    expected_df = (
        seed_df
        .withColumn(
            "track_name",
            F.when(F.col("artist_name") == "Rufus Du Sol", "Pressure v2")
            .otherwise(F.col("track_name"))
        )
        .withColumn(
            "track_number",
            F.when(F.col("artist_name") == "Rufus Du Sol", 4)
            .otherwise(F.col("track_number"))
        )
    )

    # Write the seed data to a temporary Delta table, apply the CDC batch,
    # then compare the resulting table to the expected DataFrame.
    with TemporaryDirectory() as tmp_dir:
        target_path = os.path.join(tmp_dir, "target")
        seed_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(target_path)
        target_dt = DeltaTable.forPath(spark_fixture, target_path)

        run_cdc_merge(update_df, target_dt, ["artist_id", "artist_name"], CDC_INSERT_SCHEMA_COLS)

        actual = spark_fixture.read.format("delta").load(target_path)
        assertDataFrameEqual(expected_df, actual)
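
For context, here is a minimal sketch of what a helper like run_cdc_merge could look like. This is not the implementation under test: the deduplication ordering, the Op handling and the use of the column lists are assumptions. Delta Lake's MERGE fails when more than one source row matches the same target row, which is why a CDC merge typically reduces the batch to a single change per key before merging; the test above exercises exactly that case.

# Hypothetical sketch only; the real run_cdc_merge may differ.
from delta.tables import DeltaTable
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window


def run_cdc_merge_sketch(
    updates: DataFrame,
    target: DeltaTable,
    key_cols: list[str],
    insert_cols: list[str],
) -> None:
    # Reduce the CDC batch to one row per key. Ordering by _dms_synced_at is an
    # assumption; exact ties (as in the test above) would need a further
    # deterministic tiebreaker chosen by the real implementation.
    latest_first = Window.partitionBy(*key_cols).orderBy(F.col("_dms_synced_at").desc())
    deduped = (
        updates
        .withColumn("_rn", F.row_number().over(latest_first))
        .filter(F.col("_rn") == 1)
        .drop("_rn")
    )

    on_keys = " AND ".join(f"t.{c} = s.{c}" for c in key_cols)
    (
        target.alias("t")
        .merge(deduped.alias("s"), on_keys)
        .whenMatchedDelete(condition="s.Op = 'D'")                        # tombstones
        .whenMatchedUpdate(set={c: f"s.{c}" for c in insert_cols})        # updates
        .whenNotMatchedInsert(values={c: f"s.{c}" for c in insert_cols})  # inserts
        .execute()
    )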