Untitled

 avatar
unknown
plain_text
a year ago
7.4 kB
5
Indexable
package vn.com.viettel.code.dashboard_date

import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType}
import vn.com.viettel.AppConfig.outputDir
import vn.com.viettel.utils.SparkUtils.createBatchSession

/**
 * Batch job that builds the daily C2 KPI table (actual vs. plan) for the dashboard.
 *
 * It handles two indicators: `c2_mm` (categories labelled "... mở mới") and
 * `c2_dc` (categories labelled "... không hoạt động"). For each one it:
 *   1. sums the actual quantity from the gold-zone Avro data per open date, sales department,
 *      region, store category and team;
 *   2. takes the monthly ("Thang") plan rows for that indicator from the uploaded plan CSV and
 *      divides each monthly total by the number of days of that month found in the date table;
 *   3. full-outer-joins actuals with the per-day plan, so days with only an actual or only a
 *      plan are both kept.
 * The union of both indicators (c2_dc rows first, then c2_mm) is written as Avro to
 * `outputDir`, overwriting any existing output.
 */
object kpicustomer_open_customer {

  private val C2NewActualsPath = "/gold_zone/full_load/kpi_date/c2_mm"
  private val C2InactiveActualsPath = "/gold_zone/full_load/kpi_date/c2_dc"
  private val PlanCsvPath = "/raw_zone/excel_form/upload/ke_hoach/c2_mm_dc.csv"
  private val DateTablePath = "/raw_zone/full_load/date/"

  def main(argv: Array[String]): Unit = {
    val spark: SparkSession = createBatchSession()

    // Actual quantities, one row per (OpenDate, SaleDepartment, OrganizationName, Category, Team_Name).
    val c2NewActuals = loadActuals(spark, C2NewActualsPath, "So_luong_C2_MM", "mở mới")
    val c2InactiveActuals = loadActuals(spark, C2InactiveActualsPath, "So_luong_C2_DC", "không hoạt động")

    // The KHDV indicator is not loaded yet (currently left null). Its source would be
    // /raw_zone/excel_form/sale_plan/01.1.KHDV.avro

    // Both indicators' plans live in the same uploaded CSV; each helper call filters its own rows.
    val planCsv = spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      .load(PlanCsvPath)
    val calendar = loadCalendarWithMonthLength(spark)

    val c2NewResult = joinActualWithPlan(c2NewActuals, loadDailyPlan(planCsv, "c2_mm", calendar))
    val c2InactiveResult = joinActualWithPlan(c2InactiveActuals, loadDailyPlan(planCsv, "c2_dc", calendar))

    // Union order is kept as before: c2_dc rows first, then c2_mm.
    val kpi = c2InactiveResult.unionByName(c2NewResult)

    kpi.write.format("avro").mode("overwrite").save(outputDir)
  }

  /** Category label for a row's `StoreTypeId` (17, 18 or 23) suffixed with `status`; null for any other id. */
  private def storeCategory(status: String): Column =
    when(col("StoreTypeId") === 17, s"C2 $status")
      .when(col("StoreTypeId") === 18, s"C2 Trọng điểm $status")
      .when(col("StoreTypeId") === 23, s"C2 Siêu lớn $status")

  /**
   * Reads an actuals Avro dataset and sums `quantityColumn` into `Perform_Quantity`.
   *
   * @param path           Avro location of the actuals
   * @param quantityColumn column holding the per-row quantity to sum
   * @param status         suffix used to build the `Category` label (see [[storeCategory]])
   */
  private def loadActuals(spark: SparkSession, path: String, quantityColumn: String, status: String): DataFrame =
    spark.read.format("avro").load(path)
      .withColumn("Category", storeCategory(status))
      .groupBy("OpenDate", "SaleDepartment", "OrganizationName", "Category", "Team_Name")
      .agg(sum(col(quantityColumn)).as("Perform_Quantity"))

  /**
   * Reads the date table once and attaches `so_ngay`, the number of date rows sharing the
   * same (month, year). Output columns: date, month, quarter, year, so_ngay.
   */
  private def loadCalendarWithMonthLength(spark: SparkSession): DataFrame =
    spark.read.format("avro").load(DateTablePath)
      // No ordering on the window, so the frame is the whole (month, year) partition.
      .withColumn("so_ngay", count(lit(1)).over(Window.partitionBy("month", "year")))
      .select("date", "month", "quarter", "year", "so_ngay")

