// Paste-site metadata (not part of the code): title "Untitled", avatar, author "unknown",
// format "plain_text", "a year ago", size "39 kB", "6", "Indexable".
// Job: build the SaleOut report form ("biểu mẫu") for target products (SP MT).
package vn.com.viettel.code.dashboard_date

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import vn.com.viettel.AppConfig.outputDir
import vn.com.viettel.utils.SparkUtils.createBatchSession
object saleout_sp_mt {
  // Local imports: the type objects used for the final casts were never imported at file level.
  import org.apache.spark.sql.{Column, DataFrame, SparkSession}
  import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}

  /** Store-code prefixes that map to sales department "P_BH1". */
  private val Bh1Prefixes: Seq[String] = Seq("TV01", "TV02", "TV03")

  /** Store-code prefixes that map to sales department "P_BH2". */
  private val Bh2Prefixes: Seq[String] = Seq("TN", "TG", "NT", "CT", "DN", "BH", "HCM")

  /** Every store-code prefix in scope; each prefix is also the branch ("Organization") code. */
  private val RegionPrefixes: Seq[String] = Bh1Prefixes ++ Bh2Prefixes

  /** Boolean column: `code` starts with one of `prefixes` (null when `code` is null). */
  private def startsWithAny(code: Column, prefixes: Seq[String]): Column =
    prefixes.map(p => code.startsWith(p)).reduceOption(_ || _).getOrElse(lit(false))

  /**
   * Branch code of a store code: the region prefix it starts with, or null if none matches.
   * The prefixes are mutually exclusive, so at most one `when` below is non-null.
   */
  private def organizationOf(code: Column): Column =
    coalesce(RegionPrefixes.map(p => when(code.startsWith(p), lit(p))): _*)

  /** Sales department of a store code: "P_BH1", "P_BH2", or null if no prefix matches. */
  private def saleDepartmentOf(code: Column): Column =
    when(startsWithAny(code, Bh1Prefixes), lit("P_BH1"))
      .when(startsWithAny(code, Bh2Prefixes), lit("P_BH2"))

  /**
   * Reads the calendar table once and attaches `so_ngay`, the number of calendar rows in the
   * same (month, year). This is used to spread a monthly plan evenly over its days.
   *
   * @return columns date, month, quarter, year, so_ngay — one row per calendar row
   */
  private def datesWithMonthLength(spark: SparkSession): DataFrame = {
    val dates = spark.read.format("avro").load("/raw_zone/full_load/date/")
    val daysPerMonth = dates.groupBy("month", "quarter", "year").agg(count("*").alias("so_ngay"))
    // Self-join: reference columns through the t1/t2 aliases so Spark can tell the sides apart.
    dates.as("t1")
      .join(daysPerMonth.as("t2"), col("t1.month") === col("t2.month") && col("t1.year") === col("t2.year"))
      .select(col("t1.date"), col("t1.month"), col("t1.quarter"), col("t1.year"), col("t2.so_ngay"))
  }

  /**
   * Builds the daily SaleOut dataset for target products (SP MT). It combines:
   *  - actuals from DMS indirect sales orders and from uploaded C2 Excel orders, and
   *  - the monthly "Thang" plan, spread evenly over the days of each month.
   * The two sides are full-outer-joined per (day, department, branch, product group, team).
   * The result is written as Avro to `outputDir` in overwrite mode.
   *
   * @param argv command-line arguments (not used)
   */
  def main(argv: Array[String]): Unit = {
    val spark: SparkSession = createBatchSession()

    // ---- Plan (kế hoạch): monthly targets, spread evenly over each day of the month ----
    val df_kh_saleout_spmt = spark.read.format("csv").option("header", true)
      .load("/raw_zone/excel_form/upload/ke_hoach/saleout_spmt.csv")
      .filter(col("Ten_chi_tieu") === "SaleOut")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
    val df_kh_saleout_thang = df_kh_saleout_spmt
      .filter(col("Loai_KH") === "Thang")
      .groupBy("Month", "Quarter", "Year", "Don_vi", "Vung_CN", "Name_SPMT", "Ten_doi")
      .agg(sum("So_luong").as("SaleOut_SL_SP_MT_KH"), sum("Doanh_thu").as("SaleOut_DT_SP_MT_KH"))
    val count_date_of_month = datesWithMonthLength(spark)
    val kh_saleout_spmt_date = df_kh_saleout_thang
      .join(
        count_date_of_month,
        df_kh_saleout_thang("Month") === count_date_of_month("Month") &&
          df_kh_saleout_thang("Year") === count_date_of_month("Year"),
        "left")
      // A plan month with no calendar rows keeps so_ngay null, so its daily plan is null.
      .withColumn("SaleOut_SL_SP_MT_KH", col("SaleOut_SL_SP_MT_KH") / col("so_ngay"))
      .withColumn("SaleOut_DT_SP_MT_KH", col("SaleOut_DT_SP_MT_KH") / col("so_ngay"))

    // ---- Reference data for actuals (thực hiện) ----
    val df_appuser = spark.read.table("whs.masterdata_appuser")
    val df_to_doi_c2 = spark.read.format("csv").option("multiLine", "true").option("header", "true")
      .load("/raw_zone/excel_form/upload/mapping_to_doi/to_doi_saleout.csv")
    // Buyer store types in scope; applied to both DMS and Excel orders.
    val listBuyerStoreTypeID = Seq(17, 18, 23)
    val iso = spark.read.table("WHS.Order_IndirectSalesOrder")
      .withColumn("OrderDate", col("OrderDate") + expr("INTERVAL 7 HOURS"))
      .filter(col("BuyerStoreTypeId").isin(listBuyerStoreTypeID: _*))
      .filter(col("GeneralIndirectStateId") === 3)
    val isoc = spark.read.table("WHS.Order_IndirectSalesOrderContent")
    // StartDate/EndDate get the same +7h shift as OrderDate, here and only here. Every
    // validity-window comparison below (DMS and Excel) uses these shifted values as-is.
    val store = spark.read.table("WHS.MasterData_DMSStore")
      .withColumn("StartDate", col("StartDate") + expr("INTERVAL 7 HOURS"))
      .withColumn("EndDate", col("EndDate") + expr("INTERVAL 7 HOURS"))
      .where(startsWithAny(col("Code"), RegionPrefixes))
    val mappingc2 = spark.read.format("csv").option("multiLine", "true").option("header", "true")
      .load("/raw_zone/excel_form/upload/mapping_to_doi/mapping_dms_c2td.csv")
    val item = spark.read.table("WHS.MasterData_Item").filter(col("DeletedAt").isNull)
    val ppgm = spark.read.table("WHS.MasterData_ProductProductGroupingMapping")
    // Keep only the most recently updated, non-deleted product grouping for each Name.
    val latestPerName = Window.partitionBy("Name").orderBy(desc("UpdatedAt"))
    val productgrouping = spark.read.table("WHS.MasterData_ProductGrouping")
      .filter(col("DeletedAt").isNull)
      .withColumn("rn", row_number().over(latestPerName))
      .filter(col("rn") === 1)

    // Target products (sản phẩm mục tiêu): item -> product grouping.
    // NOTE(review): consider an item whose ppgm rows point at both a dropped (older) grouping and
    // a kept one. It yields one row with a null Name and one with the Name, so its order lines
    // are counted under both. Confirm this is intended.
    val mapping = item.as("i")
      .join(ppgm.as("ppgm"), col("i.ProductId") === col("ppgm.ProductId"), "left")
      .join(productgrouping.as("pg"), col("ppgm.ProductGroupingId") === col("pg.Id"), "left")
      .select(
        col("i.Code").as("ItemCode"),
        col("i.Id").as("ItemId"),
        col("pg.Code"),
        col("pg.Name"),
        col("pg.UpdatedAt")
      )
      .distinct

    // ---- Excel orders (đơn hàng excel) uploaded for C2 stores ----
    val excel = spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      .load("/raw_zone/excel_form/upload/rangdong_store/c2_*.csv")
      .withColumn("Ngay_dat_hang", split(col("Ngày đặt hàng"), " ")(0))
      .withColumn(
        "Ngay_dat_hang",
        when(col("Ngay_dat_hang").contains("/"), to_date(col("Ngay_dat_hang"), "M/d/y"))
          .when(col("Ngay_dat_hang").contains("-"), to_date(col("Ngay_dat_hang"), "yyyy-MM-dd")))
      // NOTE(review): OrderDate only parses "yyyy-MM-dd HH:mm:ss". Rows in the "M/d/y" form likely
      // get a null OrderDate, and the store-window filter below then drops them. Confirm.
      .withColumn("OrderDate", from_unixtime(unix_timestamp(col("Ngày đặt hàng"), "yyyy-MM-dd HH:mm:ss")))
    val don_hang = excel
      .join(mappingc2, mappingc2("User_Name") === excel("Username"), "left")
      // The last 7 characters of the Excel username are the DMS store id.
      .withColumn("Id_excel", substring(excel("Username"), -7, 7).cast("bigint"))
      .join(store, store("Id") === col("Id_excel"), "left")
      .join(item, excel("BarCode") === item("Code"), "left")
      .join(mapping, item("Id") === mapping("ItemId"), "left")
      // `store` is already shifted by +7h (see above); do not shift it again here.
      .filter(excel("OrderDate").between(store("StartDate"), store("EndDate")))
      .filter(col("StoreTypeId").isin(listBuyerStoreTypeID: _*))
      .select(
        to_date(excel("Ngay_dat_hang")).alias("Day"),
        saleDepartmentOf(store("Code")).alias("SaleDepartment"),
        organizationOf(store("Code")).alias("Organization"),
        mapping("Name").alias("ProductGroupName"),
        col("Thành tiền 2").alias("SaleOut_DT_SP_MT"),
        col("Số lượng").alias("SaleOut_SL_SP_MT"),
        mappingc2("Doi").alias("Team_Name")
      )

    // ---- DMS orders (đơn hàng DMS) ----
    val dms = iso.as("t0")
      .join(df_appuser, df_appuser("Id") === col("t0.SaleEmployeeId"), "left")
      .join(df_to_doi_c2, df_appuser("Username") === df_to_doi_c2("Username"), "left")
      .join(
        store.as("t1"),
        col("t0.BuyerStoreId") === col("t1.Id") && col("t0.OrderDate").between(col("t1.StartDate"), col("t1.EndDate")),
        "left")
      .join(isoc.as("t2"), col("t0.Id") === col("t2.IndirectSalesOrderId"), "left")
      .join(mapping.as("m"), col("t2.ItemId") === col("m.ItemId"), "left")
      .select(
        date_format(col("t0.OrderDate"), "yyyy-MM-dd").alias("Day"),
        // Department comes from the order's StoreCode; branch comes from the matched store record.
        saleDepartmentOf(col("t0.StoreCode")).as("SaleDepartment"),
        organizationOf(col("t1.Code")).as("Organization"),
        col("t2.Amount").as("SaleOut_DT_SP_MT"),
        col("t2.Quantity").as("SaleOut_SL_SP_MT"),
        col("m.Name").as("ProductGroupName"),
        df_to_doi_c2("Team_Name")
      )

    // Daily actuals per (day, department, branch, product group, team).
    val th = dms.unionByName(don_hang)
      .groupBy(col("Day"), col("SaleDepartment"), col("Organization"), col("ProductGroupName"), col("Team_Name"))
      .agg(
        sum("SaleOut_DT_SP_MT").as("SaleOut_DT_SP_MT"),
        sum("SaleOut_SL_SP_MT").as("SaleOut_SL_SP_MT")
      )

    // Actuals vs. daily plan. Null-safe equality (<=>) lets rows whose key parts are null
    // (e.g. no team) pair up, instead of splitting into one actual-only and one plan-only row.
    val thang = th.join(
        kh_saleout_spmt_date,
        th("Day") <=> kh_saleout_spmt_date("date") &&
          th("SaleDepartment") <=> kh_saleout_spmt_date("Don_vi") &&
          th("Organization") <=> kh_saleout_spmt_date("Vung_CN") &&
          th("ProductGroupName") <=> kh_saleout_spmt_date("Name_SPMT") &&
          th("Team_Name") <=> kh_saleout_spmt_date("Ten_doi"),
        "full")
      .select(
        coalesce(th("Day"), kh_saleout_spmt_date("date")).as("Day"),
        coalesce(th("SaleDepartment"), kh_saleout_spmt_date("Don_vi")).as("SaleDepartment"),
        coalesce(th("Organization"), kh_saleout_spmt_date("Vung_CN")).as("Organization"),
        coalesce(th("ProductGroupName"), kh_saleout_spmt_date("Name_SPMT")).as("ProductGroupName"),
        coalesce(th("Team_Name"), kh_saleout_spmt_date("Ten_doi")).as("Team_Name"),
        th("SaleOut_DT_SP_MT"),
        th("SaleOut_SL_SP_MT"),
        kh_saleout_spmt_date("SaleOut_DT_SP_MT_KH"),
        kh_saleout_spmt_date("SaleOut_SL_SP_MT_KH")
      )

    // Plan-upload form for 2024: the distinct (Don_vi, Vung_CN, Name_SPMT, Ten_doi) combinations,
    // with Thoi_gian_ke_hoach / So_luong / Doanh_thu left empty.
    // NOTE(review): this mirrors the plan CSV's columns but is never written; the job writes
    // `thang` below. Confirm which of the two is the intended output.
    val reCast = thang
      .filter(col("Day").between("2024-01-01", "2024-12-31"))
      .select(
        lit("SaleOut").cast(StringType).alias("Ten_chi_tieu"),
        lit(null).cast(StringType).alias("Thoi_gian_ke_hoach"),
        col("SaleDepartment").cast(StringType).alias("Don_vi"),
        col("Organization").cast(StringType).alias("Vung_CN"),
        col("ProductGroupName").cast(StringType).alias("Name_SPMT"),
        col("Team_Name").cast(StringType).alias("Ten_doi"),
        lit(null).cast(IntegerType).alias("So_luong"),
        lit(null).cast(DoubleType).alias("Doanh_thu")
      )
      .distinct()

    thang.write.format("avro").mode("overwrite").save(outputDir)
  }
}


