# PySpark example: comparing row deletion in Parquet vs Delta Lake tables.
from pyspark.sql.types import StructType, StructField , StringType, IntegerType
from pyspark.sql.functions import lit
# Sample employee rows: (firstname, middlename, lastname, id, gender, salary).
data2 = [
    ("James", "", "Smith", "12345", "M", 3000),
    ("Michael", "Rose", "", "23456", "F", 4000),
    ("Robert", "", "Williams", "32456", "M", 4000),
    ("James", "Joe", "Callab", "11111", "F", 2000),
    ("Gill", "", "Anthony", "22222", "M", 1000),
]

# Explicit schema: every column nullable; salary is the only integer column.
# No backslash continuations needed -- Python continues lines implicitly
# inside brackets.
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True),
])

# `spark` is assumed to be an active SparkSession (e.g. a PySpark shell or
# Databricks notebook) -- it is not created in this file.
df = spark.createDataFrame(data=data2, schema=schema)

# Add a constant "Country" column to every row.
df_with_country = df.withColumn("Country", lit("USA"))
df_with_country.printSchema()
df_with_country.show(truncate=False)
# --- Parquet demo: "deleting" a row means rewriting the whole table ---
dfp = df_with_country

# Persist the DataFrame as a Parquet table.
dfp.write.parquet("path/to/parquet/table", mode="overwrite")
dfp.show()

# Drop the target row by filtering it out in memory...
dfp = dfp.filter(dfp["id"] != "12345")

# ...then overwrite the Parquet files with the filtered result.
dfp.write.parquet("path/to/parquet/table", mode="overwrite")
dfp.show()
# Parquet keeps no version history: once a row is removed this way it
# cannot be recovered.
from pyspark.sql.functions import col

# --- Delta demo: same delete flow, but Delta keeps transaction history ---
dfd = df_with_country

# Write as a Delta table to the local file system.
dfd.write.format("delta").mode("overwrite").save("file:///path/to/delta/table2")

# Read the Delta table back from the local file system.
dfd = spark.read.format("delta").load("file:///path/to/delta/table2")
dfd.show()

# Delete the row by filtering on the id column.
dfd = dfd.filter(col("id") != "12345")

# Write the updated table back to Delta. Unlike Parquet, Delta records this
# overwrite as a new table version, so the previous state remains
# recoverable via time travel (e.g. spark.read.option("versionAsOf", 0)).
dfd.write.format("delta").mode("overwrite").save("file:///path/to/delta/table2")

# Show the table without the deleted row.
# (Removed the "Editor is loading..." paste artifact that was fused onto
# this line and made the file a SyntaxError.)
dfd.show()