  /**
   * Monthly plan rows for `indicator`, spread evenly over the days of the month.
   *
   * Each (month, year, Don_vi, Vung_CN, Cap_dai_ly, Ten_doi) total is left-joined to every
   * calendar day of that month, and `So_luong_KH` is divided by `so_ngay`.
   * NOTE(review): `Month`/`Year` come from `date_format` as strings ("1".."12", "2024");
   * confirm they compare equal to the date table's month/year values.
   */
  private def loadDailyPlan(planCsv: DataFrame, indicator: String, calendar: DataFrame): DataFrame = {
    val planDate = col("Thoi_gian_ke_hoach")
    val monthly = planCsv
      .filter(col("Ten_chi_tieu") === indicator && col("Loai_KH") === "Thang")
      .withColumn("Month", date_format(planDate, "M"))
      .withColumn("Quarter", date_format(planDate, "q"))
      .withColumn("Year", date_format(planDate, "y"))
      .groupBy("Month", "Quarter", "Year", "Don_vi", "Vung_CN", "Cap_dai_ly", "Ten_doi")
      .agg(sum("So_luong").as("So_luong_KH"))

    monthly
      .join(calendar, monthly("Month") === calendar("month") && monthly("Year") === calendar("year"), "left")
      .withColumn("So_luong_KH", col("So_luong_KH") / col("so_ngay"))
  }

  /**
   * Full outer join of actuals with the per-day plan. Output columns: Date, SaleDepartment,
   * OrganizationName, Category, Team_Name, Perform_Quantity, Plan_Quantity.
   *
   * NOTE(review): the plan is grouped by `Cap_dai_ly`, and the output coalesces it with `Category`,
   * but Category/Cap_dai_ly is not part of the join key. If one team has plan rows for several
   * categories on the same day, each actual row pairs with every one of them. Confirm whether
   * `Category === Cap_dai_ly` should be added to the key.
   */
  private def joinActualWithPlan(actual: DataFrame, plan: DataFrame): DataFrame = {
    // Maps null to "" so rows whose key part is null on both sides still match.
    def key(c: Column): Column = coalesce(c, lit(""))

    actual.join(
      plan,
      key(actual("OpenDate")) === key(plan("date")) &&
        key(actual("SaleDepartment")) === key(plan("Don_vi")) &&
        key(actual("OrganizationName")) === key(plan("Vung_CN")) &&
        key(actual("Team_Name")) === key(plan("Ten_doi")),
      "full"
    ).select(
      coalesce(actual("OpenDate"), plan("date")).alias("Date"),
      coalesce(actual("SaleDepartment"), plan("Don_vi")).alias("SaleDepartment"),
      coalesce(actual("OrganizationName"), plan("Vung_CN")).alias("OrganizationName"),
      coalesce(actual("Category"), plan("Cap_dai_ly")).alias("Category"),
      coalesce(actual("Team_Name"), plan("Ten_doi")).alias("Team_Name"),
      actual("Perform_Quantity").alias("Perform_Quantity"),
      plan("So_luong_KH").alias("Plan_Quantity")
    )
  }

  /**
   * Reshapes KPI rows into the column layout of the plan upload CSV (Ten_chi_tieu,
   * Thoi_gian_ke_hoach, Don_vi, Vung_CN, Ten_doi, So_luong, Loai_KH). Only "mở mới" categories
   * dated within [fromDate, toDate] are kept; plan fields are left null.
   *
   * NOTE(review): `main` previously built this frame but never wrote it (it wrote the KPI frame
   * instead), and `main` does not call this helper. Confirm which output is intended, and delete
   * this helper if it is no longer needed.
   */
  def buildPlanTemplate(kpi: DataFrame, fromDate: String = "2024-01-01", toDate: String = "2024-12-31"): DataFrame =
    kpi
      .filter(col("Date").between(fromDate, toDate) && col("Category").contains("mở mới"))
      .select(
        col("Category").alias("Ten_chi_tieu"),
        lit(null).alias("Thoi_gian_ke_hoach"),
        col("SaleDepartment").alias("Don_vi"),
        col("OrganizationName").alias("Vung_CN"),
        col("Team_Name").alias("Ten_doi"),
        lit(null).alias("So_luong"),
        lit(null).alias("Loai_KH")
      )
      .distinct()
      .withColumn("Ten_chi_tieu", col("Ten_chi_tieu").cast(StringType))
      .withColumn("Thoi_gian_ke_hoach", col("Thoi_gian_ke_hoach").cast(StringType))
      .withColumn("Don_vi", col("Don_vi").cast(StringType))
      .withColumn("Vung_CN", col("Vung_CN").cast(StringType))
      .withColumn("Ten_doi", col("Ten_doi").cast(StringType))
      .withColumn("So_luong", col("So_luong").cast(IntegerType))
      .withColumn("Loai_KH", col("Loai_KH").cast(StringType))
}
Editor is loading...
Leave a Comment