/*
 * Paste-site page residue (not source code), kept for provenance:
 * Untitled · avatar · unknown · plain_text · a year ago · 5.6 kB · 5 · Indexable
 */
package vn.com.viettel.code.plan

import vn.com.viettel.utils.SparkUtils.createBatchSession
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import vn.com.viettel.AppConfig

object salein_tx_thutien_nsp {

  /** Columns of every plan frame produced here, in the order they are written out. */
  private val PlanColumns: Seq[String] = Seq(
    "Day",
    "Month",
    "Quarter",
    "Year",
    "SaleDepartment",
    "OrganizationName",
    "SalesChannelName",
    "Name_NSP1",
    "Name_NSP2",
    "Name_NSP3",
    "Team_Name",
    "Revenue_Salein_Plan",
    "Revenue_Salein_Perform",
    "Type",
    "Loai_ke_hoach"
  )

  /** Keys used to match a "Vùng" plan row with the "Đội" rows whose plan is subtracted from it. */
  private val RegionKeys: Seq[String] = Seq(
    "Day", "Month", "Quarter", "Year", "SaleDepartment", "OrganizationName",
    "Name_NSP1", "Name_NSP2", "Type"
  )

  /** Keys used to match a "PBH" plan row with the "Vùng" rows whose plan is subtracted from it. */
  private val DepartmentKeys: Seq[String] = Seq(
    "Day", "Month", "Quarter", "Year", "SaleDepartment", "Name_NSP1", "Name_NSP2", "Type"
  )

  /**
   * Builds the sell-in revenue plan frame.
   *
   * Three kinds of rows (by `Loai_ke_hoach`) are combined:
   *  - "Đội" rows with a `Team_Name` are passed through unchanged;
   *  - "PBH" rows with a `SaleDepartment` keep their plan minus the summed plan of the
   *    "Vùng" rows sharing [[DepartmentKeys]];
   *  - "Vùng" rows without a `Team_Name` keep their plan minus the summed plan of the
   *    "Đội" rows sharing [[RegionKeys]].
   *
   * @param spark active session used to read the plan table
   * @return the union of the three frames, with columns in [[PlanColumns]] order
   */
  def execute_processing(spark: SparkSession): DataFrame = {
    val plan = readPlan(spark)

    val teamPlan = plan.filter(col("Loai_ke_hoach") === "Đội" && col("Team_Name").isNotNull)

    val departmentPlan = subtractChildPlan(
      parent = plan.filter(col("Loai_ke_hoach") === "PBH" && col("SaleDepartment").isNotNull),
      children = plan.filter(col("Loai_ke_hoach") === "Vùng"),
      keys = DepartmentKeys
    )

    val regionPlan = subtractChildPlan(
      parent = plan.filter(col("Loai_ke_hoach") === "Vùng" && col("Team_Name").isNull),
      children = teamPlan,
      keys = RegionKeys
    )

    // Return processed data of plan (previously referenced an undefined `result`).
    teamPlan.unionByName(departmentPlan).unionByName(regionPlan)
  }

  /**
   * Reads the plan catalog table, keeps the "Thực xuất" indicator rows and normalises them
   * to [[PlanColumns]]: derives Month/Quarter/Year from the plan date, renames source columns,
   * casts `Doanh_thu` to a double plan amount and sets the performed revenue to 0.
   */
  private def readPlan(spark: SparkSession): DataFrame =
    spark.read.table("excel_form.test_KH_BH1_KH")
      .filter(col("Ten_chi_tieu") === "Thực xuất")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      .withColumnRenamed("Don_vi", "SaleDepartment")
      .withColumnRenamed("Vung_CN", "OrganizationName")
      .withColumnRenamed("Kenh_ban", "SalesChannelName")
      .withColumnRenamed("Loai_KH", "Type")
      .withColumnRenamed("Ten_doi", "Team_Name")
      .withColumnRenamed("Thoi_gian_ke_hoach", "Day")
      .withColumn("Revenue_Salein_Perform", lit(0))
      .withColumn("Revenue_Salein_Plan", col("Doanh_thu").cast("double"))
      .select(PlanColumns.head, PlanColumns.tail: _*)

  /**
   * Replaces each parent row's `Revenue_Salein_Plan` with what remains after subtracting the
   * summed `Revenue_Salein_Plan` of all child rows sharing every column in `keys`.
   *
   * Keys are compared null-safely (`<=>`) so a null key matches the group `groupBy` forms for
   * it, and a parent with no matching children keeps its full plan rather than becoming null.
   *
   * @param parent   rows whose plan is reduced; one output row per parent row
   * @param children rows whose plans are summed per key and subtracted
   * @param keys     non-empty list of columns identifying a parent/child group
   * @return `parent` projected to [[PlanColumns]] with the residual plan amount
   */
  private def subtractChildPlan(parent: DataFrame, children: DataFrame, keys: Seq[String]): DataFrame = {
    require(keys.nonEmpty, "subtractChildPlan needs at least one join key")

    val childTotals = children
      .groupBy(keys.head, keys.tail: _*)
      .agg(sum("Revenue_Salein_Plan").as("child_plan"))
      .as("child")

    // Both sides derive from the same source frame; qualified names keep the self-join unambiguous.
    val sameGroup = keys
      .map(k => col(s"parent.$k") <=> col(s"child.$k"))
      .reduce(_ && _)

    val residualPlan =
      col("parent.Revenue_Salein_Plan") - coalesce(col("child.child_plan"), lit(0.0))

    parent.as("parent")
      .join(childTotals, sameGroup, "left")
      .select(PlanColumns.map {
        case "Revenue_Salein_Plan" => residualPlan.as("Revenue_Salein_Plan")
        case other                 => col(s"parent.$other")
      }: _*)
  }

  /** Batch entry point: computes the plan frame and overwrites `AppConfig.outputDir` with it as Avro. */
  def main(args: Array[String]): Unit = {
    val spark = createBatchSession()
    execute_processing(spark).write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }
}
/* Paste-site page residue (not source code): Editor is loading... · Leave a Comment */