Untitled

 avatar
unknown
plain_text
a year ago
10 kB
20
Indexable
package vn.com.viettel.code.dashboard_date

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import vn.com.viettel.AppConfig
import vn.com.viettel.utils.SparkUtils.createBatchSession

/**
 * Daily e-commerce affiliate revenue (direct/indirect) report.
 *
 * Reads e-commerce orders, classifies revenue as direct ("truc_tiep") or
 * indirect ("gian_tiep") from the order's process status, aggregates actuals
 * per day/channel/product group, spreads the monthly plan (from a CSV upload)
 * evenly across the days of each month, full-joins actuals against plan, and
 * writes the result as Avro to AppConfig.outputDir.
 */
object doanh_thu_tmdt_lien_ket_gt_tt_date {
  def main(args: Array[String]): Unit = {
    val spark = createBatchSession()

    // ---- Actuals: e-commerce orders ----
    // Exclude cancelled orders and orders tagged as C3 agents / internal test
    // traffic (tag literals below are intentionally kept byte-for-byte,
    // including spacing/case variants present in the source data).
    import spark.implicits._
    val df_order = spark.read.table("WHS.Order_EcommerceOrder")
      .filter(col("status") =!= "cancelled"
        && !(array_contains(col("tags"), "Đại lýC3") ||
        array_contains(col("tags"), "Đại LýC3") ||
        array_contains(col("tags"), "Đại lý C3") ||
        array_contains(col("tags"), "Đại Lý C3") ||
        exists(col("tags"), element => element.startsWith("RDMNC3")) ||
        exists(col("tags"), element => element.startsWith("RDMBC3")) ||
        exists(col("tags"), element => element.startsWith("C2TD")) ||
        exists(col("tags"), element => element.startsWith("RDC2")) ||
        exists(col("tags"), element => element.startsWith("IT Test")) ||
        exists(col("tags"), element => element.startsWith("test")) ||
        exists(col("tags"), element => element.startsWith("Test"))
        ))
      .withColumn("created_on", expr("created_on").cast("timestamp"))

    val df_status = spark.read.table("WHS.MasterData_EcommerceStatus")
    val df_source = spark.read.table("WHS.MasterData_OrderSource")
    val df_product_class = spark.read.table("WHS.SalePlan_ProductClass")
    val df_product_class_detail = spark.read.table("WHS.SalePlan_ProductClassDetail")

    // Classify each order's revenue type from its process status:
    //   "Đại lý TMDT" / "CN.%"  -> indirect (gian_tiep)
    //   "TMDT" / no status      -> direct   (truc_tiep)
    val res = df_order.as("t0").join(df_status.as("t1"), col("t0.process_status_id") === col("t1.id"), "left")
      .where(
        col("t1.name") === "Đại lý TMDT" ||
          col("t1.name").like("CN.%") ||
          col("t1.name") === "TMDT" ||
          col("t1.name").isNull
      )
      .withColumn(
        "loai_doanh_thu",
        when(col("t1.name") === "Đại lý TMDT" || col("t1.name").like("CN.%"), "gian_tiep")
          .when(col("t1.name") === "TMDT" || col("t1.name").isNull, "truc_tiep")
      )
      .select(col("t0.*"), col("loai_doanh_thu"))

    // Resolve the three product-group levels (N_SP1..N_SP3) to their names.
    val df_product_class_detail_clone_1 = df_product_class_detail.as("df_product_class_detail_clone_1")
    val df_product_class_detail_clone_2 = df_product_class_detail.as("df_product_class_detail_clone_2")
    val df_etl_product_class = df_product_class.as("df_product_class")
      .join(df_product_class_detail.as("t1"), col("df_product_class.N_SP1") === col("t1.ID"), "left")
      .join(df_product_class_detail_clone_1.as("t2"), col("df_product_class.N_SP2") === col("t2.ID"), "left")
      .join(df_product_class_detail_clone_2.as("t3"), col("df_product_class.N_SP3") === col("t3.ID"), "left")
      .select(
        col("df_product_class.ItemName"),
        col("df_product_class.ItemCodeSAP"),
        col("df_product_class.N_SP1"),
        col("df_product_class.N_SP2"),
        col("df_product_class.N_SP3"),
        col("df_product_class.SPKM_BH1"),
        col("df_product_class.SPKM_BH2"),
        col("t1.Class").alias("Name_NSP1"),
        col("t2.Class").alias("Name_NSP2"),
        col("t3.Class").alias("Name_NSP3")
      )

    // Attach the sales channel and explode order lines to line level.
    // NOTE(review): df_source("name") / df_order("Order_line_items") reference
    // pre-join DataFrames and resolve only via Spark column lineage; safer to
    // use the "t1."/"t." aliases — confirm before refactoring.
    val res2 = res.as("t").join(df_source.as("t1"), col("t.source_id") === col("t1.id"), "left")
      .withColumn("created_on_target", date_format(col("t.created_on"), "yyyy-MM-dd"))
      .select(
        col("created_on_target"),
        df_source("name").alias("Channel"),
        col("code"),
        col("loai_doanh_thu"),
        explode(df_order("Order_line_items")).alias("Order_line_items"))
      .selectExpr(
        "created_on_target",
        "Channel",
        "loai_doanh_thu",
        "code",
        "Order_line_items.line_amount",
        "Order_line_items.barcode"
      )

    // Line-level actuals enriched with product classification, exposed as a
    // temp view for the aggregation query below.
    val res3 = res2.join(df_etl_product_class, res2("barcode") === df_etl_product_class("ItemCodeSAP"), "left").createOrReplaceTempView("df_ecommercer_order")

    // ---- Plan: monthly targets uploaded as CSV ----
    val df_tmdt_lk_kh = spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true").load("/raw_zone/excel_form/upload/ke_hoach/mtdt_dt_lk.csv")
      .filter(col("Ten_chi_tieu") === "Doanh_thu_tmdt_lien_ket_gt_tt")
      .as("t1")
      .join(df_product_class_detail.as("t2"), col("t1.Name_NSP1") === col("t2.Class"), "left")
      .join(df_product_class_detail.as("t3"), col("t1.Name_NSP2") === col("t3.Class"), "left")
      .join(df_product_class_detail.as("t4"), col("t1.Name_NSP3") === col("t4.Class"), "left")
      .select(col("t1.*"), col("t2.ID").as("N_SP1"), col("t3.ID").as("N_SP2"), col("t4.ID").as("N_SP3"))
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      // BUG FIX: the original `when` had no `.otherwise`, which silently
      // replaced every non-"nan" quantity with null (so sum("So_Luong")
      // aggregated nulls). Keep the real value when it is not "nan".
      .withColumn("So_luong", when(col("So_luong") === "nan", lit(0)).otherwise(col("So_luong")))

    // Monthly plan totals per channel / revenue type / product group.
    val df_doanh_thu_dt_kh = df_tmdt_lk_kh.filter(col("Loai_KH") === "Thang")
      .groupBy("Month", "Quarter", "Year", "Kenh", "Loai_doanh_thu", "N_SP1", "N_SP2", "N_SP3", "Name_NSP1", "Name_NSP2", "Name_NSP3")
      .agg(sum("Doanh_thu").as("Doanh_thu_KH"), sum("So_Luong").as("So_Luong_KH"))

    // ---- Spread the monthly plan evenly over the days of each month ----
    val my_date = spark.read.format("avro").load("/raw_zone/full_load/date/")
    val date_of_month = spark.read.format("avro").load("/raw_zone/full_load/date/").groupBy("month", "quarter", "year").agg(count("*").alias("so_ngay"))
    val count_date_of_month = my_date.as("t1").join(date_of_month.as("t2"), my_date("month") === date_of_month("month") && my_date("year") === date_of_month("year"))
      .select(col("t1.date"), col("t1.month"), col("t1.quarter"), col("t1.year"), col("t2.so_ngay"))
    val df_dt_kh_date = df_doanh_thu_dt_kh.join(count_date_of_month, df_doanh_thu_dt_kh("Month") === count_date_of_month("Month") and df_doanh_thu_dt_kh("Year") === count_date_of_month("Year"), "left")
      .withColumn("Doanh_thu_KH", col("Doanh_thu_KH") / col("so_ngay"))
      .withColumn("So_Luong_KH", col("So_Luong_KH") / col("so_ngay"))

    // Daily actuals: revenue (DT_TMDT) and distinct order count (So_Luong).
    val output_df = spark.sql(
      """
                    select  created_on_target as Date, Channel, N_SP1,N_SP2,N_SP3,Name_NSP1, Name_NSP2, Name_NSP3, sum(line_amount) DT_TMDT, count(distinct code) So_Luong, loai_doanh_thu
                    from df_ecommercer_order
                    group by created_on_target, Channel, N_SP1,N_SP2,N_SP3, Name_NSP1, Name_NSP2, Name_NSP3, loai_doanh_thu

                """)

    // Full outer join actuals vs. daily plan so days with only plan or only
    // actuals are both kept; coalesce the join keys from whichever side exists.
    val result = output_df.join(df_dt_kh_date,
            output_df("Date") === df_dt_kh_date("date") and
            output_df("Channel") === df_dt_kh_date("Kenh") and
              output_df("N_SP1") === df_dt_kh_date("N_SP1") and
              output_df("N_SP2") === df_dt_kh_date("N_SP2") and
              output_df("N_SP3") === df_dt_kh_date("N_SP3") and
              output_df("Name_NSP1") === df_dt_kh_date("Name_NSP1") and
            output_df("Name_NSP2") === df_dt_kh_date("Name_NSP2") and
            output_df("Name_NSP3") === df_dt_kh_date("Name_NSP3") and
            output_df("loai_doanh_thu") === df_dt_kh_date("Loai_doanh_thu"),
            "full"
        ).select(
            coalesce(output_df("Date"), df_dt_kh_date("date")).as("Date"),
            coalesce(output_df("Channel"), df_dt_kh_date("Kenh")).as("Channel"),
            coalesce(output_df("N_SP1"), df_dt_kh_date("N_SP1")).as("ID_NSP1"),
            coalesce(output_df("N_SP2"), df_dt_kh_date("N_SP2")).as("ID_NSP2"),
            coalesce(output_df("N_SP3"), df_dt_kh_date("N_SP3")).as("ID_NSP3"),
            coalesce(output_df("Name_NSP1"), df_dt_kh_date("Name_NSP1")).as("Name_NSP1"),
            coalesce(output_df("Name_NSP2"), df_dt_kh_date("Name_NSP2")).as("Name_NSP2"),
            coalesce(output_df("Name_NSP3"), df_dt_kh_date("Name_NSP3")).as("Name_NSP3"),
            coalesce(output_df("loai_doanh_thu"), df_dt_kh_date("Loai_doanh_thu")).as("loai_doanh_thu"),
            output_df("DT_TMDT").as("DT_TMDT"),
            output_df("So_Luong").as("So_Luong"),
            df_dt_kh_date("Doanh_thu_KH").as("DT_TMDT_KH"),
            df_dt_kh_date("So_Luong_KH").as("So_Luong_KH")
        )
      .withColumn("Ma_Nhom", concat(lit("L"), substring(col("ID_NSP1"), -1, 1)))

    // NOTE(review): everything from here to `reCast` is dead code — the write
    // below persists `result`, not `reCast`. Either the cast/filter pipeline
    // should feed the write, or it can be deleted; confirm intent before
    // changing the persisted schema.
    val filteredResult = result
      .filter(col("Date").between("2024-01-01", "2024-12-31"))
      .select(
        lit("Doanh_thu_tmdt_lien_ket_gt_tt").alias("Ten_chi_tieu"),
        lit(null).alias("Thoi_gian_ke_hoach"),
        col("loai_doanh_thu").alias("Loai_doanh_thu"),
        col("Channel").alias("Kenh_ban"),
        col("Name_NSP1").alias("Name_NSP1"),
        col("Name_NSP2").alias("Name_NSP2"),
        col("Name_NSP3").alias("Name_NSP3"),
        lit(null).alias("So_luong"),
        lit(null).alias("Doanh_thu"),
        lit(null).alias("Loai_KH")
      )

    // Deduplicate and normalize column types for the plan-upload schema.
    val re = filteredResult.distinct()
    val reCast = re
      .withColumn("Ten_chi_tieu", col("Ten_chi_tieu").cast(StringType))
      .withColumn("Thoi_gian_ke_hoach", col("Thoi_gian_ke_hoach").cast(StringType))
      .withColumn("Loai_doanh_thu", col("Loai_doanh_thu").cast(StringType))
      .withColumn("Kenh_ban", col("Kenh_ban").cast(StringType))
      .withColumn("Name_NSP1", col("Name_NSP1").cast(StringType))
      .withColumn("Name_NSP2", col("Name_NSP2").cast(StringType))
      .withColumn("Name_NSP3", col("Name_NSP3").cast(StringType))
      .withColumn("So_luong", col("So_luong").cast(IntegerType))
      .withColumn("Doanh_thu", col("Doanh_thu").cast(DoubleType))
      .withColumn("Loai_KH", col("Loai_KH").cast(StringType))

    result.write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }
}
Editor is loading...
Leave a Comment