// Job: build the SaleOut report form ("biểu mẫu") for KM products (product classes SPKM_BH1 / SPKM_BH2).
package vn.com.viettel.code.dashboard_date

import org.apache.spark.sql.functions._
import vn.com.viettel.AppConfig
import vn.com.viettel.utils.SparkUtils.createBatchSession

object saleout_sp_km {
  // Local imports: SparkSession/Column/DataFrame for the helpers, and the type objects used for
  // the final casts. None of these is imported at file level for this job.
  import org.apache.spark.sql.{Column, DataFrame, SparkSession}
  import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}

  /** Store-code prefixes that map to sales department "P_BH1". */
  private val Bh1Prefixes: Seq[String] = Seq("TV01", "TV02", "TV03")

  /** Store-code prefixes that map to sales department "P_BH2". */
  private val Bh2Prefixes: Seq[String] = Seq("TN", "TG", "NT", "CT", "DN", "BH", "HCM")

  /** Every store-code prefix in scope; each prefix is also the branch ("Organization") code. */
  private val RegionPrefixes: Seq[String] = Bh1Prefixes ++ Bh2Prefixes

  /** Boolean column: `code` starts with one of `prefixes` (null when `code` is null). */
  private def startsWithAny(code: Column, prefixes: Seq[String]): Column =
    prefixes.map(p => code.startsWith(p)).reduceOption(_ || _).getOrElse(lit(false))

  /**
   * Branch code of a store code: the region prefix it starts with, or null if none matches.
   * The prefixes are mutually exclusive, so at most one `when` below is non-null.
   */
  private def organizationOf(code: Column): Column =
    coalesce(RegionPrefixes.map(p => when(code.startsWith(p), lit(p))): _*)

  /** Sales department of a store code: "P_BH1", "P_BH2", or null if no prefix matches. */
  private def saleDepartmentOf(code: Column): Column =
    when(startsWithAny(code, Bh1Prefixes), lit("P_BH1"))
      .when(startsWithAny(code, Bh2Prefixes), lit("P_BH2"))

  /**
   * Reads the calendar table once and attaches `so_ngay`, the number of calendar rows in the
   * same (month, year). This is used to spread a monthly plan evenly over its days.
   *
   * @return columns date, month, quarter, year, so_ngay — one row per calendar row
   */
  private def datesWithMonthLength(spark: SparkSession): DataFrame = {
    val dates = spark.read.format("avro").load("/raw_zone/full_load/date/")
    val daysPerMonth = dates.groupBy("month", "quarter", "year").agg(count("*").alias("so_ngay"))
    // Self-join: reference columns through the t1/t2 aliases so Spark can tell the sides apart.
    dates.as("t1")
      .join(daysPerMonth.as("t2"), col("t1.month") === col("t2.month") && col("t1.year") === col("t2.year"))
      .select(col("t1.date"), col("t1.month"), col("t1.quarter"), col("t1.year"), col("t2.so_ngay"))
  }

  /**
   * Builds the daily SaleOut dataset for KM product classes (SPKM_BH1 / SPKM_BH2). It combines:
   *  - actuals from DMS indirect sales orders and from uploaded C2 Excel orders, and
   *  - the monthly "Thang" plan, spread evenly over the days of each month.
   * The two sides are full-outer-joined. The result is written as Avro to
   * `AppConfig.outputDir` in overwrite mode.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val spark = createBatchSession()
    val indirectsalesorder = spark.read.table("WHS.Order_IndirectSalesOrder")
      .withColumn("OrderDate", col("OrderDate") + expr("INTERVAL 7 HOURS"))
    val indirectsaleordercontent = spark.read.table("WHS.Order_IndirectSalesOrderContent")
    // The store validity window gets the same +7h shift as OrderDate, once, here. The Excel path
    // and the DMS SQL below (via the "store" view) both compare order times against this shifted
    // window, so both paths use one clock.
    val store = spark.read.table("WHS.MasterData_DMSStore")
      .withColumn("StartDate", col("StartDate") + expr("INTERVAL 7 HOURS"))
      .withColumn("EndDate", col("EndDate") + expr("INTERVAL 7 HOURS"))
    val saleplan_product_class = spark.read.table("WHS.SalePlan_ProductClass")
    val item = spark.read.table("WHS.MasterData_Item")
    val mappingc2 = spark.read.format("csv").option("multiLine", "true").option("header", "true")
      .load("/raw_zone/excel_form/upload/mapping_to_doi/mapping_dms_c2td.csv")
    val df_appuser = spark.read.table("whs.masterdata_appuser")
    val df_to_doi_c2 = spark.read.format("csv").option("multiLine", "true").option("header", "true")
      .load("/raw_zone/excel_form/upload/mapping_to_doi/to_doi_saleout.csv")

    // Temp views read by the spark.sql query below.
    indirectsalesorder.createOrReplaceTempView("indirectsalesorder")
    store.createOrReplaceTempView("store")
    saleplan_product_class.createOrReplaceTempView("saleplan_product_class")
    indirectsaleordercontent.createOrReplaceTempView("indirectsaleordercontent")
    item.createOrReplaceTempView("item")
    df_to_doi_c2.createOrReplaceTempView("df_to_doi_c2")
    df_appuser.createOrReplaceTempView("df_appuser")

    // ---- Excel orders uploaded for C2 stores ----
    val excel = spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      .load("/raw_zone/excel_form/upload/rangdong_store/c2_*.csv")
      .withColumn("Ngay_dat_hang", split(col("Ngày đặt hàng"), " ")(0))
      .withColumn(
        "Ngay_dat_hang",
        when(col("Ngay_dat_hang").contains("/"), to_date(col("Ngay_dat_hang"), "M/d/y"))
          .when(col("Ngay_dat_hang").contains("-"), to_date(col("Ngay_dat_hang"), "yyyy-MM-dd")))
      // NOTE(review): OrderDate only parses "yyyy-MM-dd HH:mm:ss". Rows in the "M/d/y" form likely
      // get a null OrderDate, and the store-window filter below then drops them. Confirm.
      .withColumn("OrderDate", from_unixtime(unix_timestamp(col("Ngày đặt hàng"), "yyyy-MM-dd HH:mm:ss")))

    val don_hang = excel
      .join(mappingc2, mappingc2("User_Name") === excel("Username"), "left")
      // The last 7 characters of the Excel username are the DMS store id.
      .withColumn("Id_excel", substring(excel("Username"), -7, 7).cast("bigint"))
      .join(store, store("Id") === col("Id_excel"), "left")
      .join(saleplan_product_class, excel("Barcode") === saleplan_product_class("ItemCodeSAP"), "left")
      // `store` is already shifted by +7h (see above); do not shift it again here.
      .filter(excel("OrderDate").between(store("StartDate"), store("EndDate")))
      .filter(col("StoreTypeId").isin(17, 18, 23))
      .select(
        to_date(excel("Ngay_dat_hang")).alias("Day"),
        (col("Số lượng") * col("Đơn Giá") - coalesce(col("Giảm giá"), lit("0"))).alias("Amount"),
        saleDepartmentOf(store("Code")).alias("SaleDepartment"),
        organizationOf(store("Code")).alias("Organization"),
        col("Số lượng").as("Quantity"),
        saleplan_product_class("SPKM_BH1"),
        saleplan_product_class("SPKM_BH2"),
        mappingc2("Doi").alias("Team_Name")
      )

    // ---- DMS order lines, unioned with the Excel orders ----
    val saleout_sp_km_temp = spark.sql(
      s"""
     SELECT date_format(OrderDate,"yyyy-MM-dd") Day,
        isoc.Amount,
       CASE
           WHEN iso.StoreCode LIKE 'TV01%' THEN 'P_BH1'
           WHEN iso.StoreCode LIKE 'TV02%' THEN 'P_BH1'
           WHEN iso.StoreCode LIKE 'TV03%' THEN 'P_BH1'
           WHEN iso.StoreCode LIKE 'TN%' THEN 'P_BH2'
           WHEN iso.StoreCode LIKE 'TG%' THEN 'P_BH2'
           WHEN iso.StoreCode LIKE 'NT%' THEN 'P_BH2'
           WHEN iso.StoreCode LIKE 'CT%' THEN 'P_BH2'
           WHEN iso.StoreCode LIKE 'DN%' THEN 'P_BH2'
           WHEN iso.StoreCode LIKE 'BH%' THEN 'P_BH2'
           WHEN iso.StoreCode LIKE 'HCM%' THEN 'P_BH2'
       END AS SaleDepartment,
       CASE
           when s.Code like 'TV01%' then "TV01"
           when s.Code like 'TV02%' then "TV02"
           when s.Code like 'TV03%' then "TV03"
           when s.Code like 'TN%' then "TN"
           when s.Code like 'TG%'then "TG"
           when s.Code like 'NT%'then "NT"
           when s.Code like 'CT%' then "CT"
           when s.Code like 'DN%' then "DN"
           when s.Code like 'BH%' then "BH"
           when s.Code like 'HCM%' then "HCM"
       END AS Organization,
       isoc.Quantity,
       spc.SPKM_BH1,
       spc.SPKM_BH2,
       dtdc2.Team_Name
       FROM indirectsaleordercontent AS isoc
       LEFT JOIN indirectsalesorder as iso on isoc.IndirectSalesOrderId = iso.ID
       LEFT JOIN store AS s ON s.Id = iso.buyerstoreID and iso.OrderDate BETWEEN s.StartDate and s.EndDate
       LEFT JOIN item as i on i.ID = isoc.ItemID
       LEFT JOIN saleplan_product_class as spc on i.Code = spc.ItemCodeSAP
       LEFT JOIN df_appuser as da on da.ID = iso.SaleEmployeeId
       LEFT JOIN df_to_doi_c2 as dtdc2 on dtdc2.Username = da.Username
       WHERE iso.generalindirectstateID = 3  and iso.BuyerStoreTypeID in (17,18,23)
    """.stripMargin).unionByName(don_hang)

    saleout_sp_km_temp.createOrReplaceTempView("saleout_sp_km_temp")

    // Daily actuals per (day, KM classes, department, branch, team).
    val saleout_sp_km_month = spark.sql(
      s"""
     SELECT Day,
       SaleDepartment, Organization, SPKM_BH1,SPKM_BH2,
       sum(Amount) as SaleOut_DT_SP_KM,
       sum(Quantity) as SaleOut_SL_SP_KM,
       Team_Name
       FROM saleout_sp_km_temp
       group by Day, SPKM_BH1,SPKM_BH2, SaleDepartment , Organization, Team_Name
    """.stripMargin)

    // ---- Plan: monthly targets, spread evenly over each day of the month ----
    val df_saleout_sp_km_kh = spark.read.format("csv").option("header", true)
      .load("/raw_zone/excel_form/upload/ke_hoach/saleout_spkm.csv")
      .filter(col("Ten_chi_tieu") === "SaleOut" and col("Loai_KH") === "Thang")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      .groupBy("Month", "Quarter", "Year", "Don_vi", "SPKM_BH1", "SPKM_BH2", "Loai_KH", "Vung_CN", "Ten_doi")
      .agg(sum("Doanh_thu").as("Doanh_thu_KH"), sum("So_luong").as("So_luong_KH"))

    val count_date_of_month = datesWithMonthLength(spark)
    val df_saleout_sp_km_kh_date = df_saleout_sp_km_kh
      .join(
        count_date_of_month,
        df_saleout_sp_km_kh("Month") === count_date_of_month("Month") &&
          df_saleout_sp_km_kh("Year") === count_date_of_month("Year"),
        "left")
      // A plan month with no calendar rows keeps so_ngay null, so its daily plan is null.
      .withColumn("Doanh_thu_KH", col("Doanh_thu_KH") / col("so_ngay"))
      .withColumn("So_luong_KH", col("So_luong_KH") / col("so_ngay"))

    // Actuals vs. daily plan. Keys are coalesced to "" so that null key parts still pair up.
    val saleout_sp_km = saleout_sp_km_month.join(
        df_saleout_sp_km_kh_date,
        coalesce(saleout_sp_km_month("Day"), lit("")) === coalesce(df_saleout_sp_km_kh_date("date"), lit("")) &&
          coalesce(saleout_sp_km_month("SaleDepartment"), lit("")) === coalesce(df_saleout_sp_km_kh_date("Don_vi"), lit("")) &&
          coalesce(saleout_sp_km_month("Organization"), lit("")) === coalesce(df_saleout_sp_km_kh_date("Vung_CN"), lit("")) &&
          coalesce(saleout_sp_km_month("Team_Name"), lit("")) === coalesce(df_saleout_sp_km_kh_date("Ten_doi"), lit("")) &&
          coalesce(saleout_sp_km_month("SPKM_BH1"), lit("")) === coalesce(df_saleout_sp_km_kh_date("SPKM_BH1"), lit("")) &&
          coalesce(saleout_sp_km_month("SPKM_BH2"), lit("")) === coalesce(df_saleout_sp_km_kh_date("SPKM_BH2"), lit("")),
        "full"
      )
      .select(
        coalesce(saleout_sp_km_month("Day"), df_saleout_sp_km_kh_date.col("date")).alias("Day"),
        coalesce(saleout_sp_km_month("SaleDepartment"), df_saleout_sp_km_kh_date.col("Don_vi")).alias("SaleDepartment"),
        coalesce(saleout_sp_km_month("Organization"), df_saleout_sp_km_kh_date.col("Vung_CN")).alias("Organization"),
        coalesce(saleout_sp_km_month("SPKM_BH1"), df_saleout_sp_km_kh_date.col("SPKM_BH1")).alias("SPKM_BH1"),
        coalesce(saleout_sp_km_month("SPKM_BH2"), df_saleout_sp_km_kh_date.col("SPKM_BH2")).alias("SPKM_BH2"),
        saleout_sp_km_month("SaleOut_DT_SP_KM"),
        saleout_sp_km_month("SaleOut_SL_SP_KM"),
        df_saleout_sp_km_kh_date("Doanh_thu_KH").alias("SaleOut_DT_SP_KM_KH"),
        df_saleout_sp_km_kh_date("So_luong_KH").alias("SaleOut_SL_SP_KM_KH"),
        coalesce(saleout_sp_km_month("Team_Name"), df_saleout_sp_km_kh_date.col("Ten_doi")).alias("Team_Name")
      )

    // Plan-upload form for 2024: the distinct (Don_vi, Vung_CN, SPKM_BH1, SPKM_BH2, Ten_doi)
    // combinations, with Thoi_gian_ke_hoach / So_luong / Doanh_thu left empty.
    // NOTE(review): this is never written; the job writes `saleout_sp_km` below. Confirm which
    // of the two is the intended output.
    val reCast = saleout_sp_km
      .filter(col("Day").between("2024-01-01", "2024-12-31"))
      .select(
        lit("SaleOut").cast(StringType).alias("Ten_chi_tieu"),
        lit(null).cast(StringType).alias("Thoi_gian_ke_hoach"),
        col("SaleDepartment").cast(StringType).alias("Don_vi"),
        col("Organization").cast(StringType).alias("Vung_CN"),
        col("SPKM_BH1").cast(StringType).alias("SPKM_BH1"),
        col("SPKM_BH2").cast(StringType).alias("SPKM_BH2"),
        col("Team_Name").cast(StringType).alias("Ten_doi"),
        lit(null).cast(IntegerType).alias("So_luong"),
        lit(null).cast(DoubleType).alias("Doanh_thu")
      )
      .distinct()

    saleout_sp_km.write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }
}

// Job: build the SaleOut report form ("biểu mẫu") by product group (NSP: N_SP1 / N_SP2 / N_SP3).
package vn.com.viettel.code.dashboard_date

import org.apache.spark.sql.functions._
import vn.com.viettel.AppConfig
import vn.com.viettel.utils.SparkUtils.createBatchSession
object saleout_dt_sp_led_phich {


  def main(args: Array[String]): Unit = {
    val spark = createBatchSession()

    val doi = spark.read.format("csv").option("multiLine", "true").option("header", "true").load("/raw_zone/excel_form/upload/mapping_to_doi/to_doi_saleout.csv")
    val appuser = spark.read.table("whs.masterdata_appuser")
    val mappingc2 =spark.read.format("csv").option("multiLine", "true").option("header", "true").load("/raw_zone/excel_form/upload/mapping_to_doi/mapping_dms_c2td.csv")
    //create temp_product
    val product_class = spark.read.table("WHS.SalePlan_ProductClass")
    val product_class_detail = spark.read.table("WHS.SalePlan_ProductClassDetail")
    val product_class_detail1 = spark.read.table("WHS.SalePlan_ProductClassDetail")
    val product_class_detail2 = spark.read.table("WHS.SalePlan_ProductClassDetail")

    val indirectsalesorder = spark.read.table("WHS.Order_IndirectSalesOrder")
      .withColumn("OrderDate", col("OrderDate") + expr("INTERVAL 7 HOURS"))
    //.filter(col("GeneralIndirectStateId").isin(3) &&  col("BuyerStoreTypeId").isin(17,18,23))
    val store = spark.read.table("WHS.MasterData_DMSStore")
    val storetype = spark.read.table("WHS.MasterData_StoreType")
    val province = spark.read.table("WHS.MasterData_Province")
    val indirectsalesordercontent = spark.read.table("WHS.Order_IndirectSalesOrderContent")
    val item = spark.read.table("WHS.MasterData_Item")
    val product = spark.read.table("WHS.Product_Product")

    val temp_product = product_class
      .join(product_class_detail, product_class("N_SP1") === product_class_detail("ID"), "left")
      .join(product_class_detail1.as("product_class_detail1"), product_class("N_SP2") === product_class_detail1("ID"), "left")
      .join(product_class_detail2.as("product_class_detail2"), product_class("N_SP3") === product_class_detail2("ID"), "left")
      .select(product_class("ItemName"),
        product_class("ItemCodeSAP"),
        product_class("N_SP1"),
        product_class("N_SP2"),
        product_class("N_SP3"),
        product_class("SPKM_BH1"),
        product_class("SPKM_BH2"),
        product_class_detail("Class").alias("Name_NSP1"),
        col("product_class_detail1.Class").alias("Name_NSP2"),
        col("product_class_detail2.Class").alias("Name_NSP3")
      )

    val excel = spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      //    .load("/raw_zone/excel_form/Order.csv")
      .load("/raw_zone/excel_form/upload/rangdong_store/c2_*.csv")
      .withColumn("Ngay_dat_hang", split(col("Ngày đặt hàng"), " ")(0))
      .withColumn(
        "Ngay_dat_hang",
        when(col("Ngay_dat_hang").contains("/"), to_date(col("Ngay_dat_hang"), "M/d/y"))
          .when(col("Ngay_dat_hang").contains("-"), to_date(col("Ngay_dat_hang"), "yyyy-MM-dd")))
      .withColumn("OrderDate", from_unixtime(unix_timestamp(col("Ngày đặt hàng"), "yyyy-MM-dd HH:mm:ss")).alias("OrderDate"))

    val don_hang = excel
      .join(mappingc2, mappingc2("User_Name") === excel("Username"), "left")
      .withColumn(("Id_excel"), substring(excel("Username"), -7, 7).cast("bigint"))
      .join(store, store("Id") === col("Id_excel"), "left")
      .join(temp_product, excel("Barcode") === temp_product("ItemCodeSAP"), "left")
      .join(province, store("ProvinceId") === province("Id"), "left")
      .filter(excel("OrderDate").between(store("StartDate") + expr("INTERVAL 7 HOURS"), store("EndDate") + expr("INTERVAL 7 HOURS")))
      .filter(col("StoreTypeId").isin(17, 18, 23))
      .withColumn("Organization",
        when(store("Code").startsWith("TV01"), lit("TV01"))
          .when(store("Code").startsWith("TV02"), lit("TV02"))
          .when(store("Code").startsWith("TV03"), lit("TV03"))
          .when(store("Code").startsWith("TN"), lit("TN"))
          .when(store("Code").startsWith("NT"), lit("NT"))
          .when(store("Code").startsWith("TG"), lit("TG"))
          .when(store("Code").startsWith("CT"), lit("CT"))
          .when(store("Code").startsWith("DN"), lit("DN"))
          .when(store("Code").startsWith("BH"), lit("BH"))
          .when(store("Code").startsWith("HCM"), lit("HCM")))
      .withColumn("SaleDepartment",
        when(store("Code").startsWith("TV01") or store("Code").startsWith("TV02") or store("Code").startsWith("TV03"), lit("P_BH1"))
          .when(store("Code").startsWith("TN") or store("Code").startsWith("TG") or store("Code").startsWith("NT") or store("Code").startsWith("CT") or store("Code").startsWith("DN") or store("Code").startsWith("BH") or store("Code").startsWith("HCM"), lit("P_BH2")))
      .select(
        to_date(excel("Ngay_dat_hang")).alias("Day"),
        (col("Số lượng") * col("Đơn Giá") - coalesce(col("Giảm giá"), lit("0"))).alias("Amount"),
        col("SaleDepartment"),
        col("Organization"),
        when(store("StoreTypeId") === 17, lit("Cấp 2"))
          .when(store("StoreTypeId") === 18, lit("Cấp 2 Trọng điểm"))
          .when(store("StoreTypeId") === 23, lit("Cấp  C2 siêu lớn"))
          .alias("CustomerType"),
        temp_product("N_SP1"),
        temp_product("N_SP2"),
        temp_product("N_SP3"),
        temp_product("Name_NSP1"),
        temp_product("Name_NSP2"),
        temp_product("Name_NSP3"),
        mappingc2("Doi").alias("Team_Name")
      )

    val Indirectsalesordercontent = indirectsalesordercontent
      .join(indirectsalesorder, indirectsalesordercontent("IndirectSalesOrderId") ===indirectsalesorder("Id") , "right")
      .join(store, indirectsalesorder("BuyerStoreId") === store("Id") && indirectsalesorder("OrderDate").between( store("StartDate"), store("EndDate")), "left")
      .join(province, store("ProvinceId") === province("Id"), "left")
      .join(item, indirectsalesordercontent("ItemId") === item("Id"), "left")
      .join(product, item("ProductId") === product("Id"), "left")
      .join(temp_product, product("Code") === temp_product("ItemCodeSAP"), "left")
      .filter(indirectsalesorder("BuyerStoreTypeID").isin(17,18,23) && indirectsalesorder("GeneralIndirectStateId")=== 3)
      .join(appuser, indirectsalesorder("SaleEmployeeId") === appuser("Id"), "left")
      .join(doi, doi("Username") === appuser("Username"), "left")
      .select(
        date_format(indirectsalesorder("OrderDate"),"yyyy-MM-dd").alias("Day"),
        when(indirectsalesorder("StoreCode").startsWith("TV01") or indirectsalesorder("StoreCode").startsWith("TV02") or indirectsalesorder("StoreCode").startsWith("TV03"), lit("P_BH1")).when(
          indirectsalesorder("StoreCode").startsWith("TN") ||
            indirectsalesorder("StoreCode").startsWith("TG") ||
            indirectsalesorder("StoreCode").startsWith("NT") ||
            indirectsalesorder("StoreCode").startsWith("CT") ||
            indirectsalesorder("StoreCode").startsWith("DN") ||
            indirectsalesorder("StoreCode").startsWith("BH") ||
            indirectsalesorder("StoreCode").startsWith("HCM"),
          lit("P_BH2")
        ).alias("SaleDepartment"),
        when(store("Code").startsWith("TV01"), lit("TV01"))
          .when(store("Code").startsWith("TV02"), lit("TV02"))
          .when(store("Code").startsWith("TV03"), lit("TV03"))
          .when(store("Code").startsWith("TN"), lit("TN"))
          .when(store("Code").startsWith("NT"), lit("NT"))
          .when(store("Code").startsWith("TG"), lit("TG"))
          .when(store("Code").startsWith("CT"), lit("CT"))
          .when(store("Code").startsWith("DN"), lit("DN"))
          .when(store("Code").startsWith("BH"), lit("BH"))
          .when(store("Code").startsWith("HCM"), lit("HCM"))
          .alias("Organization"),
        when(indirectsalesorder("BuyerStoreTypeID") === 17, lit("Cấp 2"))
          .when(indirectsalesorder("BuyerStoreTypeID") === 18, lit("Cấp 2 Trọng điểm"))
          .when(indirectsalesorder("BuyerStoreTypeID") === 23, lit("Cấp  C2 siêu lớn"))
          .alias("CustomerType"),
        temp_product("N_SP1"),
        temp_product("N_SP2"),
        temp_product("N_SP3"),
        temp_product("Name_NSP1"),
        temp_product("Name_NSP2"),
        temp_product("Name_NSP3"),
        doi("Team_Name"),
        indirectsalesordercontent("Amount")
      ).unionByName(don_hang)

    val Loai_thang = Indirectsalesordercontent.groupBy("Day",  "SaleDepartment", "Organization",  "CustomerType","N_SP1","N_SP2","N_SP3",
      "Name_NSP1", "Name_NSP2", "Name_NSP3", "Team_Name").agg(sum("Amount").alias("SaleOut_LP_TH"))

    // Monthly sell-out revenue plan by product group, read from the uploaded plan CSV.
    // Only rows of indicator "saleout_nsp" with plan type "Thang" are kept. Month / Quarter / Year
    // are derived from Thoi_gian_ke_hoach so the plan can be matched to calendar days below.
    val plan = spark.read.format("csv").option("header", true)
      .load("/raw_zone/excel_form/upload/ke_hoach/saleout_nsp.csv")
      .filter(col("Ten_chi_tieu") === "saleout_nsp" and col("Loai_KH") === "Thang")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))

    // Attach the product-group IDs (N_SP1..3) by matching the three product-group names.
    // Plan rows with no matching product keep null IDs (left join).
    // NOTE(review): if temp_product holds the same (Name_NSP1, Name_NSP2, Name_NSP3) combination more
    // than once, the matching plan rows are duplicated and Doanh_thu is summed more than once below.
    val planWithProduct = plan.join(temp_product, plan("Name_NSP1") === temp_product("Name_NSP1")
        && plan("Name_NSP2") === temp_product("Name_NSP2")
        && plan("Name_NSP3") === temp_product("Name_NSP3"), "left")
      .select(plan("Month"),
        plan("Quarter"),
        plan("Year"),
        plan("Don_vi"),
        plan("Loai_KH"),
        plan("Cap_dai_ly"),
        plan("Vung_CN"),
        plan("Ten_doi"),
        plan("Doanh_thu"),
        plan("Tinh_thanh_pho"),
        temp_product("N_SP1").as("N_SP1"),
        temp_product("N_SP2").as("N_SP2"),
        temp_product("N_SP3").as("N_SP3"),
        plan("Name_NSP1"),
        plan("Name_NSP2"),
        plan("Name_NSP3")
      )

    // Monthly planned revenue per dimension. `plan` is already restricted to Loai_KH == "Thang",
    // so no second filter is needed here.
    val df_saleout_dt_kh = planWithProduct
      .groupBy("Month", "Quarter", "Year", "Don_vi", "Vung_CN", "Tinh_thanh_pho", "Cap_dai_ly",
        "N_SP1", "N_SP2", "N_SP3", "Name_NSP1", "Name_NSP2", "Name_NSP3", "Ten_doi")
      .agg(sum("Doanh_thu").as("Doanh_thu_KH"))

    // Split the monthly plan evenly across the days of its month. Every calendar date gets
    // so_ngay = the number of dates in the same (month, year) of the date dimension. The count is
    // computed with a window over a single read instead of re-reading the table and self-joining.
    val my_date = spark.read.format("avro").load("/raw_zone/full_load/date/")
    val count_date_of_month = my_date
      .withColumn("so_ngay", count(lit(1)).over(Window.partitionBy(col("month"), col("year"))))
      .select(col("date"), col("month"), col("quarter"), col("year"), col("so_ngay"))
    // Left join: a plan month with no dates in the date dimension yields null date / so_ngay,
    // and therefore a null daily plan value.
    val df_saleout_dt_kh_date = df_saleout_dt_kh
      .join(
        count_date_of_month,
        df_saleout_dt_kh("Month") === count_date_of_month("Month") and
          df_saleout_dt_kh("Year") === count_date_of_month("Year"),
        "left")
      .withColumn("Doanh_thu_KH", col("Doanh_thu_KH") / col("so_ngay"))

    // Combine actual sell-out (Loai_thang) with the daily plan (df_saleout_dt_kh_date) in a full
    // outer join, so dimension combinations present on only one side are kept. Keys are compared
    // after coalescing null to "" on both sides, so a null key matches a null (or "") key.
    // Each entry: (actual-side column, plan-side column, output column), listed in output order.
    val resultDimensions = Seq(
      ("Day", "date", "Day"),
      ("SaleDepartment", "Don_vi", "SaleDepartment"),
      ("Organization", "Vung_CN", "Organization"),
      ("CustomerType", "Cap_dai_ly", "CustomerType"),
      ("N_SP1", "N_SP1", "ID_NSP1"),
      ("N_SP2", "N_SP2", "ID_NSP2"),
      ("N_SP3", "N_SP3", "ID_NSP3"),
      ("Name_NSP1", "Name_NSP1", "Name_NSP1"),
      ("Name_NSP2", "Name_NSP2", "Name_NSP2"),
      ("Name_NSP3", "Name_NSP3", "Name_NSP3"),
      ("Team_Name", "Ten_doi", "Team_Name")
    )
    val resultJoinCondition = resultDimensions
      .map { case (actualCol, planCol, _) =>
        coalesce(Loai_thang(actualCol), lit("")) === coalesce(df_saleout_dt_kh_date(planCol), lit(""))
      }
      .reduce(_ && _)
    // Take each dimension from whichever side of the outer join is present.
    val resultDimensionColumns = resultDimensions.map { case (actualCol, planCol, outCol) =>
      coalesce(Loai_thang(actualCol), df_saleout_dt_kh_date(planCol)).alias(outCol)
    }
    val resultMeasureColumns = Seq(
      Loai_thang("SaleOut_LP_TH").alias("SaleOut_LP_TH"),
      df_saleout_dt_kh_date("Doanh_thu_KH").alias("SaleOut_LP_KH")
    )
    val result = Loai_thang
      .join(df_saleout_dt_kh_date, resultJoinCondition, "full")
      .select(resultDimensionColumns ++ resultMeasureColumns: _*)
      // Group code: "L" followed by the last character of ID_NSP1.
      .withColumn("Ma_Nhom", concat(lit("L"), substring(col("ID_NSP1"), -1, 1)))

    // Template rows: one row per distinct combination of unit, region, dealer tier, team and
    // product-group names seen in 2024. The output columns largely reuse the names of the plan
    // CSV read above. Thoi_gian_ke_hoach, So_luong and Doanh_thu are left null.
    val filteredResult = result
      .filter(col("Day").between("2024-01-01", "2024-12-31"))
      .select(
        lit("saleout_nsp").alias("Ten_chi_tieu"),
        lit(null).alias("Thoi_gian_ke_hoach"),
        col("SaleDepartment").alias("Don_vi"),
        col("Organization").alias("Vung_CN"),
        col("CustomerType").alias("Cap_dai_ly"),
        col("Team_Name").alias("Ten_doi"),
        col("Name_NSP1").alias("Name_NSP1"),
        col("Name_NSP2").alias("Name_NSP2"),
        col("Name_NSP3").alias("Name_NSP3"),
        lit(null).alias("So_luong"),
        lit(null).alias("Doanh_thu")
      )

    // Apply distinct to ensure unique rows
    val re = filteredResult.distinct()

    // Give every column a concrete type: bare lit(null) columns are NullType, which file writers
    // such as avro cannot store. The types are imported here because the file header does not
    // import org.apache.spark.sql.types.
    import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}
    val reCast = re
      .withColumn("Ten_chi_tieu", col("Ten_chi_tieu").cast(StringType))
      .withColumn("Thoi_gian_ke_hoach", col("Thoi_gian_ke_hoach").cast(StringType))
      .withColumn("Don_vi", col("Don_vi").cast(StringType))
      .withColumn("Vung_CN", col("Vung_CN").cast(StringType))
      .withColumn("Cap_dai_ly", col("Cap_dai_ly").cast(StringType))
      .withColumn("Ten_doi", col("Ten_doi").cast(StringType))
      .withColumn("Name_NSP1", col("Name_NSP1").cast(StringType))
      .withColumn("Name_NSP2", col("Name_NSP2").cast(StringType))
      .withColumn("Name_NSP3", col("Name_NSP3").cast(StringType))
      .withColumn("So_luong", col("So_luong").cast(IntegerType))
      .withColumn("Doanh_thu", col("Doanh_thu").cast(DoubleType))

    // NOTE(review): reCast (the template built above) is never written or returned; only `result`
    // is persisted. Confirm whether the template was meant to be saved instead of, or in addition to,
    // `result`.
    // `outputDir` is the member imported from vn.com.viettel.AppConfig at the top of the file.
    // `AppConfig` itself is not in scope under this package clause.
    result.write.format("avro").mode("overwrite").save(outputDir)

  }
}


Editor is loading...
Leave a Comment