Untitled
unknown
plain_text
a year ago
2.1 kB
4
Indexable
Never
# PySpark batch script:
#   1. read the product and transaction CSV files,
#   2. inner-join them on product_id,
#   3. cast the columns to proper types,
#   4. derive year/month columns from transaction_date,
#   5. write the result as Parquet partitioned by year and month,
#   6. read the Parquet data back and aggregate "total" per year/month.
import pyspark
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import date_format

# Local session that uses every available core on this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

print("######################################")
print("READING CSV FILE DATASETS")
print("######################################")

# The first row of each CSV is treated as the header. Without a schema or
# inferSchema, every column is read as a string, hence the casts below.
df_product = (
    spark.read
    .option("Header", True)
    .csv("/root/spark/product.csv")
)

df_transaction = (
    spark.read
    .option("header", True)
    .csv("/root/spark/transaction.csv")
)

print("######################################")
print("GABUNG SEMUA DATA")
print("######################################")

# Joining on a list of column names keeps a single product_id column.
# NOTE(review): 400 partitions looks high for a local[*] run and will likely
# produce many small Parquet files per year/month directory - confirm intent.
df_merge = (
    df_transaction
    .join(df_product, ["product_id"], "inner")
    .repartition(400)
)

print("######################################")
print("UBAH TIPE DATA")
print("######################################")

# DataFrames are immutable: withColumn returns a NEW DataFrame. The result
# must be bound back to df_merge, otherwise the casts are silently discarded
# and the following steps keep operating on the original string columns.
df_merge = (
    df_merge
    .withColumn("product_id", df_merge["product_id"].cast(IntegerType()))
    .withColumn("transaction_date", to_date("transaction_date"))
    .withColumn("total", df_merge["total"].cast(IntegerType()))
)

print("######################################")
print("BUAT KOLOM BARU YEAR DAN MONTH")
print("######################################")

# year ("yyyy") and month ("MM") are formatted as strings and serve as the
# partition columns for the Parquet output.
df_merge = (
    df_merge
    .withColumn("year", date_format("transaction_date", 'yyyy'))
    .withColumn("month", date_format("transaction_date", "MM"))
)

print("######################################")
print("PARTISI DATA BERDASARKAN BULAN DAN TAHUN, DAN SIMPAN KE PARQUET")
print("######################################")

# "overwrite" replaces any existing data at the target path.
(
    df_merge
    .write.partitionBy("year", "month")
    .mode("overwrite")
    .parquet("partisi.parquet")
)

print("######################################")
print("BACA PARQUET DAN LAKUKAN AGGREGATE")
print("######################################")

df_partisi = spark.read.parquet("partisi.parquet")

# NOTE(review): the column is aliased "revenue" but is computed with avg();
# confirm whether sum("total") was intended.
# NOTE(review): this only defines a lazy transformation; no action (show,
# collect, write, ...) is visible here, so nothing is computed unless a later
# part of the script consumes df_result.
df_result = (
    df_partisi
    .groupBy("year", "month")
    .agg(avg("total").alias("revenue"))
)