Untitled

 avatar
unknown
plain_text
9 months ago
7.7 kB
4
Indexable
// Job that builds the sale-in revenue plan template ("bieu mau ke hoach doanh thu sale-in") per product group (NSP)


package vn.com.viettel.code.dashboard_date

import vn.com.viettel.AppConfig
import vn.com.viettel.utils.SparkUtils.{DataFrameOps, createBatchSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
/**
 * Batch job that produces the blank sale-in revenue plan template per product class (NSP).
 *
 * Steps:
 *  1. Build a lookup from product-class names to class IDs out of the two `WHS.SalePlan_*`
 *     tables.
 *  2. Load the uploaded monthly sale-in revenue plan (CSV) and attach the class IDs.
 *  3. Load the sale-in revenue actuals (Avro) and sum them per day and dimension set.
 *  4. Union plan and actuals, keep the rows inside the template date range and reduce them to
 *     the distinct dimension combinations, laid out with the column names of the upload CSV
 *     and with the value columns left null.
 *  5. Overwrite `AppConfig.outputDir` with that template in Avro format.
 *
 * The column names read from the CSV and Avro inputs are taken from how this job references
 * them; the actual input schemas are not visible from this file.
 */
object salein_class {

  // Spark SQL data types used for the explicit casts of the template columns. Imported locally
  // because the file-level imports do not bring them into scope.
  import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}

  private val ProductClassTable = "WHS.SalePlan_ProductClass"
  private val ProductClassDetailTable = "WHS.SalePlan_ProductClassDetail"
  private val PlanCsvPath = "/raw_zone/excel_form/upload/ke_hoach/salein_tx_thutien_nsp.csv"
  private val ActualsPath = "/gold_zone/full_load/kpi_date/salein_dt"

  // Indicator name and plan type that select the monthly sale-in revenue rows of the plan file.
  private val RevenueIndicator = "Doanh thu (SaleIn)"
  private val MonthlyPlanType = "Thang"

  // Inclusive bounds of the `Day` filter applied before the template is derived.
  private val TemplateFrom = "2024-01-01"
  private val TemplateTo = "2024-12-31"

  /**
   * Entry point of the job.
   *
   * @param args command-line arguments; not used by this job
   */
  def main(args: Array[String]): Unit = {
    val spark = createBatchSession()

    // The detail table is read once per class level so that each join side carries its own
    // column references.
    def classDetail(alias: String) = spark.read.table(ProductClassDetailTable).as(alias)

    // Lookup: class IDs (N_SP1..3) together with their class names (Name_NSP1..3).
    // The product-class table also carries ItemName/ItemCodeSAP, so it presumably holds one row
    // per item (TODO confirm); distinct() keeps one row per class combination so that the join
    // below does not multiply plan rows.
    val classLookup = spark.read.table(ProductClassTable).as("pc")
      .join(classDetail("d1"), col("pc.N_SP1") === col("d1.ID"), "left")
      .join(classDetail("d2"), col("pc.N_SP2") === col("d2.ID"), "left")
      .join(classDetail("d3"), col("pc.N_SP3") === col("d3.ID"), "left")
      .select(
        col("pc.N_SP1").alias("N_SP1"),
        col("pc.N_SP2").alias("N_SP2"),
        col("pc.N_SP3").alias("N_SP3"),
        col("d1.Class").alias("Name_NSP1"),
        col("d2.Class").alias("Name_NSP2"),
        col("d3.Class").alias("Name_NSP3")
      )
      .distinct()

    // Uploaded plan, restricted to monthly sale-in revenue rows and renamed to the column names
    // used by the actuals so that both sides can be unioned by name.
    val plan = spark.read.format("csv")
      .option("header", true)
      .load(PlanCsvPath)
      .filter(col("Ten_chi_tieu") === RevenueIndicator && col("Loai_KH") === MonthlyPlanType)
      .withColumnRenamed("Thoi_gian_ke_hoach", "Day")
      // Must be "SaleDepartment" (not "SalesDepartment"): that is the name every later step reads.
      .withColumnRenamed("Don_vi", "SaleDepartment")
      .withColumnRenamed("Vung_CN", "OrganizationName")
      .withColumnRenamed("Kenh_ban", "SalesChannelName")
      .withColumnRenamed("Name_NSP1", "NSP1")
      .withColumnRenamed("Name_NSP2", "NSP2")
      .withColumnRenamed("Name_NSP3", "NSP3")
      .withColumnRenamed("Ten_doi", "Team_Name")
      // CSV values are read as strings; cast so the union with the actuals stays numeric.
      .withColumn("Revenue_Salein_Plan", col("Doanh_thu").cast(DoubleType))
      .withColumn("Revenue_Salein_Perform", lit(0))
      .select(
        col("Day"),
        col("SaleDepartment"),
        col("OrganizationName"),
        col("SalesChannelName"),
        col("NSP1"),
        col("NSP2"),
        col("NSP3"),
        col("Team_Name"),
        col("Revenue_Salein_Plan"),
        col("Revenue_Salein_Perform"),
        // NOTE(review): nothing in this job creates "Type"; it has to be a column of the plan
        // CSV, otherwise this select fails at analysis time - confirm against the file.
        col("Type")
      )
      .alias("plan")
      .join(
        classLookup.alias("tp"),
        col("plan.NSP1") === col("tp.Name_NSP1") &&
          col("plan.NSP2") === col("tp.Name_NSP2") &&
          col("plan.NSP3") === col("tp.Name_NSP3"),
        "left"
      )
      .select(
        col("plan.*"),
        col("tp.N_SP1").alias("N_SP1"),
        col("tp.N_SP2").alias("N_SP2"),
        col("tp.N_SP3").alias("N_SP3")
      )
      // Group code: the letter "L" followed by the last character of the level-1 class ID.
      .withColumn("Ma_Nhom", concat(lit("L"), substring(col("N_SP1"), -1, 1)))

    // Actual sale-in revenue, summed per invoice day and dimension set.
    val actuals = spark.read.format("avro").load(ActualsPath)
      .withColumnRenamed("Organization", "OrganizationName")
      .withColumnRenamed("Kenh", "SalesChannelName")
      .withColumnRenamed("Name_NSP1", "NSP1")
      .withColumnRenamed("Name_NSP2", "NSP2")
      .withColumnRenamed("Name_NSP3", "NSP3")
      .withColumnRenamed("Doanh_thu", "Revenue_Salein_Perform")
      .groupBy(
        col("Ngay_lapHD"),
        col("SaleDepartment"),
        col("OrganizationName"),
        col("SalesChannelName"),
        col("Ma_Nhom"),
        col("N_SP1"),
        col("N_SP2"),
        col("N_SP3"),
        col("NSP1"),
        col("NSP2"),
        col("NSP3"),
        col("Team_Name")
      )
      .agg(sum("Revenue_Salein_Perform").alias("Revenue_Salein_Perform"))
      .withColumnRenamed("Ngay_lapHD", "Day")
      // Actuals carry no plan type and no planned revenue.
      .withColumn("Type", lit(null).cast(StringType))
      .withColumn("Revenue_Salein_Plan", lit(0))

    val combined = actuals.unionByName(plan)

    // Template: one row per distinct dimension combination seen in the date range, using the
    // column names of the upload CSV. Null literals are cast to concrete types so that every
    // output column has a definite type when written.
    val template = combined
      .filter(col("Day").between(TemplateFrom, TemplateTo))
      .select(
        lit(RevenueIndicator).alias("Ten_chi_tieu"),
        lit(null).cast(StringType).alias("Thoi_gian_ke_hoach"),
        col("SaleDepartment").cast(StringType).alias("Don_vi"),
        col("OrganizationName").cast(StringType).alias("Vung_CN"),
        col("SalesChannelName").cast(StringType).alias("Kenh_ban"),
        col("Team_Name").cast(StringType).alias("Ten_doi"),
        col("NSP1").cast(StringType).alias("Name_NSP1"),
        col("NSP2").cast(StringType).alias("Name_NSP2"),
        col("NSP3").cast(StringType).alias("Name_NSP3"),
        lit(null).cast(IntegerType).alias("So_luong"),
        lit(null).cast(DoubleType).alias("Doanh_thu"),
        lit(null).cast(StringType).alias("Loai_KH")
      )
      .distinct()

    // Write the template itself; the unioned plan/actual rows are only an intermediate step.
    template.write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }
}
Editor is loading...
Leave a Comment