Untitled
unknown
plain_text
9 months ago
7.7 kB
4
Indexable
// Sale-in revenue plan form ("bieu mau ke hoach doanh thu salein") broken down by product-class level (NSP).
package vn.com.viettel.code.dashboard_date

import vn.com.viettel.AppConfig
import vn.com.viettel.utils.SparkUtils.{DataFrameOps, createBatchSession}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}

/** Batch job that combines actual sale-in revenue with the uploaded monthly sale-in plan and
  * writes, as Avro to `AppConfig.outputDir`, a plan-entry form: one row per distinct
  * (department, region, channel, team, NSP1/NSP2/NSP3 class) combination whose `Day` falls in
  * the report range, laid out with the same columns as the plan upload CSV.
  */
object salein_class {

  /** Indicator name: filters the uploaded plan and fills the form's `Ten_chi_tieu` column. */
  private val SaleInIndicator = "Doanh thu (SaleIn)"

  /** `Loai_KH` value selecting the monthly rows of the uploaded plan. */
  private val MonthlyPlanType = "Thang"

  /** Inclusive `Day` range whose combinations appear in the generated form. */
  private val ReportStart = "2024-01-01"
  private val ReportEnd = "2024-12-31"

  private val ProductClassTable = "WHS.SalePlan_ProductClass"
  private val ProductClassDetailTable = "WHS.SalePlan_ProductClassDetail"
  private val PlanCsvPath = "/raw_zone/excel_form/upload/ke_hoach/salein_tx_thutien_nsp.csv"
  private val SaleInActualPath = "/gold_zone/full_load/kpi_date/salein_dt"

  def main(args: Array[String]): Unit = {
    val spark = createBatchSession()

    val classMapping = buildClassMapping(
      spark.read.table(ProductClassTable),
      spark.read.table(ProductClassDetailTable)
    )
    // Header-only CSV read: no schema inference, so every uploaded column arrives as a string.
    val plan = preparePlan(
      spark.read.format("csv").option("header", true).load(PlanCsvPath),
      classMapping
    )
    val actual = aggregateActual(spark.read.format("avro").load(SaleInActualPath))

    // Both sides expose the same 15 columns; unionByName aligns them by name, not position.
    val combined = actual.unionByName(plan)

    // The job's output is the plan-entry form built from the combined rows.
    // NOTE(review): confirm that readers of AppConfig.outputDir expect this form layout
    // rather than the combined actual+plan rows.
    buildPlanForm(combined).write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }

  /** Distinct product-class ID triples (`N_SP1..3`) with their class names (`Name_NSP1..3`).
    *
    * `productClass` also carries per-product columns (ItemName, ItemCodeSAP), so several of its
    * rows can share one class triple; `distinct` collapses them so that joining on class names
    * does not multiply the joined rows.
    */
  private def buildClassMapping(productClass: DataFrame, classDetail: DataFrame): DataFrame =
    // The detail table is joined once per level; a separate alias per copy keeps each copy's
    // `ID` and `Class` columns unambiguous in this self-join.
    productClass
      .alias("pc")
      .join(classDetail.alias("d1"), col("pc.N_SP1") === col("d1.ID"), "left")
      .join(classDetail.alias("d2"), col("pc.N_SP2") === col("d2.ID"), "left")
      .join(classDetail.alias("d3"), col("pc.N_SP3") === col("d3.ID"), "left")
      .select(
        col("pc.N_SP1"),
        col("pc.N_SP2"),
        col("pc.N_SP3"),
        col("d1.Class").alias("Name_NSP1"),
        col("d2.Class").alias("Name_NSP2"),
        col("d3.Class").alias("Name_NSP3")
      )
      .distinct()

