Untitled

 avatar
unknown
plain_text
6 months ago
1.7 kB
5
Indexable
def test_multiple_updates_for_single_row(self, spark_fixture: SparkSession) -> None:
    """Check the merge result when one CDC batch updates the same row twice.

    The target Delta table is seeded from ``SEED_DATA``. The CDC batch holds
    two ``"U"`` rows with the same ``artist_id``/``artist_name`` that differ
    only in ``track_number`` (1, then 4). After ``run_cdc_merge`` the target
    is expected to equal the seed data with the "Rufus Du Sol" row(s) carrying
    ``track_name == "Pressure v2"`` and ``track_number == 4``.

    Args:
        spark_fixture: Spark session used to build the DataFrames and to read
            the Delta table back.
    """
    seed_df = spark_fixture.createDataFrame(SEED_DATA, TABLE_SCHEMA)

    # Two updates for the same key in one batch; only track_number differs.
    # Field order is kept as in CDC_SCHEMA usage: Rows may be matched to the
    # schema by position.
    # NOTE(review): both rows carry the same _dms_synced_at, so which update
    # wins presumably depends on how run_cdc_merge breaks ties -- confirm
    # that this is deterministic, otherwise the expectation of 4 is fragile.
    update_rows = [
        Row(
            artist_id=1234,
            artist_name="Rufus Du Sol",
            track_name="Pressure v2",
            track_number=track_number,
            _source_file_name=NEW_PARQUET_FILENAME,
            _dms_synced_at=ONE_HOUR_LATER,
            Op="U",
        )
        for track_number in (1, 4)
    ]
    update_df = spark_fixture.createDataFrame(update_rows, CDC_SCHEMA)

    # Expected target: seed data with only the updated artist's columns changed.
    is_updated_artist = F.col("artist_name") == "Rufus Du Sol"
    expected_df = seed_df.withColumn(
        "track_name",
        F.when(is_updated_artist, "Pressure v2").otherwise(F.col("track_name")),
    ).withColumn(
        "track_number",
        F.when(is_updated_artist, 4).otherwise(F.col("track_number")),
    )

    with TemporaryDirectory() as tmp_dir:
        target_path = os.path.join(tmp_dir, "target")
        seed_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(target_path)
        target_dt = DeltaTable.forPath(spark_fixture, target_path)

        run_cdc_merge(update_df, target_dt, ["artist_id", "artist_name"], CDC_INSERT_SCHEMA_COLS)

        actual_df = spark_fixture.read.format("delta").load(target_path)
        # Argument order is (actual, expected) so a failure diff labels each
        # side correctly.
        assertDataFrameEqual(actual_df, expected_df)
Editor is loading...
Leave a Comment