Untitled

 avatar
unknown
plain_text
a year ago
17 kB
4
Indexable
  spark.read.format("avro").load("/gold_zone/full_load/kpi/salein_dt_sp_km")
      .withColumnRenamed("Organization", "OrganizationName")
      .withColumnRenamed("Channel", "SalesChannelName")
      .withColumnRenamed("Total", "Revenue_Salein_Perform")
      .withColumnRenamed("Quantity", "Quantity_Salein_Perform")
      .groupBy("Month", "Quarter", "Year", "SaleDepartment", "OrganizationName", "SalesChannelName", "SPKM_BH1", "SPKM_BH2", "Team_Name")
      .agg(sum(col("Revenue_Salein_Perform")).as("Revenue_Salein_Perform"),
        sum(col("Quantity_Salein_Perform")).as("Quantity_Salein_Perform"))
      .withColumn("Loai", lit("Thang")).createOrReplaceTempView("salein_dt_th")

    // Doanh thu salein Kế hoạch  tháng
    spark.read.format("csv").option("header", true).load("/raw_zone/excel_form/upload/ke_hoach/salein_tx_thutien_spkm.csv").filter(col("Ten_chi_tieu") === "Doanh thu (SaleIn)" and col("Loai_KH") === "Thang")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      .withColumnRenamed("Don_vi", "SalesDepartment")
      .withColumn("Doanh_thu", col("Doanh_thu").cast("double"))
      .withColumn("So_luong", col("So_luong").cast("int"))
      .groupBy("Month", "Quarter", "Year", "SalesDepartment", "Vung_CN", "Kenh_ban", "SPKM_BH1", "SPKM_BH2", "Ten_doi", "Loai_KH")
      .agg(sum(col("Doanh_thu")).as("Revenue_Salein_Plan"),
        sum(col("So_luong")).as("Quantity_Salein_Plan")
      )
      .createOrReplaceTempView("salein_dt_kh")

    // Doanh thu Salein  tháng
    val salein_thang = spark.sql(
      """
                    select coalesce(a.Month ,b.Month) as Month,
                    coalesce(a.Quarter ,b.Quarter) as Quarter,
                    coalesce(a.Year ,b.Year) as Year,
                    coalesce(a.Loai ,b.Loai_KH) as Type,
                    coalesce(a.SaleDepartment ,b.SalesDepartment) as SaleDepartment,
                    coalesce(a.OrganizationName ,b.Vung_CN) as OrganizationName,
                    coalesce(a.SalesChannelName ,b.Kenh_ban) as SalesChannelName,
                    coalesce(a.SPKM_BH1 ,b.SPKM_BH1) as SPKM_BH1,
                    coalesce(a.SPKM_BH2 ,b.SPKM_BH2) as SPKM_BH2,
                    coalesce(a.Team_Name ,b.Ten_doi) as Team_Name,
                    Revenue_Salein_Perform,Quantity_Salein_Perform, Revenue_Salein_Plan,Quantity_Salein_Plan
                    from salein_dt_th a
                    full join salein_dt_kh b on
                    a.Month = b.Month and a.Quarter = b.Quarter and a.Year = b.Year and
                    a.SaleDepartment = b.SalesDepartment and a.Loai = b.Loai_KH and
                    a.SPKM_BH1 = b.SPKM_BH1 and a.SPKM_BH2 = b.SPKM_BH2 and
                    a.OrganizationName =b.Vung_CN and a.SalesChannelName =b.Kenh_ban and
                     a.Team_Name=b.Ten_doi

            """)


    // doanh thu salein Thực hiện quý
    spark.read.format("avro").load("/gold_zone/full_load/kpi/salein_dt_sp_km")
      .withColumnRenamed("Organization", "OrganizationName")
      .withColumnRenamed("Channel", "SalesChannelName")
      .withColumnRenamed("Total", "Revenue_Salein_Perform")
      .withColumnRenamed("Quantity", "Quantity_Salein_Perform")
      .groupBy("Quarter", "Year", "SaleDepartment", "OrganizationName", "SalesChannelName", "SPKM_BH1", "SPKM_BH2", "Team_Name")
      .agg(sum(col("Revenue_Salein_Perform")).as("Revenue_Salein_Perform"),
        sum(col("Quantity_Salein_Perform")).as("Quantity_Salein_Perform"))
      .withColumn("Loai", lit("Quy")).createOrReplaceTempView("salein_dt_quy")

    // Doanh thu salein Kế hoạch  quý
    spark.read.format("csv").option("header", true).load("/raw_zone/excel_form/upload/ke_hoach/salein_tx_thutien_spkm.csv").filter(col("Ten_chi_tieu") === "Doanh thu (SaleIn)" and col("Loai_KH") === "Quy")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      .withColumnRenamed("Don_vi", "SalesDepartment")
      .withColumn("Doanh_thu", col("Doanh_thu").cast("double"))
      .withColumn("So_luong", col("So_luong").cast("int"))
      .groupBy("Quarter", "Year", "SalesDepartment", "Vung_CN", "Kenh_ban", "SPKM_BH1", "SPKM_BH2", "Ten_doi", "Loai_KH")
      .agg(sum(col("Doanh_thu")).as("Revenue_Salein_Plan"),
        sum(col("So_luong")).as("Quantity_Salein_Plan")
      )
      .createOrReplaceTempView("salein_dt_kh_quy")

    // Doanh thu Salein  quý
    val salein_quy = spark.sql(
      """
                    select null as Month,
                    coalesce(a.Quarter ,b.Quarter) as Quarter,
                    coalesce(a.Year ,b.Year) as Year,
                    coalesce(a.Loai ,b.Loai_KH) as Type,
                    coalesce(a.SaleDepartment ,b.SalesDepartment) as SaleDepartment,
                    coalesce(a.OrganizationName ,b.Vung_CN) as OrganizationName,
                    coalesce(a.SalesChannelName ,b.Kenh_ban) as SalesChannelName,
                    coalesce(a.SPKM_BH1 ,b.SPKM_BH1) as SPKM_BH1,
                    coalesce(a.SPKM_BH2 ,b.SPKM_BH2) as SPKM_BH2,
                    coalesce(a.Team_Name ,b.Ten_doi) as Team_Name,
                    Revenue_Salein_Perform,Quantity_Salein_Perform, Revenue_Salein_Plan,Quantity_Salein_Plan
                    from salein_dt_quy a
                    full join salein_dt_kh_quy b on  a.Quarter = b.Quarter and a.Year = b.Year and
                    a.SaleDepartment = b.SalesDepartment and a.Loai = b.Loai_KH and
                    a.SPKM_BH1 = b.SPKM_BH1 and a.SPKM_BH2 = b.SPKM_BH2 and
                    a.OrganizationName =b.Vung_CN and a.SalesChannelName =b.Kenh_ban and
                    a.Team_Name=b.Ten_doi

            """)


    // doanh thu salein Thực hiện Nam
    spark.read.format("avro").load("/gold_zone/full_load/kpi/salein_dt_sp_km")
      .withColumnRenamed("Organization", "OrganizationName")
      .withColumnRenamed("Channel", "SalesChannelName")
      .withColumnRenamed("Total", "Revenue_Salein_Perform")
      .withColumnRenamed("Quantity", "Quantity_Salein_Perform")
      .groupBy("Year", "SaleDepartment", "OrganizationName", "SalesChannelName", "SPKM_BH1", "SPKM_BH2", "Team_Name")
      .agg(sum(col("Revenue_Salein_Perform")).as("Revenue_Salein_Perform"),
        sum(col("Quantity_Salein_Perform")).as("Quantity_Salein_Perform"))
      .withColumn("Loai", lit("Nam")).createOrReplaceTempView("salein_dt_nam")

    // Doanh thu salein Kế hoạch  Nam
    spark.read.format("csv").option("header", true).load("/raw_zone/excel_form/upload/ke_hoach/salein_tx_thutien_spkm.csv").filter(col("Ten_chi_tieu") === "Doanh thu (SaleIn)" and col("Loai_KH") === "Nam")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      .withColumnRenamed("Don_vi", "SalesDepartment")
      .withColumn("Doanh_thu", col("Doanh_thu").cast("double"))
      .withColumn("So_luong", col("So_luong").cast("int"))
      .groupBy("Year", "SalesDepartment", "Vung_CN", "Kenh_ban", "SPKM_BH1", "SPKM_BH2", "Ten_doi", "Loai_KH")
      .agg(sum(col("Doanh_thu")).as("Revenue_Salein_Plan"),
        sum(col("So_luong")).as("Quantity_Salein_Plan")
      )
      .createOrReplaceTempView("salein_dt_kh_nam")

    // Doanh thu Salein  Nam
    val salein_nam = spark.sql(
      """
                    select null as Month,
                    null as Quarter,
                    coalesce(a.Year ,b.Year) as Year,
                    coalesce(a.Loai ,b.Loai_KH) as Type,
                    coalesce(a.SaleDepartment ,b.SalesDepartment) as SaleDepartment,
                    coalesce(a.OrganizationName ,b.Vung_CN) as OrganizationName,
                    coalesce(a.SalesChannelName ,b.Kenh_ban) as SalesChannelName,
                    coalesce(a.SPKM_BH1 ,b.SPKM_BH1) as SPKM_BH1,
                    coalesce(a.SPKM_BH2 ,b.SPKM_BH2) as SPKM_BH2,
                    coalesce(a.Team_Name ,b.Ten_doi) as Team_Name,
                    Revenue_Salein_Perform,Quantity_Salein_Perform, Revenue_Salein_Plan,Quantity_Salein_Plan
                    from salein_dt_nam a
                    full join salein_dt_kh_nam b on a.Year = b.Year and
                    a.SaleDepartment = b.SalesDepartment and a.Loai = b.Loai_KH and
                    a.SPKM_BH1 = b.SPKM_BH1 and a.SPKM_BH2 = b.SPKM_BH2 and
                    a.OrganizationName =b.Vung_CN and a.SalesChannelName =b.Kenh_ban and
                    a.Team_Name=b.Ten_doi

            """)
    val result = salein_thang.unionAll(salein_quy).unionAll(salein_nam)

    val ky_truoc_salein_group_km = result
      .withColumn("Year", when(col("Type") === "Nam", (col("Year") + 1).cast("int"))
        .when((col("Type") === "Thang" && col("Month") === 12) || (col("Type") === "Quy" && col("Quarter") === 4), (col("Year") + 1).cast("int"))
        .otherwise(col("Year").cast("int")))
      .withColumn("Quarter", when(col("Quarter").isNull, col("Quarter").cast("int"))
        .when((col("Type") === "Quy" && col("Quarter") === 4) || (col("Type") === "Thang" && col("Month") === 12), 1)
        .when(col("Type") === "Quy" && col("Quarter") < 4, (col("Quarter") + 1).cast("int"))
        .when(col("Type") === "Thang" && (col("Month") === 3 || col("Month") === 6 || col("Month") === 9), (col("Quarter") + 1).cast("int"))
        .when(col("Type") === "Thang" && col("Month") === 12, 1).otherwise(col("Quarter").cast("int"))).withColumn("Month", when(col("Month").isNull, col("Month").cast("int"))
      .when(col("Month") < 12, (col("Month") + 1).cast("int"))
      .otherwise(1))

    val cung_ky_salein_group_km = result.withColumn("Year", (col("Year") + 1).cast("int"))

    val res_salein_group_km_cung_ky = result.as("result").
      join(cung_ky_salein_group_km.as("cung_ky_salein_group_km"),
        col("result.Month") === col("cung_ky_salein_group_km.Month")
          && col("result.Quarter") === col("cung_ky_salein_group_km.Quarter")
          && col("result.Year") === col("cung_ky_salein_group_km.Year")
          && col("result.SaleDepartment") === col("cung_ky_salein_group_km.SaleDepartment")
          && col("result.SalesChannelName") === col("cung_ky_salein_group_km.SalesChannelName")
          && col("result.SPKM_BH1") === col("cung_ky_salein_group_km.SPKM_BH1")
          && col("result.SPKM_BH2") === col("cung_ky_salein_group_km.SPKM_BH2")
          && col("result.OrganizationName") === col("cung_ky_salein_group_km.OrganizationName")
          && col("result.Team_Name") === col("cung_ky_salein_group_km.Team_Name")
          && col("result.Type") === col("cung_ky_salein_group_km.Type"), "fullouter")
      .select(coalesce(col("result.Month"), col("cung_ky_salein_group_km.Month")).alias("Month"),
        coalesce(col("result.Quarter"), col("cung_ky_salein_group_km.Quarter")).alias("Quarter"),
        coalesce(col("result.Year"), col("cung_ky_salein_group_km.Year")).alias("Year"),
        coalesce(col("result.SaleDepartment"), col("cung_ky_salein_group_km.SaleDepartment")).alias("SaleDepartment"),
        coalesce(col("result.SalesChannelName"), col("cung_ky_salein_group_km.SalesChannelName")).alias("SalesChannelName"),
        coalesce(col("result.SPKM_BH1"), col("cung_ky_salein_group_km.SPKM_BH1")).alias("SPKM_BH1"),
        coalesce(col("result.SPKM_BH2"), col("cung_ky_salein_group_km.SPKM_BH2")).alias("SPKM_BH2"),
        coalesce(col("result.OrganizationName"), col("cung_ky_salein_group_km.OrganizationName")).alias("OrganizationName"),
        coalesce(col("result.Team_Name"), col("cung_ky_salein_group_km.Team_Name")).alias("Team_Name"),
        col("result.Revenue_Salein_Perform").alias("Revenue_Salein_Perform"),
        col("result.Revenue_Salein_Plan").alias("Revenue_Salein_Plan"),
        col("result.Quantity_Salein_Perform").alias("Quantity_Salein_Perform"),
        col("result.Quantity_Salein_Plan").alias("Quantity_Salein_Plan"),
        coalesce(col("result.Type"), col("cung_ky_salein_group_km.Type")).alias("Type"),
        col("cung_ky_salein_group_km.Revenue_Salein_Perform").alias("Revenue_Salein_Perform_CungKy"),
        col("cung_ky_salein_group_km.Quantity_Salein_Perform").alias("Quantity_Salein_Perform_CungKy"))

    val res_salein_group_km = res_salein_group_km_cung_ky.as("res_salein_group_km_cung_ky").
      join(ky_truoc_salein_group_km.as("ky_truoc_salein_group_km"),
        col("res_salein_group_km_cung_ky.Month") === col("ky_truoc_salein_group_km.Month")
          && col("res_salein_group_km_cung_ky.Quarter") === col("ky_truoc_salein_group_km.Quarter")
          && col("res_salein_group_km_cung_ky.Year") === col("ky_truoc_salein_group_km.Year")
          && col("res_salein_group_km_cung_ky.SaleDepartment") === col("ky_truoc_salein_group_km.SaleDepartment")
          && col("res_salein_group_km_cung_ky.SalesChannelName") === col("ky_truoc_salein_group_km.SalesChannelName")
          && col("res_salein_group_km_cung_ky.SPKM_BH1") === col("ky_truoc_salein_group_km.SPKM_BH1")
          && col("res_salein_group_km_cung_ky.SPKM_BH2") === col("ky_truoc_salein_group_km.SPKM_BH2")
          && col("res_salein_group_km_cung_ky.OrganizationName") === col("ky_truoc_salein_group_km.OrganizationName")
          && col("res_salein_group_km_cung_ky.Team_Name") === col("ky_truoc_salein_group_km.Team_Name")
          && col("res_salein_group_km_cung_ky.Type") === col("ky_truoc_salein_group_km.Type"), "fullouter")
      .select(coalesce(col("res_salein_group_km_cung_ky.Month"), col("ky_truoc_salein_group_km.Month")).alias("Month"),
        coalesce(col("res_salein_group_km_cung_ky.Quarter"), col("ky_truoc_salein_group_km.Quarter")).alias("Quarter"),
        coalesce(col("res_salein_group_km_cung_ky.Year"), col("ky_truoc_salein_group_km.Year")).alias("Year"),
        coalesce(col("res_salein_group_km_cung_ky.SaleDepartment"), col("ky_truoc_salein_group_km.SaleDepartment")).alias("SaleDepartment"),
        coalesce(col("res_salein_group_km_cung_ky.SalesChannelName"), col("ky_truoc_salein_group_km.SalesChannelName")).alias("SalesChannelName"),
        coalesce(col("res_salein_group_km_cung_ky.SPKM_BH1"), col("ky_truoc_salein_group_km.SPKM_BH1")).alias("SPKM_BH1"),
        coalesce(col("res_salein_group_km_cung_ky.SPKM_BH2"), col("ky_truoc_salein_group_km.SPKM_BH1")).alias("SPKM_BH2"),
        coalesce(col("res_salein_group_km_cung_ky.OrganizationName"), col("ky_truoc_salein_group_km.OrganizationName")).alias("OrganizationName"),
        coalesce(col("res_salein_group_km_cung_ky.Team_Name"), col("ky_truoc_salein_group_km.Team_Name")).alias("Team_Name"),
        col("res_salein_group_km_cung_ky.Revenue_Salein_Perform").alias("Revenue_Salein_Perform"),
        col("res_salein_group_km_cung_ky.Revenue_Salein_Plan").alias("Revenue_Salein_Plan"),
        col("res_salein_group_km_cung_ky.Quantity_Salein_Perform").alias("Quantity_Salein_Perform"),
        col("res_salein_group_km_cung_ky.Quantity_Salein_Plan").alias("Quantity_Salein_Plan"),
        col("res_salein_group_km_cung_ky.Revenue_Salein_Perform_CungKy").alias("Revenue_Salein_Perform_CungKy"),
        col("res_salein_group_km_cung_ky.Quantity_Salein_Perform_CungKy").alias("Quantity_Salein_Perform_CungKy"),
        coalesce(col("res_salein_group_km_cung_ky.Type"), col("ky_truoc_salein_group_km.Type")).alias("Type"),
        col("ky_truoc_salein_group_km.Revenue_Salein_Perform").alias("Revenue_Salein_Perform_KyTruoc"),
        col("ky_truoc_salein_group_km.Quantity_Salein_Perform").alias("Quantity_Salein_Perform_KyTruoc"))
    val res = res_salein_group_km
      .filter(col("Year") === "2024")
      .select(
        lit("Doanh thu (SaleIn)").alias("Ten_chi_tieu"),
        lit(null).alias("Thoi_gian_ke_hoach"),
        col("SaleDepartment").alias("Don_vi"),
        col("OrganizationName").alias("Vung_CN"),
        col("SPKM_BH1").alias("SPKM_BH1"),
        col("SPKM_BH2").alias("SPKM_BH2"),
        col("Team_Name").alias("Ten_doi"),
        lit(null).alias("So_luong"),
        lit(null).alias("Doanh_thu"),
        lit(null).alias("Loai_KH"),

      )

    val re = res.distinct()
    val reCast = re
      .withColumn("Ten_chi_tieu", col("Ten_chi_tieu").cast(StringType))
      .withColumn("Thoi_gian_ke_hoach", col("Thoi_gian_ke_hoach").cast(StringType))
      .withColumn("Don_vi", col("Don_vi").cast(StringType))
      .withColumn("Vung_CN", col("Vung_CN").cast(StringType))
      .withColumn("SPKM_BH1", col("SPKM_BH1").cast(StringType))
      .withColumn("SPKM_BH2", col("SPKM_BH2").cast(StringType))
      .withColumn("Ten_doi", col("Ten_doi").cast(StringType))
      .withColumn("So_luong", col("So_luong").cast(IntegerType))
      .withColumn("Doanh_thu", col("Doanh_thu").cast(DoubleType))
      .withColumn("Loai_KH", col("Loai_KH").cast(StringType))
Editor is loading...
Leave a Comment