  /** Keeps the monthly sale-in rows of the uploaded plan, renames them to the combined schema,
    * and attaches the class IDs (`N_SP1..3`) and `Ma_Nhom` matched on the NSP1..3 class names.
    */
  private def preparePlan(planUpload: DataFrame, classMapping: DataFrame): DataFrame = {
    val planRows = planUpload
      .filter(col("Ten_chi_tieu") === SaleInIndicator && col("Loai_KH") === MonthlyPlanType)
      .select(
        col("Thoi_gian_ke_hoach").alias("Day"),
        // Must match the `SaleDepartment` name used by the actual-revenue side of the union.
        col("Don_vi").alias("SaleDepartment"),
        col("Vung_CN").alias("OrganizationName"),
        col("Kenh_ban").alias("SalesChannelName"),
        col("Name_NSP1").alias("NSP1"),
        col("Name_NSP2").alias("NSP2"),
        col("Name_NSP3").alias("NSP3"),
        col("Ten_doi").alias("Team_Name"),
        // Cast explicitly: a string amount could otherwise widen the unioned revenue column to string.
        col("Doanh_thu").cast(DoubleType).alias("Revenue_Salein_Plan"),
        lit(0.0).alias("Revenue_Salein_Perform"),
        // NOTE(review): assumes the uploaded CSV has a `Type` column — confirm.
        col("Type")
      )

    // NOTE(review): if one class-name triple maps to several ID triples, plan rows still fan out.
    planRows
      .alias("plan")
      .join(
        classMapping.alias("cm"),
        col("plan.NSP1") === col("cm.Name_NSP1") &&
          col("plan.NSP2") === col("cm.Name_NSP2") &&
          col("plan.NSP3") === col("cm.Name_NSP3"),
        "left"
      )
      .select(col("plan.*"), col("cm.N_SP1"), col("cm.N_SP2"), col("cm.N_SP3"))
      // Group code: "L" followed by the last character of the level-1 class ID.
      .withColumn("Ma_Nhom", concat(lit("L"), substring(col("N_SP1"), -1, 1)))
  }

  /** Sums actual sale-in revenue per invoice day, department, organisation, channel, team and
    * product class, shaped to the same 15 columns as [[preparePlan]]'s output.
    */
  private def aggregateActual(saleIn: DataFrame): DataFrame =
    saleIn
      .withColumnRenamed("Organization", "OrganizationName")
      .withColumnRenamed("Kenh", "SalesChannelName")
      .withColumnRenamed("Name_NSP1", "NSP1")
      .withColumnRenamed("Name_NSP2", "NSP2")
      .withColumnRenamed("Name_NSP3", "NSP3")
      .withColumnRenamed("Doanh_thu", "Revenue_Salein_Perform")
      .groupBy(
        col("Ngay_lapHD").alias("Day"),
        col("SaleDepartment"),
        col("OrganizationName"),
        col("SalesChannelName"),
        col("Ma_Nhom"),
        col("N_SP1"),
        col("N_SP2"),
        col("N_SP3"),
        col("NSP1"),
        col("NSP2"),
        col("NSP3"),
        col("Team_Name")
      )
      .agg(sum("Revenue_Salein_Perform").alias("Revenue_Salein_Perform"))
      .withColumn("Type", lit(null).cast(StringType))
      .withColumn("Revenue_Salein_Plan", lit(0.0))

  /** Projects combined rows whose `Day` lies in [ReportStart, ReportEnd] onto the plan-upload
    * layout. Descriptive columns are filled. `Thoi_gian_ke_hoach`, `So_luong`, `Doanh_thu` and
    * `Loai_KH` are emitted as typed nulls (presumably to be filled in by planners). Duplicate
    * rows are removed.
    */
  private def buildPlanForm(combined: DataFrame): DataFrame =
    combined
      .filter(col("Day").between(ReportStart, ReportEnd))
      .select(
        lit(SaleInIndicator).alias("Ten_chi_tieu"),
        lit(null).cast(StringType).alias("Thoi_gian_ke_hoach"),
        col("SaleDepartment").cast(StringType).alias("Don_vi"),
        col("OrganizationName").cast(StringType).alias("Vung_CN"),
        col("SalesChannelName").cast(StringType).alias("Kenh_ban"),
        col("Team_Name").cast(StringType).alias("Ten_doi"),
        col("NSP1").cast(StringType).alias("Name_NSP1"),
        col("NSP2").cast(StringType).alias("Name_NSP2"),
        col("NSP3").cast(StringType).alias("Name_NSP3"),
        lit(null).cast(IntegerType).alias("So_luong"),
        lit(null).cast(DoubleType).alias("Doanh_thu"),
        lit(null).cast(StringType).alias("Loai_KH")
      )
      .distinct()
}
Editor is loading...
Leave a Comment