// Untitled snippet copied from a paste site (plain text, ~15 kB, posted about a year ago); page chrome reduced to this comment.
package vn.com.viettel.code.dashboard
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import vn.com.viettel.AppConfig
import vn.com.viettel.utils.SparkUtils.{DataFrameOps, createBatchSession}
import org.apache.spark.sql.functions.{col, _}
import org.apache.spark.sql.expressions._


/**
  * Batch job that builds the "sale-in by product group" dashboard table.
  *
  * Steps performed by [[main]]:
  *  1. Read the plan CSV and keep only the "Doanh thu (SaleIn)" indicator rows.
  *  2. Aggregate the actual sale-in data at three granularities
  *     ("Thang" = month, "Quy" = quarter, "Nam" = year).
  *  3. Full-outer-join actuals with the plan of the same granularity.
  *  4. Attach the actuals of the same period one year earlier (`*_CungKy`)
  *     and of the immediately preceding period (`*_KyTruoc`).
  *  5. Overwrite `AppConfig.outputDir` with the result in Avro format.
  */
object salein_group_mt {

  import org.apache.spark.sql.{Column, DataFrame}

  private val PlanCsvPath = "/raw_zone/excel_form/upload/ke_hoach/salein_tx_thutien_spmt.csv"
  private val ActualAvroPath = "/gold_zone/full_load/kpi/salein_dt_sp_mt"

  /** Period columns, from finest to coarsest. */
  private val periodCols: Seq[String] = Seq("Month", "Quarter", "Year")

  /** Dimension columns as (name in the actual data, name in the plan CSV). */
  private val dimensionPairs: Seq[(String, String)] = Seq(
    "SaleDepartment" -> "Don_vi",
    "OrganizationName" -> "Vung_CN",
    "SalesChannelName" -> "Kenh_ban",
    "GroupName" -> "Name_SPMT",
    "Team_Name" -> "Ten_Doi"
  )

  /** Measures of the actual data that are compared across periods. */
  private val comparedMeasures: Seq[String] = Seq("Revenue_Salein_Perform", "Quantity_Salein_Perform")

  /** Columns identifying one output row: period, dimensions and granularity type. */
  private val keyCols: Seq[String] = (periodCols ++ dimensionPairs.map(_._1)) :+ "Type"

  /** Column order of the dataset written by the job. */
  private val outputColumns: Seq[String] = Seq(
    "Month", "Quarter", "Year",
    "SaleDepartment", "SalesChannelName", "GroupName", "OrganizationName", "Team_Name",
    "Revenue_Salein_Perform", "Revenue_Salein_Plan",
    "Quantity_Salein_Perform", "Quantity_Salein_Plan",
    "Revenue_Salein_Perform_CungKy", "Quantity_Salein_Perform_CungKy",
    "Type",
    "Revenue_Salein_Perform_KyTruoc", "Quantity_Salein_Perform_KyTruoc"
  )

  /**
    * One reporting granularity.
    *
    * @param label          value of `Type` in the actuals and of `Loai_KH` in the plan
    * @param joinKeys       period columns used to match actual rows with plan rows
    * @param keptPeriodCols period columns carried to the output; the others are emitted as null
    */
  private final case class Granularity(label: String, joinKeys: Seq[String], keptPeriodCols: Set[String])

  private val granularities: Seq[Granularity] = Seq(
    Granularity("Thang", Seq("Month", "Year"), Set("Month", "Quarter", "Year")),
    Granularity("Quy", Seq("Quarter", "Year"), Set("Quarter", "Year")),
    Granularity("Nam", Seq("Year"), Set("Year"))
  )

  /**
    * Full-outer-joins actuals (alias `t`) with the plan (alias `t1`) of one granularity.
    *
    * Key columns are coalesced from both sides so that plan-only rows keep their keys;
    * each dimension falls back to the plan column it is joined on.
    *
    * @param actual aggregated actual rows of granularity `g`
    * @param plan   plan rows of granularity `g`
    * @param g      granularity being joined
    * @return one row per key with actual and plan measures and `Type` set to `g.label`
    */
  private def joinWithPlan(actual: DataFrame, plan: DataFrame, g: Granularity): DataFrame = {
    val periodCond = g.joinKeys.map(k => col(s"t.$k") === col(s"t1.$k"))
    val dimensionCond = dimensionPairs.map { case (a, p) => col(s"t.$a") === col(s"t1.$p") }
    val joinCond = (periodCond ++ dimensionCond).reduce(_ && _)

    val periodOut = periodCols.map { c =>
      if (g.keptPeriodCols(c)) coalesce(col(s"t.$c"), col(s"t1.$c")).as(c)
      else lit(null).as(c)
    }
    val dimensionOut = dimensionPairs.map { case (a, p) => coalesce(col(s"t.$a"), col(s"t1.$p")).as(a) }
    // NOTE(review): the CSV is read without schema inference, so the plan values below
    // are presumably strings -- confirm that downstream consumers expect that.
    val measureOut = Seq(
      col("t.Revenue_Salein_Perform"),
      col("t.Quantity_Salein_Perform"),
      col("t1.Doanh_thu").as("Revenue_Salein_Plan"),
      col("t1.So_Luong").as("Quantity_Salein_Plan"),
      lit(g.label).as("Type")
    )

    actual.as("t")
      .join(plan.as("t1"), joinCond, "fullouter")
      .select(periodOut ++ dimensionOut ++ measureOut: _*)
  }

