Untitled

 avatar
unknown
plain_text
a year ago
12 kB
4
Indexable
package vn.com.viettel.code.dashboard_date

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import vn.com.viettel.AppConfig
import vn.com.viettel.utils.SparkUtils.createBatchSession

/**
 * Spark batch job that builds the daily "checking customer info" KPI dataset.
 *
 * Three indicators (values of `Category`) are produced:
 *  - "Số Hình Ảnh"        from /gold_zone/full_load/kpi_date/so_hinh_anh_chup_ve
 *  - "Thông tin vấn đề"   from /gold_zone/full_load/kpi_date/tt_van_de_th
 *  - "Cập nhật thông tin" from /gold_zone/full_load/kpi_date/cap_nhap_thong_tin_diem_ban
 *
 * For each indicator, the actual quantities are full-outer-joined with the monthly plan taken from
 * the uploaded plan CSV (spread evenly over the days of the month), then summed per
 * (EventDate, SalesDepartment, OrganizationName, Team_Name). The three indicators are unioned and
 * written as Avro to `AppConfig.outputDir`, overwriting any previous output.
 */
object kpicustomer_checking_customer_info {

  /** Uploaded plan CSV; it holds the plans of all three indicators, selected by `Ten_chi_tieu`. */
  private val PlanCsvPath = "/raw_zone/excel_form/upload/ke_hoach/so_hinh_anh_chup_ve.csv"

  /** Date dimension (Avro); must provide the columns date, month, quarter, year. */
  private val DateDimensionPath = "/raw_zone/full_load/date/"

  def main(args: Array[String]): Unit = {
    val spark = createBatchSession()

    // Actual (performed) quantities. Each frame has a date column plus
    // SalesDepartment, OrganizationName, Team_Name, Perform_Quantity.
    val photosActual = spark.read.format("avro").load("/gold_zone/full_load/kpi_date/so_hinh_anh_chup_ve")
      .select(
        col("CreatedDate"),
        normalizeDepartment(col("Phong")).alias("SalesDepartment"),
        col("Vung").alias("OrganizationName"),
        col("Team_Name"),
        col("So_Hinh_Anh").alias("Perform_Quantity")
      )

    val problemsActual = spark.read.format("avro").load("/gold_zone/full_load/kpi_date/tt_van_de_th")
      .groupBy(
        col("Day"),
        col("SaleDepartment").alias("SalesDepartment"),
        col("Organization").alias("OrganizationName"),
        col("Team_Name")
      )
      .agg(sum("ProblemCount").as("Perform_Quantity"))

    val posUpdatesActual = spark.read.format("avro").load("/gold_zone/full_load/kpi_date/cap_nhap_thong_tin_diem_ban")
      .select(
        col("UpdatedAt"),
        col("SaleDepartment").alias("SalesDepartment"),
        col("Organization").alias("OrganizationName"),
        col("Team_Name"),
        col("cap_nhap_thong_tin_diem_ban").alias("Perform_Quantity")
      )

    // Every calendar date tagged with the number of dates in its (month, quarter, year);
    // used to split a monthly plan into equal per-day shares. A window avoids reading the
    // date dimension twice and self-joining it.
    val daysInMonth = spark.read.format("avro").load(DateDimensionPath)
      .withColumn("so_ngay", count(lit(1)).over(Window.partitionBy("month", "quarter", "year")))
      .select("date", "month", "quarter", "year", "so_ngay")

    val planCsv = spark.read.format("csv").option("header", true).load(PlanCsvPath)

    // (Category / plan Ten_chi_tieu, actual quantities, name of the actual date column)
    val indicators = Seq(
      ("Số Hình Ảnh", photosActual, "CreatedDate"),
      ("Thông tin vấn đề", problemsActual, "Day"),
      ("Cập nhật thông tin", posUpdatesActual, "UpdatedAt")
    )

    // Positional UNION ALL: each branch already has one row per group key and a distinct
    // Category, so a distinct union would only add a shuffle.
    val kpi = indicators
      .map { case (category, actual, dateCol) =>
        val plan = dailyPlan(planCsv, daysInMonth, category)
        summarizeByCategory(joinActualWithPlan(actual, dateCol, plan), dateCol, category)
      }
      .reduce(_ union _)

    kpi.write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }

  /**
   * Maps the short department codes "BH1" and "BH2" to "P_BH1" and "P_BH2".
   * Any other value (including null) passes through unchanged, so actual and plan
   * departments are normalized the same way.
   */
  private def normalizeDepartment(dept: Column): Column =
    when(dept === "BH1", lit("P_BH1"))
      .when(dept === "BH2", lit("P_BH2"))
      .otherwise(dept)

