Untitled

 avatar
unknown
plain_text
9 months ago
18 kB
3
Indexable
 // Plan ("ke hoach") rows for the actual-export indicator, enriched with product-class IDs.
    // The product-class lookup table is defined once; the _nsp2/_nsp3 names refer to the same
    // DataFrame so any existing references to them keep working.
    val product_class_detail_nsp1 = spark.read.table("WHS.SalePlan_ProductClassDetail")
    val product_class_detail_nsp2 = product_class_detail_nsp1
    val product_class_detail_nsp3 = product_class_detail_nsp1
    // The same lookup table is exposed to the SQL queries below as the "productdetail" view.
    product_class_detail_nsp1.createOrReplaceTempView("productdetail")
    // Read the plan CSV, keep only the "Thực xuất" (actual export) indicator, and derive
    // Month / Year / Quarter strings from Thoi_gian_ke_hoach plus a numeric ThanhTien from Doanh_thu.
    val df_kh_tx = spark.read.format("csv").option("header", true).load("/raw_zone/excel_form/upload/ke_hoach/salein_tx_thutien_nsp.csv")
      .filter(col("Ten_chi_tieu") === "Thực xuất")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("ThanhTien", col("Doanh_thu").cast("double"))
    // Look up the product-class ID (SP1/SP2/SP3) for each of the three class-name levels.
    // The lookup table is joined to itself three times, so every join condition uses
    // alias-qualified names (t1/t2/t3). Dataset-bound refs like product_class_detail_nsp2("Class")
    // can resolve to the wrong copy, or fail as ambiguous, in a self-join of the same table.
    // NOTE(review): if a Class name appears on several lookup rows, these left joins duplicate
    // plan rows. Confirm Class is unique in WHS.SalePlan_ProductClassDetail.
    val kh_tx = df_kh_tx.as("t")
      .join(product_class_detail_nsp1.as("t1"), col("t.Name_NSP1") === col("t1.Class"), "left")
      .join(product_class_detail_nsp2.as("t2"), col("t.Name_NSP2") === col("t2.Class"), "left")
      .join(product_class_detail_nsp3.as("t3"), col("t.Name_NSP3") === col("t3.Class"), "left")
      .select(
        col("t.*"),
        col("t1.ID").as("SP1"),
        col("t2.ID").as("SP2"),
        col("t3.ID").as("SP3")
      )
    // ETL for actual exports ("tx") at month, quarter and year level.
    // Register the source datasets as SQL views for the queries below.
    spark.read.format("avro").load("/gold_zone/full_load/kpi/tx").createOrReplaceTempView("tx")
    kh_tx.createOrReplaceTempView("kh_tx")
    // Month level ("Thang"): one output row per tx row (no aggregation). NSP1..NSP3 codes are
    // resolved to class names through productdetail. The DataFrame is kept in the val and then
    // registered as the "output_thang" view.
    // 'Thang' is single-quoted like 'Quy'/'Nam'. A double-quoted "Thang" would be parsed as an
    // identifier when spark.sql.ansi.doubleQuotedIdentifiers is enabled.
    // NOTE(review): these are inner joins, so tx rows whose N_SP1/N_SP2/N_SP3 has no matching
    // productdetail.ID are dropped. Confirm that is intended.
    val output_thang = spark.sql(
      """
        SELECT
          tx.Month, tx.Quarter, tx.Year,
          tx.Phong AS SalesDepartment, tx.Chi_nhanh AS OrganizationName, tx.Tinh AS ProvinceName,
          tx.Kenh AS SalesChannelName,
          tx.N_SP1, tx.N_SP2, tx.N_SP3,
          p1.Class AS Name_NSP1, p2.Class AS Name_NSP2, p3.Class AS Name_NSP3,
          tx.Team_Name,
          tx.ThanhTien AS ActualExport_Perform,
          'Thang' AS Loai
        FROM tx
        JOIN productdetail p1 ON tx.N_SP1 = p1.ID
        JOIN productdetail p2 ON tx.N_SP2 = p2.ID
        JOIN productdetail p3 ON tx.N_SP3 = p3.ID
      """)
    output_thang.createOrReplaceTempView("output_thang")
    // Quarter level ("Quy"): ActualExport_Perform summed per quarter, year and dimension
    // combination; Month is NULL on these rows. The DataFrame is kept in the val and then
    // registered as the "output_quy" view.
    // NOTE(review): inner joins drop tx rows whose N_SP1/N_SP2/N_SP3 has no productdetail.ID match.
    val output_quy = spark.sql(
      """
        SELECT
          NULL AS Month,
          tx.Quarter,
          tx.Year,
          tx.Phong AS SalesDepartment,
          tx.Chi_nhanh AS OrganizationName,
          tx.Tinh AS ProvinceName,
          tx.Kenh AS SalesChannelName,
          tx.N_SP1,
          tx.N_SP2,
          tx.N_SP3,
          p1.Class AS Name_NSP1,
          p2.Class AS Name_NSP2,
          p3.Class AS Name_NSP3,
          tx.Team_Name,
          SUM(tx.ThanhTien) AS ActualExport_Perform,
          'Quy' AS Loai
        FROM tx
        JOIN productdetail p1 ON tx.N_SP1 = p1.ID
        JOIN productdetail p2 ON tx.N_SP2 = p2.ID
        JOIN productdetail p3 ON tx.N_SP3 = p3.ID
        GROUP BY
          tx.Quarter,
          tx.Year,
          tx.Phong,
          tx.Chi_nhanh,
          tx.Tinh,
          tx.Kenh,
          tx.N_SP1,
          tx.N_SP2,
          tx.N_SP3,
          p1.Class,
          p2.Class,
          p3.Class,
          tx.Team_Name
      """)
    output_quy.createOrReplaceTempView("output_quy")
    // Year level ("Nam"): ActualExport_Perform summed per year and dimension combination;
    // Month and Quarter are NULL on these rows. The DataFrame is kept in the val and then
    // registered as the "output_nam" view.
    // NOTE(review): inner joins drop tx rows whose N_SP1/N_SP2/N_SP3 has no productdetail.ID match.
    val output_nam = spark.sql(
      """
        SELECT
          NULL AS Month,
          NULL AS Quarter,
          tx.Year,
          tx.Phong AS SalesDepartment,
          tx.Chi_nhanh AS OrganizationName,
          tx.Tinh AS ProvinceName,
          tx.Kenh AS SalesChannelName,
          tx.N_SP1,
          tx.N_SP2,
          tx.N_SP3,
          p1.Class AS Name_NSP1,
          p2.Class AS Name_NSP2,
          p3.Class AS Name_NSP3,
          tx.Team_Name,
          SUM(tx.ThanhTien) AS ActualExport_Perform,
          'Nam' AS Loai
        FROM tx
        JOIN productdetail p1 ON tx.N_SP1 = p1.ID
        JOIN productdetail p2 ON tx.N_SP2 = p2.ID
        JOIN productdetail p3 ON tx.N_SP3 = p3.ID
        GROUP BY
          tx.Year,
          tx.Phong,
          tx.Chi_nhanh,
          tx.Tinh,
          tx.Kenh,
          tx.N_SP1,
          tx.N_SP2,
          tx.N_SP3,
          p1.Class,
          p2.Class,
          p3.Class,
          tx.Team_Name
      """)
    output_nam.createOrReplaceTempView("output_nam")

