Untitled

 avatar
unknown
plain_text
2 years ago
1.9 kB
8
Indexable
from pyspark.sql.types import StructType, StructField , StringType, IntegerType
from pyspark.sql.functions import lit 
# Sample employee rows: (first, middle, last, id, gender, salary).
data2 = [
    ("James", "", "Smith", "12345", "M", 3000),
    ("Michael", "Rose", "", "23456", "F", 4000),
    ("Robert", "", "Williams", "32456", "M", 4000),
    ("James", "Joe", "Callab", "11111", "F", 2000),
    ("Gill", "", "Anthony", "22222", "M", 1000),
]

# Explicit schema for data2: every field is nullable (third argument True);
# all columns are strings except the integer salary.
# NOTE: backslash continuations are unnecessary inside brackets (PEP 8) —
# the bracketed list continues implicitly across lines.
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True),
])


# Build the DataFrame from the sample rows and tag every row with a
# constant Country column.
df = spark.createDataFrame(data=data2, schema=schema)
df_with_country = df.withColumn("Country", lit("USA"))
df_with_country.printSchema()
df_with_country.show(truncate=False)

# --- Parquet round trip -------------------------------------------------
dfp = df_with_country
# Persist the DataFrame as a Parquet table.
dfp.write.parquet("path/to/parquet/table", mode="overwrite")
dfp.show()

# "Delete" a row by filtering it out, then rewrite the whole table —
# Parquet has no in-place delete, so the file is overwritten in full.
dfp = dfp.filter(dfp.id != "12345")
dfp.write.parquet("path/to/parquet/table", mode="overwrite")
dfp.show()

# NOTE: after a row is deleted from a Parquet table (by overwriting), it
# cannot be recovered — plain Parquet keeps no version history.

from pyspark.sql.functions import col
# --- Delta round trip ---------------------------------------------------
dfd = df_with_country

# Persist as a Delta table on the local file system, then reload it.
dfd.write.format("delta").mode("overwrite").save("file:///path/to/delta/table2")
dfd = spark.read.format("delta").load("file:///path/to/delta/table2")
dfd.show()

# Drop the row matching the condition, then overwrite the Delta table
# with the filtered result.
dfd = dfd.filter(col("id") != "12345")
dfd.write.format("delta").mode("overwrite").save("file:///path/to/delta/table2")

# Show the table without the deleted row.
dfd.show()
Editor is loading...