  /**
    * Moves every row to the period that follows it (month + 1, quarter + 1 or year + 1,
    * depending on `Type`), so that joining on the period keys pairs each period with
    * the figures of its predecessor.
    */
  private def shiftToNextPeriod(df: DataFrame): DataFrame = {
    val month = col("Month")
    val quarter = col("Quarter")
    val year = col("Year")
    val isMonthly = col("Type") === "Thang"
    val isQuarterly = col("Type") === "Quy"
    val isYearly = col("Type") === "Nam"
    val isDecember = isMonthly && month === 12
    val isLastQuarter = isQuarterly && quarter === 4
    val plusOne = (c: Column) => (c + 1).cast("int")

    // The year advances for yearly rows and when a month/quarter wraps around.
    val nextYear = when(isYearly || isDecember || isLastQuarter, plusOne(year))
      .otherwise(year.cast("int"))

    // The quarter stays null where it is null, wraps to 1 at year end and advances when
    // a quarterly row moves on or a monthly row leaves the last month of its quarter.
    val nextQuarter = when(quarter.isNull, quarter.cast("int"))
      .when(isLastQuarter || isDecember, 1)
      .when(isQuarterly && quarter < 4, plusOne(quarter))
      .when(isMonthly && (month === 3 || month === 6 || month === 9), plusOne(quarter))
      .otherwise(quarter.cast("int"))

    val nextMonth = when(month.isNull, month.cast("int"))
      .when(month < 12, plusOne(month))
      .otherwise(1)

    // Applied in this order on purpose: the Quarter expression reads the original Month.
    df.withColumn("Year", nextYear)
      .withColumn("Quarter", nextQuarter)
      .withColumn("Month", nextMonth)
  }

  /**
    * Join condition matching two aliased datasets on all key columns.
    * Month and Quarter are compared with null treated as "" because they are null
    * for the coarser granularities.
    */
  private def sameKeys(left: String, right: String): Column = {
    val nullTolerant = Seq("Month", "Quarter").map { c =>
      coalesce(col(s"$left.$c"), lit("")) === coalesce(col(s"$right.$c"), lit(""))
    }
    val strict = (("Year" +: dimensionPairs.map(_._1)) :+ "Type").map { c =>
      col(s"$left.$c") === col(s"$right.$c")
    }
    (nullTolerant ++ strict).reduce(_ && _)
  }

  /**
    * Full-outer-joins `base` with `shifted` on the key columns and appends the
    * compared measures of `shifted` as `<measure>_<suffix>`.
    *
    * All key columns are coalesced from both sides; every non-key column of `base`
    * is carried over unchanged.
    */
  private def attachComparison(base: DataFrame, shifted: DataFrame, suffix: String): DataFrame = {
    val keySet = keyCols.toSet
    val keysOut = keyCols.map(c => coalesce(col(s"b.$c"), col(s"s.$c")).as(c))
    val carriedOut = base.columns.toSeq.filterNot(keySet).map(c => col(s"b.$c").as(c))
    val comparedOut = comparedMeasures.map(c => col(s"s.$c").as(s"${c}_$suffix"))

    base.as("b")
      .join(shifted.as("s"), sameKeys("b", "s"), "fullouter")
      .select(keysOut ++ carriedOut ++ comparedOut: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = createBatchSession()

    // Read the plan data and derive the period columns from the plan date.
    val plan = spark.read.format("csv").option("header", true).load(PlanCsvPath)
      .filter(col("Ten_chi_tieu") === "Doanh thu (SaleIn)")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))

    // Actuals aggregated per month, per quarter and per year in a single dataset.
    spark.read.format("avro").load(ActualAvroPath).createOrReplaceTempView("salein_dt_sp_mt")
    val actual = spark.sql(
            """
        select Month,Quarter , Year ,SaleDepartment ,  Organization OrganizationName,  Channel SalesChannelName,ProductGroupName GroupName, sum(Total) Revenue_Salein_Perform , sum(Quantity) Quantity_Salein_Perform ,"Thang" Type,Team_Name
        from salein_dt_sp_mt group by Month,Quarter , Year ,SaleDepartment ,  Organization ,  Channel ,ProductGroupName,Team_Name
        union all
        select null Month, Quarter , Year ,SaleDepartment ,  Organization OrganizationName,  Channel SalesChannelName,ProductGroupName GroupName, sum(Total) Revenue_Salein_Perform , sum(Quantity) Quantity_Salein_Perform ,"Quy" Type,Team_Name
        from salein_dt_sp_mt group by Quarter , Year ,SaleDepartment ,  Organization ,  Channel ,ProductGroupName,Team_Name
        union all
        select null Month,null Quarter , Year ,SaleDepartment ,  Organization OrganizationName,  Channel SalesChannelName,ProductGroupName GroupName, sum(Total) Revenue_Salein_Perform , sum(Quantity) Quantity_Salein_Perform , "Nam" Type,Team_Name
        from salein_dt_sp_mt group by  Year ,SaleDepartment ,  Organization ,  Channel ,ProductGroupName,Team_Name
                """)

    // Join actuals with the plan of each granularity, then stack month, quarter and year.
    val result = granularities
      .map { g =>
        joinWithPlan(
          actual.filter(col("Type") === g.label),
          plan.filter(col("Loai_KH") === g.label),
          g)
      }
      .reduce(_.unionByName(_))

    // Same period of the previous year: shifting Year by one lines year N-1 up with year N.
    val samePeriodLastYear = result.withColumn("Year", (col("Year") + 1).cast("int"))
    // Immediately preceding period (previous month / quarter / year).
    val previousPeriod = shiftToNextPeriod(result)

    val withSamePeriod = attachComparison(result, samePeriodLastYear, "CungKy")
    val dashboard = attachComparison(withSamePeriod, previousPeriod, "KyTruoc")
      .select(outputColumns.map(c => col(c)): _*)

    dashboard.write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }
}

// End of pasted snippet (paste-site footer: "Editor is loading..." / "Leave a Comment").