import os
from tempfile import TemporaryDirectory

from delta.tables import DeltaTable
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F
from pyspark.testing import assertDataFrameEqual

# SEED_DATA, TABLE_SCHEMA, CDC_SCHEMA, CDC_INSERT_SCHEMA_COLS, NEW_PARQUET_FILENAME,
# ONE_HOUR_LATER, run_cdc_merge and the spark_fixture fixture are defined elsewhere
# in the test module and are not part of this paste.


def test_multiple_updates_for_single_row(self, spark_fixture: SparkSession) -> None:
    # Two CDC update rows arrive for the same business key in one batch; after the
    # merge, the target row should reflect the second (final) update.
    seed_df = spark_fixture.createDataFrame(SEED_DATA, TABLE_SCHEMA)
    update_df = spark_fixture.createDataFrame(
        [
            Row(**{
                "artist_id": 1234,
                "artist_name": "Rufus Du Sol",
                "track_name": "Pressure v2",
                "track_number": 1,
                "_source_file_name": NEW_PARQUET_FILENAME,
                "_dms_synced_at": ONE_HOUR_LATER,
                "Op": "U",
            }),
            Row(**{
                "artist_id": 1234,
                "artist_name": "Rufus Du Sol",
                "track_name": "Pressure v2",
                "track_number": 4,
                "_source_file_name": NEW_PARQUET_FILENAME,
                "_dms_synced_at": ONE_HOUR_LATER,
                "Op": "U",
            }),
        ],
        CDC_SCHEMA,
    )

    # Expected target state: the "Rufus Du Sol" row carries the values from the
    # last update (track_name "Pressure v2", track_number 4); all other rows stay as seeded.
    expected_df = (
        seed_df
        .withColumn(
            "track_name",
            F.when(F.col("artist_name") == "Rufus Du Sol", "Pressure v2")
            .otherwise(F.col("track_name")),
        )
        .withColumn(
            "track_number",
            F.when(F.col("artist_name") == "Rufus Du Sol", 4)
            .otherwise(F.col("track_number")),
        )
    )

    with TemporaryDirectory() as tmp_dir:
        # Seed a throwaway Delta table, apply the CDC batch, then read the table back.
        target_path = os.path.join(tmp_dir, "target")
        (
            seed_df.write.format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .save(target_path)
        )
        target_dt = DeltaTable.forPath(spark_fixture, target_path)

        run_cdc_merge(update_df, target_dt, ["artist_id", "artist_name"], CDC_INSERT_SCHEMA_COLS)

        actual = spark_fixture.read.format("delta").load(target_path)
        assertDataFrameEqual(expected_df, actual)
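The paste does not include run_cdc_merge itself, so the behaviour the test depends on (several update rows for one key collapsing to the final one before the merge) has to be inferred from the call and from expected_df. The sketch below is one plausible shape for such a helper, not the actual implementation: the deduplication ordering, the Op handling, and the reuse of CDC_INSERT_SCHEMA_COLS for both the update and insert column maps are assumptions.

from typing import List

from delta.tables import DeltaTable
from pyspark.sql import DataFrame, Window
from pyspark.sql import functions as F


def run_cdc_merge(
    source_df: DataFrame,
    target_dt: DeltaTable,
    key_cols: List[str],
    insert_cols: List[str],
) -> None:
    """Illustrative sketch: apply a DMS-style CDC batch to a Delta table."""
    # Collapse the batch to one row per key so MERGE never sees duplicate matches.
    # Ordering by _dms_synced_at with arrival order as a tie-breaker is an assumed
    # rule that would make the second update in the test above win (track_number = 4).
    ordered = source_df.withColumn("_arrival_order", F.monotonically_increasing_id())
    latest_per_key = Window.partitionBy(*key_cols).orderBy(
        F.col("_dms_synced_at").desc(), F.col("_arrival_order").desc()
    )
    deduped = (
        ordered
        .withColumn("_rn", F.row_number().over(latest_per_key))
        .filter(F.col("_rn") == 1)
        .drop("_rn", "_arrival_order")
    )

    merge_condition = " AND ".join(f"t.{c} = s.{c}" for c in key_cols)
    column_map = {c: f"s.{c}" for c in insert_cols}

    (
        target_dt.alias("t")
        .merge(deduped.alias("s"), merge_condition)
        .whenMatchedDelete(condition="s.Op = 'D'")
        .whenMatchedUpdate(condition="s.Op != 'D'", set=column_map)
        .whenNotMatchedInsert(condition="s.Op != 'D'", values=column_map)
        .execute()
    )

The pre-merge deduplication is the part the test is really exercising: Delta's MERGE raises an error when more than one source row matches the same target row, so a batch containing two updates for artist_id 1234 must be reduced to a single row before the merge runs.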