//
//    spark.read.format("avro").load("/gold_zone/full_load/kpi/tx").createOrReplaceTempView("tx")
//    val output = spark.sql(
//        """
//        select
//          Month , Quarter,  Year , Phong SalesDepartment,  Chi_nhanh OrganizationName,Tinh ProvinceName , Kenh SalesChannelName,
//          N_SP1 ,  N_SP2 , N_SP3, Team_Name ,ThanhTien ActualExport_Perform , "Thang" Loai
//          from tx
//        union all
//        select
//          null Month ,Quarter , Year , Phong SalesDepartment,  Chi_nhanh OrganizationName,Tinh ProvinceName , Kenh SalesChannelName,
//          N_SP1 ,  N_SP2 , N_SP3, Team_Name ,sum(ThanhTien) ActualExport_Perform , "Quy" Loai
//          from tx
//          group by Quarter ,   Year , Phong ,  Chi_nhanh , Kenh , N_SP1 ,  N_SP2 , N_SP3,Tinh, Team_Name
//         union all
//        select
//          null Month ,null Quarter ,   Year , Phong SalesDepartment,  Chi_nhanh OrganizationName,Tinh ProvinceName , Kenh SalesChannelName  ,
//          N_SP1 ,  N_SP2 , N_SP3, Team_Name ,sum(ThanhTien) ActualExport_Perform , "Nam" Loai
//          from tx
//          group by   Year , Phong ,  Chi_nhanh  , Kenh , N_SP1 ,  N_SP2 , N_SP3,Tinh, Team_Name
//        """).withColumn("ActualExport_Plan", col("ActualExport_Perform") * 1.3)
//      .withColumn("SalesChannelName", col("SalesChannelName").cast("string"))
    // Join the plan (kh_tx) onto the actual-export rows of each level and stack the results:
    // month rows (output_thang), quarter rows (output_quy, Month = NULL) and year rows
    // (output_nam, Month = Quarter = NULL). The plan value kh_tx.ThanhTien becomes ActualExport_Plan.
    // The three UNION ALL branches differ only in which period columns they keep and join on,
    // so they are generated from one template to keep column lists and join keys in sync.
    val output = {
      // Columns copied from the actual-export view after Month/Quarter, in output order.
      val actualCols = Seq(
        "Year", "SalesDepartment", "OrganizationName", "ProvinceName", "SalesChannelName",
        "Name_NSP1", "Name_NSP2", "Name_NSP3", "N_SP1", "N_SP2", "N_SP3",
        "Team_Name", "ActualExport_Perform", "Loai")
      // Join keys shared by every level: (actual-export column, kh_tx column).
      val sharedPlanKeys = Seq(
        "Year" -> "Year",
        "SalesDepartment" -> "Don_vi",
        "OrganizationName" -> "Vung_CN",
        "ProvinceName" -> "Tinh_Thanh_Pho",
        "SalesChannelName" -> "Kenh_ban",
        "N_SP1" -> "SP1",
        "N_SP2" -> "SP2",
        "N_SP3" -> "SP3",
        "Name_NSP1" -> "Name_NSP1",
        "Name_NSP2" -> "Name_NSP2",
        "Name_NSP3" -> "Name_NSP3",
        "Team_Name" -> "Ten_doi",
        "Loai" -> "Loai_KH")

      // Builds one UNION ALL branch: `view` LEFT JOIN kh_tx. Month/Quarter are selected and
      // matched only when the level keeps them; otherwise they are emitted as NULL and left
      // out of the join condition.
      def planBranch(view: String, keepMonth: Boolean, keepQuarter: Boolean): String = {
        val periodCols = Seq("Month" -> keepMonth, "Quarter" -> keepQuarter)
        val periodSelect = periodCols.map { case (c, keep) => if (keep) s"$view.$c" else s"NULL AS $c" }
        val periodKeys = periodCols.collect { case (c, true) => c -> c }
        val selectList =
          (periodSelect ++ actualCols.map(c => s"$view.$c")) :+ "kh_tx.ThanhTien AS ActualExport_Plan"
        val joinCond = (periodKeys ++ sharedPlanKeys).map { case (a, p) => s"$view.$a = kh_tx.$p" }
        Seq(
          s"SELECT ${selectList.mkString(", ")}",
          s"FROM $view",
          s"LEFT JOIN kh_tx ON ${joinCond.mkString(" AND ")}"
        ).mkString("\n")
      }

      spark.sql(
        Seq(
          planBranch("output_thang", keepMonth = true, keepQuarter = true),
          planBranch("output_quy", keepMonth = false, keepQuarter = true),
          planBranch("output_nam", keepMonth = false, keepQuarter = false)
        ).mkString("\nUNION ALL\n")
      )
    }.withColumn("SalesChannelName", col("SalesChannelName").cast("string"))


    // Previous period ("ky truoc"): shift each row forward by one period of its own level.
    // Thang moves to the next month, Quy to the next quarter and Nam to the next year, so joining
    // the result back on the period keys lines up each row with the previous period's
    // ActualExport_Perform.
    // Order matters: the Year and Quarter rules read the ORIGINAL Month/Quarter, so Month is
    // rewritten last. Month/Quarter that are NULL (Quy/Nam rows) stay NULL.
    val ky_truoc_kpi_thuc_xuat = {
      val isThang = col("Loai") === "Thang"
      val isQuy = col("Loai") === "Quy"
      // December (month level) and Q4 (quarter level) roll over into the next year's first period.
      val rollsIntoNextYear = (isThang && col("Month") === 12) || (isQuy && col("Quarter") === 4)
      output
        .withColumn("Year", when(col("Loai") === "Nam", (col("Year") + 1).cast("int"))
          .when(rollsIntoNextYear, (col("Year") + 1).cast("int"))
          .otherwise(col("Year").cast("int")))
        .withColumn("Quarter", when(col("Quarter").isNull, col("Quarter").cast("int"))
          .when(rollsIntoNextYear, 1)
          .when(isQuy && col("Quarter") < 4, (col("Quarter") + 1).cast("int"))
          // The last month of Q1-Q3 moves into the next quarter of the same year.
          .when(isThang && (col("Month") === 3 || col("Month") === 6 || col("Month") === 9), (col("Quarter") + 1).cast("int"))
          .otherwise(col("Quarter").cast("int")))
        .withColumn("Month", when(col("Month").isNull, col("Month").cast("int"))
          .when(col("Month") < 12, (col("Month") + 1).cast("int"))
          .otherwise(1))
    }

    // Same period last year ("cung ky"): bump Year by one (as int) so each row lines up with the
    // matching period of the following year when joined back onto `output`.
    val cung_ky_kpi_thucxuat = output.withColumn("Year", col("Year").plus(1).cast("int"))

    // Attach the same-period-last-year value (ActualExport_Perform_CungKy) to each output row.
    // The full outer join also keeps periods present on only one side, e.g. the year after the
    // last loaded year, which carries only a CungKy value.
    // Keys use null-safe equality (<=>). Month is NULL on Quy/Nam rows and Quarter is NULL on
    // Nam rows; with plain === those rows never match, so their CungKy value would land on a
    // separate, unmatched row.
    val res_kpi_thucxuat_cung_ky = {
      val keyCols = Seq(
        "Month", "Quarter", "Year", "SalesDepartment", "OrganizationName", "ProvinceName",
        "SalesChannelName", "Name_NSP1", "Name_NSP2", "Name_NSP3", "N_SP1", "N_SP2", "N_SP3",
        "Team_Name", "Loai")
      val joinCond = keyCols
        .map(c => col(s"output.$c") <=> col(s"cung_ky_kpi_thucxuat.$c"))
        .reduce(_ && _)
      // Key column taken from whichever side of the full outer join is present.
      def key(c: String) = coalesce(col(s"output.$c"), col(s"cung_ky_kpi_thucxuat.$c")).alias(c)

      output.as("output")
        .join(cung_ky_kpi_thucxuat.as("cung_ky_kpi_thucxuat"), joinCond, "fullouter")
        .select(
          key("Month"),
          key("Quarter"),
          key("Year"),
          key("SalesDepartment"),
          key("OrganizationName"),
          key("ProvinceName"),
          key("SalesChannelName"),
          key("N_SP1"),
          key("N_SP2"),
          key("N_SP3"),
          key("Name_NSP1"),
          key("Name_NSP2"),
          key("Name_NSP3"),
          key("Team_Name"),
          col("output.ActualExport_Perform").alias("ActualExport_Perform"),
          col("output.ActualExport_Plan").alias("ActualExport_Plan"),
          key("Loai"),
          col("cung_ky_kpi_thucxuat.ActualExport_Perform").alias("ActualExport_Perform_CungKy")
        )
    }

    // Attach the previous-period value (ActualExport_Perform_KyTruoc) to the rows that already
    // carry the same-period-last-year value, then derive Ma_Nhom.
    // Keys use null-safe equality (<=>). Month/Quarter are NULL on Quy/Nam rows, and with plain
    // === those rows could never pick up their previous-period value.
    val res_kpi_thucxuat = {
      val keyCols = Seq(
        "Month", "Quarter", "Year", "SalesDepartment", "OrganizationName", "ProvinceName",
        "SalesChannelName", "Name_NSP1", "Name_NSP2", "Name_NSP3", "N_SP1", "N_SP2", "N_SP3",
        "Team_Name", "Loai")
      val joinCond = keyCols
        .map(c => col(s"res_kpi_thucxuat_cung_ky.$c") <=> col(s"ky_truoc_kpi_thuc_xuat.$c"))
        .reduce(_ && _)
      // Key column taken from whichever side of the full outer join is present.
      def key(c: String) =
        coalesce(col(s"res_kpi_thucxuat_cung_ky.$c"), col(s"ky_truoc_kpi_thuc_xuat.$c")).alias(c)

      res_kpi_thucxuat_cung_ky.as("res_kpi_thucxuat_cung_ky")
        .join(ky_truoc_kpi_thuc_xuat.as("ky_truoc_kpi_thuc_xuat"), joinCond, "fullouter")
        .select(
          key("Month"),
          key("Quarter"),
          key("Year"),
          key("SalesDepartment"),
          key("OrganizationName"),
          key("ProvinceName"),
          key("SalesChannelName"),
          key("N_SP1"),
          key("N_SP2"),
          key("N_SP3"),
          key("Name_NSP1"),
          key("Name_NSP2"),
          key("Name_NSP3"),
          key("Team_Name"),
          col("res_kpi_thucxuat_cung_ky.ActualExport_Perform").alias("ActualExport_Perform"),
          col("res_kpi_thucxuat_cung_ky.ActualExport_Plan").alias("ActualExport_Plan"),
          key("Loai"),
          col("res_kpi_thucxuat_cung_ky.ActualExport_Perform_CungKy").alias("ActualExport_Perform_CungKy"),
          col("ky_truoc_kpi_thuc_xuat.ActualExport_Perform").alias("ActualExport_Perform_KyTruoc")
        )
        // Ma_Nhom = "L" followed by the last character of N_SP1.
        .withColumn("Ma_Nhom", concat(lit("L"), substring(col("N_SP1"), -1, 1)))
    }

    // Expose the final KPI table as the "kpi_thucxuat" view, then sum the previous-period actual
    // export per N_SP1 over a fixed window. This appears to be an ad-hoc check: the result is
    // only bound to `res` here.
    // Only month-level rows have a non-NULL Month, so the window covers "Thang" rows only.
    res_kpi_thucxuat.createOrReplaceTempView("kpi_thucxuat")
    val res = {
      val checkYear = 2024
      val checkMonthFrom = 1
      val checkMonthTo = 5
      spark.sql(
        s"""
          select N_SP1, sum(ActualExport_Perform_KyTruoc)
          from kpi_thucxuat where Year = $checkYear and Month between $checkMonthFrom and $checkMonthTo
          group by N_SP1
        """)
    }
Editor is loading...
Leave a Comment