from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, date_format, to_date
from pyspark.sql.types import IntegerType
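# Create a local SparkSession that uses all available cores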
spark = SparkSession.builder \
.master("local[*]") \
.appName('test') \
.getOrCreate()
print("######################################")
print("READING CSV FILE DATASETS")
print("######################################")
df_product = spark.read \
.option("Header", True) \
.csv("/root/spark/product.csv")
df_transaction = spark.read \
.option("header", True) \
.csv("/root/spark/transaction.csv")
print("######################################")
print("GABUNG SEMUA DATA")
print("######################################")
df_merge = df_transaction \
.join(df_product, ["product_id"], "inner")\
.repartition(400)
print("######################################")
print("UBAH TIPE DATA")
print("######################################")
# Assign the result back: withColumn returns a new DataFrame and does not modify df_merge in place
df_merge = df_merge \
.withColumn("product_id", df_merge["product_id"].cast(IntegerType())) \
.withColumn("transaction_date", to_date("transaction_date")) \
.withColumn("total", df_merge["total"].cast(IntegerType()))
print("######################################")
print("BUAT KOLOM BARU YEAR DAN MONTH")
print("######################################")
df_merge = df_merge\
.withColumn("year",date_format("transaction_date",'yyyy'))\
.withColumn("month",date_format("transaction_date","MM"))
print("######################################")
print("PARTISI DATA BERDASARKAN BULAN DAN TAHUN, DAN SIMPAN KE PARQUET")
print("######################################")
df_merge \
.write.partitionBy("year","month") \
.mode("overwrite") \
.parquet("partisi.parquet")
print("######################################")
print("BACA PARQUET DAN LAKUKAN AGGREGATE")
print("######################################")
df_partisi = spark.read.parquet("partisi.parquet")
df_result = df_partisi \
.groupBy("year","month") \
.agg(avg("total").alias("revenue"))
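# The transformations above are lazy, so nothing runs until an action is called.
# A minimal closing sketch, assuming the aggregated result should simply be
# displayed on the console before shutting the session down:
df_result.show()
spark.stop()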