Untitled
unknown
plain_text
a year ago
5.6 kB
5
Indexable
package vn.com.viettel.code.plan

import vn.com.viettel.utils.SparkUtils.createBatchSession
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import vn.com.viettel.AppConfig

/** Prepares sale-in plan data (indicator "Thực xuất") for three planning levels.
  *
  * Levels are identified by `Loai_ke_hoach`:
  *  - "Đội" (team): kept as-is.
  *  - "Vùng" (region): plan minus the summed team plans with the same keys.
  *  - "PBH" (presumably the sales-department level — confirm): plan minus the
  *    summed region plans with the same keys.
  */
object salein_tx_thutien_nsp {

  // Keys linking a region ("Vùng") row to the team ("Đội") plans beneath it.
  private val RegionKeys: Seq[String] =
    Seq("Day", "Month", "Quarter", "Year", "SaleDepartment", "OrganizationName", "Name_NSP1", "Name_NSP2", "Type")

  // Keys linking a "PBH" row to the region plans beneath it (no OrganizationName).
  private val DepartmentKeys: Seq[String] =
    Seq("Day", "Month", "Quarter", "Year", "SaleDepartment", "Name_NSP1", "Name_NSP2", "Type")

  /** Builds the combined plan frame for all three levels.
    *
    * @param spark active session used to read the plan table
    * @return union (by column name) of team rows, adjusted PBH rows and adjusted region rows
    */
  def execute_processing(spark: SparkSession): DataFrame = {
    val df_kh_salein_class = readPlan(spark)

    // Team-level plan rows are returned unchanged.
    val ke_hoach_doi = df_kh_salein_class
      .filter(col("Loai_ke_hoach") === "Đội" && col("Team_Name").isNotNull)

    // Region rows: subtract the total of the team plans under them.
    val res1 = ke_hoach_doi
      .groupBy(RegionKeys.map(col): _*)
      .agg(sum("Revenue_Salein_Plan").as("plan_theo_vung"))
    val ke_hoach_kenh = subtractChildPlan(
      df_kh_salein_class.filter(col("Loai_ke_hoach") === "Vùng" && col("Team_Name").isNull),
      res1,
      RegionKeys,
      "plan_theo_vung"
    )

    // PBH rows: subtract the total of all (unadjusted) region plans under them.
    val res2 = df_kh_salein_class
      .filter(col("Loai_ke_hoach") === "Vùng")
      .groupBy(DepartmentKeys.map(col): _*)
      .agg(sum("Revenue_Salein_Plan").as("plan_theo_pbh"))
    val ke_hoach_pbh = subtractChildPlan(
      df_kh_salein_class.filter(col("Loai_ke_hoach") === "PBH" && col("SaleDepartment").isNotNull),
      res2,
      DepartmentKeys,
      "plan_theo_pbh"
    )

    // Previously this returned an undefined `result`; the union is the intended output.
    ke_hoach_doi.unionByName(ke_hoach_pbh).unionByName(ke_hoach_kenh)
  }

  /** Reads plan rows for indicator "Thực xuất" from the catalog table
    * `excel_form.test_KH_BH1_KH` and normalises column names and types.
    */
  private def readPlan(spark: SparkSession): DataFrame =
    spark.read.table("excel_form.test_KH_BH1_KH")
      .filter(col("Ten_chi_tieu") === "Thực xuất")
      // Derived from the plan date before it is renamed to "Day" below.
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      .withColumnRenamed("Don_vi", "SaleDepartment")
      .withColumnRenamed("Vung_CN", "OrganizationName")
      .withColumnRenamed("Kenh_ban", "SalesChannelName")
      .withColumnRenamed("Loai_KH", "Type")
      .withColumnRenamed("Ten_doi", "Team_Name")
      .withColumnRenamed("Thoi_gian_ke_hoach", "Day")
      // Plan data carries no actuals; the perform column is a zero placeholder.
      .withColumn("Revenue_Salein_Perform", lit(0))
      .withColumn("Revenue_Salein_Plan", col("Doanh_thu").cast("double"))
      .select(
        "Day", "Month", "Quarter", "Year", "SaleDepartment", "OrganizationName", "SalesChannelName",
        "Name_NSP1", "Name_NSP2", "Name_NSP3", "Team_Name", "Revenue_Salein_Plan",
        "Revenue_Salein_Perform", "Type", "Loai_ke_hoach"
      )

  /** Subtracts a lower level's summed plan from each row of a higher level.
    *
    * @param parentRows  rows of the higher level; every column is kept, `Revenue_Salein_Plan` is adjusted
    * @param childTotals per-key totals of the lower level
    * @param keys        columns that link a parent row to its child total (must be non-empty)
    * @param totalCol    name of the summed column in `childTotals`
    * @return parentRows with `Revenue_Salein_Plan` reduced by the matching total (unchanged when none matches)
    */
  private def subtractChildPlan(
      parentRows: DataFrame,
      childTotals: DataFrame,
      keys: Seq[String],
      totalCol: String
  ): DataFrame = {
    // Alias-qualified names keep this self-join unambiguous (both sides share lineage).
    val parent = parentRows.as("parent")
    val child = childTotals.as("child")
    // Null-safe equality: groupBy groups null keys together, so they must be able to match too.
    val joinCond = keys.map(k => col(s"parent.$k") <=> col(s"child.$k")).reduce(_ && _)
    val keptCols = parentRows.columns.toSeq
      .filterNot(_ == "Revenue_Salein_Plan")
      .map(name => col(s"parent.$name"))
    // Left join keeps parents with no children; coalesce stops the subtraction from yielding null.
    val adjustedPlan =
      (col("parent.Revenue_Salein_Plan") - coalesce(col(s"child.$totalCol"), lit(0.0))).as("Revenue_Salein_Plan")
    parent.join(child, joinCond, "left").select((keptCols :+ adjustedPlan): _*)
  }

  /** Entry point: runs the pipeline and overwrites `AppConfig.outputDir` with Avro output. */
  def main(args: Array[String]): Unit = {
    val spark = createBatchSession()
    execute_processing(spark).write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }
}
Editor is loading...
Leave a Comment