Untitled
unknown
plain_text
2 years ago
1.9 kB
8
Indexable
"""Demonstrate row deletion: Parquet overwrite vs. Delta Lake overwrite.

Builds a small employee DataFrame, writes it as a Parquet table and as a
Delta table, then "deletes" one row from each by filtering and
overwriting.  The Parquet overwrite is destructive (no history); Delta
retains prior table versions.
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Fix: the original script used `spark` without ever creating it.  A
# notebook/shell predefines `spark`; a standalone script must build one.
spark = SparkSession.builder.appName("parquet-vs-delta-delete").getOrCreate()

# Rows: (firstname, middlename, lastname, id, gender, salary)
data2 = [
    ("James", "", "Smith", "12345", "M", 3000),
    ("Michael", "Rose", "", "23456", "F", 4000),
    ("Robert", "", "Williams", "32456", "M", 4000),
    ("James", "Joe", "Callab", "11111", "F", 2000),
    ("Gill", "", "Anthony", "22222", "M", 1000),
]

# Explicit schema: ids are strings, salary is an int; every column nullable.
# (Backslash continuations from the original are unnecessary inside brackets.)
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True),
])

df = spark.createDataFrame(data=data2, schema=schema)

# Add a constant Country column to every row.
df_with_country = df.withColumn("Country", lit("USA"))
df_with_country.printSchema()
df_with_country.show(truncate=False)

# --- Parquet: delete-by-overwrite is destructive --------------------------
dfp = df_with_country

# Write as Parquet table
dfp.write.parquet("path/to/parquet/table", mode="overwrite")
dfp.show()

# Filter out row to delete
dfp = dfp.filter(dfp.id != "12345")

# Write DataFrame back to Parquet file
dfp.write.parquet("path/to/parquet/table", mode="overwrite")
dfp.show()
# AFTER A ROW IS DELETED IN PARQUET IT CANNOT BE RECOVERED

# --- Delta Lake: overwrite preserves version history ----------------------
dfd = df_with_country

# Write as Delta table to the local file system.
# NOTE(review): requires the delta-spark package to be configured on the
# SparkSession (spark.sql.extensions / catalog settings) — confirm setup.
dfd.write.format("delta").mode("overwrite").save("file:///path/to/delta/table2")

# Read Delta table back from the local file system
dfd = spark.read.format("delta").load("file:///path/to/delta/table2")
dfd.show()

# Delete row based on a condition
dfd = dfd.filter(col("id") != "12345")

# Write the updated table back to Delta
dfd.write.format("delta").mode("overwrite").save("file:///path/to/delta/table2")

# Show without the row we deleted
dfd.show()
Editor is loading...