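# Untitled
#
# Paste-site residue (pastecode.io), kept as comments so the file parses as
# Python: mail@pastecode.io avatar | unknown | plain_text | a year ago |
# 2.1 kB | 4 | Indexable | Never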
import pyspark
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import date_format


# Spark session running in local mode on all available cores ("local[*]").
# getOrCreate() returns an already-active session if one exists instead of
# starting a second one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

print("######################################")
print("READING CSV FILE DATASETS")
print("######################################")

# Directory holding the raw CSV inputs.
DATA_DIR = "/root/spark"


def _read_csv_with_header(path):
    """Read a CSV file whose first line is a header row.

    No schema is supplied and ``inferSchema`` is left at its default (off),
    so every column is loaded as a string.
    """
    # Spark matches data-source option keys case-insensitively; the documented
    # "header" spelling is used for every input so the reads cannot drift apart.
    return spark.read.option("header", True).csv(path)


df_product = _read_csv_with_header(DATA_DIR + "/product.csv")
df_transaction = _read_csv_with_header(DATA_DIR + "/transaction.csv")

print("######################################")
print("GABUNG SEMUA DATA")
print("######################################")

# Inner join on product_id: only transactions whose product_id also appears in
# the product table are kept. Joining on a list of column names yields a single
# product_id column in the result rather than one from each side.
# NOTE(review): repartition(400) is a full shuffle into 400 partitions, which is
# a lot for local[*]. Combined with the partitionBy("year", "month") write, each
# of those partitions can emit its own file per year/month directory. Confirm
# this count is intended.
df_merge = (
    df_transaction
    .join(df_product, on=["product_id"], how="inner")
    .repartition(400)
)

print("######################################")
print("UBAH TIPE DATA")
print("######################################")

# The CSV columns arrive as strings, so cast the ones used downstream.
# DataFrames are immutable: withColumn() returns a NEW DataFrame, and the
# result must be assigned back to df_merge or the conversions are lost.
# NOTE(review): casting "total" to IntegerType assumes whole-number values.
# Fractional or non-numeric strings may be truncated, become null, or raise
# (under spark.sql.ansi.enabled) -- confirm against the data.
# NOTE(review): to_date() without an explicit format presumably expects
# yyyy-MM-dd style strings; anything else becomes null (or raises under ANSI
# mode) -- verify the CSV's date format.
df_merge = df_merge \
    .withColumn("product_id", df_merge["product_id"].cast(IntegerType())) \
    .withColumn("transaction_date", to_date("transaction_date")) \
    .withColumn("total", df_merge["total"].cast(IntegerType()))

print("######################################")
print("BUAT KOLOM BARU YEAR DAN MONTH")
print("######################################")

# Derive string partition keys from transaction_date via date_format():
#   year  -> pattern "yyyy", e.g. "2023"
#   month -> pattern "MM", zero-padded, e.g. "07"
df_merge = (
    df_merge
    .withColumn("year", date_format("transaction_date", "yyyy"))
    .withColumn("month", date_format("transaction_date", "MM"))
)

print("######################################")
print("PARTISI DATA BERDASARKAN BULAN DAN TAHUN, DAN SIMPAN KE PARQUET")
print("######################################")

# Write Parquet to the relative path "partisi.parquet", with one sub-directory
# per year=<yyyy>/month=<MM> combination. The partition values live in the
# directory names rather than inside the files. mode="overwrite" replaces any
# existing output at that path.
df_merge.write.parquet(
    "partisi.parquet",
    mode="overwrite",
    partitionBy=["year", "month"],
)

print("######################################")
print("BACA PARQUET DAN LAKUKAN AGGREGATE")
print("######################################")

# Read the partitioned output back. Partition discovery turns the
# year=/month= directory names into columns again. With Spark's default
# partition-column type inference they come back as integers (e.g. month
# "07" -> 7), not as the zero-padded strings that were written.
df_partisi = spark.read.parquet("partisi.parquet")

# One row per (year, month) holding the mean of "total".
# NOTE(review): the output column is named "revenue", but avg() yields the
# average transaction total, not the sum. If total revenue is intended, this
# should probably be sum("total") -- confirm.
df_result = (
    df_partisi
    .groupBy("year", "month")
    .agg(avg("total").alias("revenue"))
)