  /**
   * Monthly plan of one indicator, split evenly over the days of each month.
   *
   * Only rows with `Loai_KH = "Thang"` are used. `So_luong` is summed per
   * (month, quarter, year of `Thoi_gian_ke_hoach`, Don_vi, Vung_CN, Ten_doi) and divided by the
   * number of dates of that month in the date dimension.
   *
   * @param planCsv     the uploaded plan CSV (all indicators)
   * @param daysInMonth one row per calendar date with columns date, month, quarter, year, so_ngay
   * @param category    value of `Ten_chi_tieu` selecting the indicator
   * @return columns Don_vi, Vung_CN, Ten_doi, date, So_luong_KH (per-day plan). A plan month that
   *         is missing from the date dimension yields one row with null date and null So_luong_KH
   *         (left join).
   */
  private def dailyPlan(planCsv: DataFrame, daysInMonth: DataFrame, category: String): DataFrame = {
    val monthly = planCsv
      .filter(col("Ten_chi_tieu") === category && col("Loai_KH") === "Thang")
      .withColumn("Don_vi", normalizeDepartment(col("Don_vi")))
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      .groupBy("Month", "Quarter", "Year", "Don_vi", "Vung_CN", "Ten_doi")
      .agg(sum("So_luong").as("So_luong_KH"))

    monthly
      .join(
        daysInMonth,
        monthly("Month") === daysInMonth("Month") && monthly("Year") === daysInMonth("Year"),
        "left"
      )
      .select(
        monthly("Don_vi"),
        monthly("Vung_CN"),
        monthly("Ten_doi"),
        daysInMonth("date"),
        (monthly("So_luong_KH") / daysInMonth("so_ngay")).alias("So_luong_KH")
      )
  }

  /**
   * Full-outer-joins actual quantities with the daily plan on (date, department, region, team).
   *
   * Keys are compared after `coalesce(_, "")`, so a null key on one side matches a null or empty
   * key on the other. Each output key takes the actual-side value when present, else the
   * plan-side value.
   * NOTE(review): the date keys are compared directly; if the actual date column and the date
   * dimension's `date` have different types/formats (e.g. timestamp vs date) they may never
   * match — confirm the source schemas.
   *
   * @param actual  columns `dateCol`, SalesDepartment, OrganizationName, Team_Name, Perform_Quantity
   * @param dateCol name of the date column in `actual`; reused as the output date column name
   * @param plan    columns date, Don_vi, Vung_CN, Ten_doi, So_luong_KH (see [[dailyPlan]])
   * @return columns `dateCol`, SalesDepartment, OrganizationName, Team_Name,
   *         Perform_Quantity, Plan_Quantity
   */
  private def joinActualWithPlan(actual: DataFrame, dateCol: String, plan: DataFrame): DataFrame = {
    // (actual-side key, plan-side key)
    val keyPairs = Seq(
      dateCol -> "date",
      "SalesDepartment" -> "Don_vi",
      "OrganizationName" -> "Vung_CN",
      "Team_Name" -> "Ten_doi"
    )

    // keyPairs is a fixed, non-empty list, so reduce is safe here.
    val joinCondition = keyPairs
      .map { case (actualKey, planKey) => coalesce(actual(actualKey), lit("")) === coalesce(plan(planKey), lit("")) }
      .reduce(_ && _)

    val mergedKeys = keyPairs.map { case (actualKey, planKey) =>
      coalesce(actual(actualKey), plan(planKey)).alias(actualKey)
    }

    val quantities = Seq(
      actual("Perform_Quantity").alias("Perform_Quantity"),
      plan("So_luong_KH").alias("Plan_Quantity")
    )

    actual
      .join(plan, joinCondition, "full")
      .select(mergedKeys ++ quantities: _*)
  }

  /**
   * Sums actual and plan quantities per (date, department, region, team) and tags the rows
   * with a constant `Category`.
   *
   * @return columns EventDate, SalesDepartment, OrganizationName, Category, Team_Name,
   *         Perform_Quantity, Plan_Quantity (in this order, so the three indicators can be
   *         unioned positionally)
   */
  private def summarizeByCategory(joined: DataFrame, dateCol: String, category: String): DataFrame =
    joined
      .groupBy(col(dateCol).alias("EventDate"), col("SalesDepartment"), col("OrganizationName"), col("Team_Name"))
      .agg(sum("Perform_Quantity").alias("Perform_Quantity"), sum("Plan_Quantity").alias("Plan_Quantity"))
      .select(
        col("EventDate"),
        col("SalesDepartment"),
        col("OrganizationName"),
        lit(category).alias("Category"),
        col("Team_Name"),
        col("Perform_Quantity"),
        col("Plan_Quantity")
      )
}
Editor is loading...
Leave a Comment