// Untitled plain-text paste (author unknown, about 10 kB); paste-site page header, not part of the program.
package vn.com.viettel.code.dashboard_date
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import vn.com.viettel.AppConfig
import vn.com.viettel.utils.SparkUtils.{DataFrameOps, createBatchSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions._

/**
  * Batch job that builds the daily "revenue by product group" dataset.
  *
  * Steps, in order:
  *   1. Read the uploaded order CSVs, keep the RDMBC3/RDMNC3 users and attach the team
  *      from the customer-to-team mapping file.
  *   2. Resolve the three product-class levels of every item via the WHS class tables and
  *      aggregate actual revenue/quantity per day, unit, team and product class.
  *   3. Read the monthly plan CSV, attach the class ids and spread each monthly figure
  *      evenly over the calendar days of its month.
  *   4. Full-outer-join actuals and plan and overwrite `AppConfig.outputDir` with the
  *      result in Avro format.
  */
object dt_theo_nhom_sp {

  /**
    * Job entry point.
    *
    * @param args command-line arguments; not read by this job
    */
  def main(args: Array[String]): Unit = {
    val spark = createBatchSession()

    // Every uploaded spreadsheet export is read with the same CSV options.
    def readCsv(path: String) =
      spark.read.format("csv")
        .option("header", "true")
        .option("multiLine", "true")
        .load(path)

    // ---------------------------------------------------------------------
    // Daily actuals
    // ---------------------------------------------------------------------

    // Customer code -> team mapping.
    val teamMapping = readCsv("/raw_zone/excel_form/upload/mapping_to_doi/Mapping_C3_to_doi.csv")
      .withColumnRenamed("Đội xung kích", "Team_Name")
      .withColumnRenamed("Mã KH", "Ma_KH")
      .withColumn("Chi_nhanh", col("Vùng"))

    val rawOrders = readCsv("/raw_zone/excel_form/upload/rangdong_store/c3_*.csv")
      .filter(col("Username").rlike("^(RDMBC3|RDMNC3)") && (col("Thành tiền") =!= "nan"))
      .withColumn("Đơn Giá", col("Đơn Giá").cast("int"))
      .withColumn("Số lượng", col("Số lượng").cast("int"))
      .withColumn("Giảm giá", col("Giảm giá").cast("int"))
      // Keep only the part before the first space, then parse whichever of the two
      // layouts the value uses; values with neither "/" nor "-" become null.
      .withColumn("Ngày đặt hàng", split(col("Ngày đặt hàng"), " ")(0))
      .withColumn(
        "Ngày đặt hàng",
        when(col("Ngày đặt hàng").contains("/"), to_date(col("Ngày đặt hàng"), "M/d/y"))
          .when(col("Ngày đặt hàng").contains("-"), to_date(col("Ngày đặt hàng"), "yyyy-MM-dd"))
      )
      .select(
        col("Username"),
        date_format(col("Ngày đặt hàng"), "yyyy-MM-dd").as("Date"),
        // The username prefix decides which unit the order is reported under.
        when(col("Username").startsWith("RDMBC3"), "TMĐT mở rộng BH1")
          .when(col("Username").startsWith("RDMNC3"), "TMĐT mở rộng BH2")
          .as("EcommerceOrderType"),
        col("Thành tiền").as("Doanh_thu"),
        col("Số lượng").as("So_luong"),
        col("Barcode")
      )

    val orders = rawOrders.as("t1")
      .join(teamMapping.as("t2"), col("t1.Username") === col("t2.Ma_KH"), "left")
      .select("t1.*", "t2.Team_Name")

    // Item -> (N_SP1, N_SP2, N_SP3) ids plus the class name of each level.
    // The detail table is joined three times, so every copy gets its own alias and all
    // columns are referenced through that alias. Referencing `ID` through DataFrames
    // derived from the same parent points at the same underlying attribute for all three
    // copies, which Spark either rejects as an ambiguous self-join or binds to the wrong side.
    val productClass = spark.read.table("WHS.SalePlan_ProductClass")
    val classDetail = spark.read.table("WHS.SalePlan_ProductClassDetail")

    // NOTE(review): level 1 is a left join while levels 2 and 3 are inner joins, so items
    // without a matching level-2/level-3 class are dropped. Kept as in the original; confirm
    // that this is intended.
    val productClassWithNames = productClass.as("pc")
      .join(classDetail.as("d1"), col("pc.N_SP1") === col("d1.ID"), "left")
      .join(classDetail.as("d2"), col("pc.N_SP2") === col("d2.ID"))
      .join(classDetail.as("d3"), col("pc.N_SP3") === col("d3.ID"))
      .select(
        col("pc.ItemName"),
        col("pc.ItemCodeSAP"),
        col("pc.N_SP1"),
        col("pc.N_SP2"),
        col("pc.N_SP3"),
        col("pc.SPKM_BH1"),
        col("pc.SPKM_BH2"),
        col("d1.Class").as("Name_NSP1"),
        col("d2.Class").as("Name_NSP2"),
        col("d3.Class").as("Name_NSP3")
      )

    val dailyActuals = orders.as("o")
      .join(productClassWithNames.as("c"), col("o.Barcode") === col("c.ItemCodeSAP"), "left")
      .groupBy("Date", "EcommerceOrderType", "N_SP1", "N_SP2", "N_SP3",
        "Name_NSP1", "Name_NSP2", "Name_NSP3", "Team_Name")
      .agg(sum("Doanh_thu").as("DT_Nhom_SP_TH"), sum("So_luong").as("SL_Nhom_SP_TH"))

    // ---------------------------------------------------------------------
    // Monthly plan, spread over the days of each month
    // ---------------------------------------------------------------------

    val monthlyPlan = readCsv("/raw_zone/excel_form/upload/ke_hoach/tmdt_dt_nhom_sp.csv")
      .filter(col("Ten_chi_tieu") === "dt_theo_nhom_sp" && col("Loai_KH") === "Thang")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      .groupBy("Month", "Quarter", "Year", "Don_vi", "Name_NSP1", "Name_NSP2", "Name_NSP3", "Ten_doi")
      .agg(sum("Doanh_thu").as("Doanh_thu"), sum("So_luong").as("So_luong"))

    // The plan is keyed by class names, whereas productClassWithNames has one row per item.
    // Joining the plan to the item-level table repeats every plan row once per item of the
    // class, and the sum below would then multiply the planned figures by that item count.
    // Collapse to one row per class combination before joining.
    val classLookup = productClassWithNames
      .select("N_SP1", "N_SP2", "N_SP3", "Name_NSP1", "Name_NSP2", "Name_NSP3")
      .distinct()

    val planWithClassIds = monthlyPlan.as("kh")
      .join(
        classLookup.as("c"),
        col("kh.Name_NSP1") === col("c.Name_NSP1") &&
          col("kh.Name_NSP2") === col("c.Name_NSP2") &&
          col("kh.Name_NSP3") === col("c.Name_NSP3"),
        "left"
      )
      .select(
        col("kh.Month"),
        col("kh.Quarter"),
        col("kh.Year"),
        col("kh.Don_vi"),
        col("kh.Ten_doi"),
        col("kh.Doanh_thu"),
        col("kh.So_luong"),
        col("c.N_SP1").as("N_SP1"),
        col("c.N_SP2").as("N_SP2"),
        col("c.N_SP3").as("N_SP3"),
        col("kh.Name_NSP1"),
        col("kh.Name_NSP2"),
        col("kh.Name_NSP3")
      )

    val planByClass = planWithClassIds
      .groupBy("Month", "Quarter", "Year", "Don_vi", "N_SP1", "N_SP2", "N_SP3",
        "Name_NSP1", "Name_NSP2", "Name_NSP3", "Ten_doi")
      .agg(sum("Doanh_thu").as("DT_Nhom_SP_KH"), sum("So_luong").as("SL_Nhom_SP_KH"))

    // Calendar table, read once; every date is tagged with the row count of its month.
    val calendar = spark.read.format("avro").load("/raw_zone/full_load/date/")
    val daysPerMonth = calendar
      .groupBy("month", "quarter", "year")
      .agg(count("*").alias("so_ngay"))
    val calendarWithDayCount = calendar.as("t1")
      .join(daysPerMonth.as("t2"),
        col("t1.month") === col("t2.month") && col("t1.year") === col("t2.year"))
      .select(col("t1.date"), col("t1.month"), col("t1.quarter"), col("t1.year"), col("t2.so_ngay"))

    // One plan row per calendar day: the monthly figure divided by the month's day count
    // (the quantity is additionally rounded).
    val dailyPlan = planByClass.as("kh")
      .join(calendarWithDayCount.as("d"),
        col("kh.Month") === col("d.month") && col("kh.Year") === col("d.year"),
        "left")
      .withColumn("DT_Nhom_SP_KH", col("DT_Nhom_SP_KH") / col("so_ngay"))
      .withColumn("SL_Nhom_SP_KH", round(col("SL_Nhom_SP_KH") / col("so_ngay")))

    // ---------------------------------------------------------------------
    // Actuals vs. plan
    // ---------------------------------------------------------------------

    // Full outer join keeps days/classes that only have actuals or only have a plan;
    // coalesce takes each key from whichever side is present.
    val dt_theo_nhom_sp_thang = dailyActuals.as("a")
      .join(
        dailyPlan.as("p"),
        col("a.Date") === col("p.date") &&
          col("a.EcommerceOrderType") === col("p.Don_vi") &&
          col("a.Team_Name") === col("p.Ten_doi") &&
          col("a.N_SP1") === col("p.N_SP1") &&
          col("a.N_SP2") === col("p.N_SP2") &&
          col("a.N_SP3") === col("p.N_SP3") &&
          col("a.Name_NSP1") === col("p.Name_NSP1") &&
          col("a.Name_NSP2") === col("p.Name_NSP2") &&
          col("a.Name_NSP3") === col("p.Name_NSP3"),
        "full"
      )
      .select(
        coalesce(col("a.Date"), col("p.date")).as("Date"),
        coalesce(col("a.EcommerceOrderType"), col("p.Don_vi")).as("EcommerceOrderType"),
        coalesce(col("a.N_SP1"), col("p.N_SP1")).as("ID_NSP1"),
        coalesce(col("a.N_SP2"), col("p.N_SP2")).as("ID_NSP2"),
        coalesce(col("a.N_SP3"), col("p.N_SP3")).as("ID_NSP3"),
        coalesce(col("a.Team_Name"), col("p.Ten_doi")).as("Team_Name"),
        coalesce(col("a.Name_NSP1"), col("p.Name_NSP1")).as("Name_NSP1"),
        coalesce(col("a.Name_NSP2"), col("p.Name_NSP2")).as("Name_NSP2"),
        coalesce(col("a.Name_NSP3"), col("p.Name_NSP3")).as("Name_NSP3"),
        col("a.DT_Nhom_SP_TH").as("DT_Nhom_SP_TH"),
        col("a.SL_Nhom_SP_TH").as("SL_Nhom_SP_TH"),
        col("p.DT_Nhom_SP_KH").as("DT_Nhom_SP_KH"),
        col("p.SL_Nhom_SP_KH").as("SL_Nhom_SP_KH")
      )
      // Group code: "L" followed by the last character of the level-1 class id.
      .withColumn("Ma_Nhom", concat(lit("L"), substring(col("ID_NSP1"), -1, 1)))

    // The original also built a 2024-only, de-duplicated and re-cast projection of this
    // result that was never written or otherwise used; it has been removed as dead code.
    dt_theo_nhom_sp_thang.write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }
}
// Paste-site footer ("Editor is loading...", "Leave a Comment"); not part of the program.