Untitled
unknown
plain_text
a year ago
12 kB
4
Indexable
package vn.com.viettel.code.dashboard_date

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import vn.com.viettel.AppConfig
import vn.com.viettel.utils.SparkUtils.createBatchSession

/** Builds the daily "checking customer info" KPI dataset.
  *
  * Three indicators are produced, each labelled by its `Category` value:
  *  - "Số Hình Ảnh" (number of images), actuals from `so_hinh_anh_chup_ve`;
  *  - "Thông tin vấn đề" (issue information), actuals from `tt_van_de_th`;
  *  - "Cập nhật thông tin" (information update), actuals from `cap_nhap_thong_tin_diem_ban`.
  *
  * For each indicator, the daily actuals (`Perform_Quantity`) are full-outer-joined with the
  * monthly plan from the uploaded CSV after that plan has been divided over the calendar rows of
  * its month (`Plan_Quantity`). Each indicator is then summed per
  * date/department/organization/team, and the union of the three results is written as Avro to
  * `AppConfig.outputDir` in overwrite mode.
  */
object kpicustomer_checking_customer_info {

  private val ImageActualPath = "/gold_zone/full_load/kpi_date/so_hinh_anh_chup_ve"
  private val IssueActualPath = "/gold_zone/full_load/kpi_date/tt_van_de_th"
  private val InfoUpdateActualPath = "/gold_zone/full_load/kpi_date/cap_nhap_thong_tin_diem_ban"
  // A single uploaded sheet holds the plans of all three indicators, distinguished by Ten_chi_tieu.
  private val PlanSheetPath = "/raw_zone/excel_form/upload/ke_hoach/so_hinh_anh_chup_ve.csv"
  private val CalendarPath = "/raw_zone/full_load/date/"

  def main(args: Array[String]): Unit = {
    val spark = createBatchSession()

    // --- Actuals, each shaped as (<date>, SalesDepartment, OrganizationName, Team_Name, Perform_Quantity).
    val imageActual = spark.read.format("avro").load(ImageActualPath).select(
      col("CreatedDate"),
      // Normalized exactly like the plan's Don_vi so both sides of the join use the same codes.
      normalizeDepartment(col("Phong")).alias("SalesDepartment"),
      col("Vung").alias("OrganizationName"),
      col("Team_Name"),
      col("So_Hinh_Anh").alias("Perform_Quantity")
    )

    val issueActual = spark.read.format("avro").load(IssueActualPath)
      .groupBy(
        col("Day"),
        col("SaleDepartment").alias("SalesDepartment"),
        col("Organization").alias("OrganizationName"),
        col("Team_Name")
      )
      .agg(sum("ProblemCount").as("Perform_Quantity"))

    val infoUpdateActual = spark.read.format("avro").load(InfoUpdateActualPath).select(
      col("UpdatedAt"),
      col("SaleDepartment").alias("SalesDepartment"),
      col("Organization").alias("OrganizationName"),
      col("Team_Name"),
      col("cap_nhap_thong_tin_diem_ban").alias("Perform_Quantity")
    )

    // --- Calendar: every calendar-table row tagged with the number of rows sharing its (year, month).
    // That count (so_ngay) is the divisor used to split a monthly plan into daily amounts.
    val calendar = spark.read.format("avro").load(CalendarPath)
      .withColumn("so_ngay", count(lit(1)).over(Window.partitionBy("year", "month")))
      .select("date", "month", "quarter", "year", "so_ngay")

    // --- Plans, converted from monthly totals to per-day amounts.
    val planSheet = spark.read.format("csv").option("header", true).load(PlanSheetPath)
    val imagePlan = dailyPlan(planSheet, "Số Hình Ảnh", calendar)
    val issuePlan = dailyPlan(planSheet, "Thông tin vấn đề", calendar)
    val infoUpdatePlan = dailyPlan(planSheet, "Cập nhật thông tin", calendar)

    // The view names below are referenced by the SQL query that follows.
    joinActualWithPlan(imageActual, "CreatedDate", imagePlan)
      .createOrReplaceTempView("so_hinh_anh_chup_ve")
    joinActualWithPlan(issueActual, "Day", issuePlan)
      .createOrReplaceTempView("tt_van_de_th")
    joinActualWithPlan(infoUpdateActual, "UpdatedAt", infoUpdatePlan)
      .createOrReplaceTempView("cap_nhap_thong_tin_diem_ban")

    // Output column names come from the first branch (EventDate, SalesDepartment, ...).
    // `union` (distinct) drops nothing in practice here: every branch has its own Category
    // and rows within a branch are already unique after its group by.
    val result = spark.sql(
      """
        |select CreatedDate EventDate, SalesDepartment, OrganizationName, "Số Hình Ảnh" Category, Team_Name,
        |       sum(Perform_Quantity) Perform_Quantity, sum(Plan_Quantity) Plan_Quantity
        |from so_hinh_anh_chup_ve
        |group by EventDate, SalesDepartment, OrganizationName, Category, Team_Name
        |union
        |select Day, SalesDepartment, OrganizationName, "Thông tin vấn đề" Category, Team_Name,
        |       sum(Perform_Quantity) Perform_Quantity, sum(Plan_Quantity) Plan_Quantity
        |from tt_van_de_th
        |group by Day, SalesDepartment, OrganizationName, Category, Team_Name
        |union
        |select UpdatedAt, SalesDepartment, OrganizationName, "Cập nhật thông tin" Category, Team_Name,
        |       sum(Perform_Quantity) Perform_Quantity, sum(Plan_Quantity) Plan_Quantity
        |from cap_nhap_thong_tin_diem_ban
        |group by UpdatedAt, SalesDepartment, OrganizationName, Category, Team_Name
        |""".stripMargin)

    result.write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }

  /** Maps the short department codes BH1/BH2 to P_BH1/P_BH2; any other value (including null) passes through. */
  private def normalizeDepartment(department: Column): Column =
    when(department === "BH1", lit("P_BH1"))
      .when(department === "BH2", lit("P_BH2"))
      .otherwise(department)

  /** Selects one indicator's monthly plan and spreads it over the calendar rows of its month.
    *
    * @param planSheet the raw uploaded plan CSV, containing rows for every indicator
    * @param category  the `Ten_chi_tieu` value that selects the indicator
    * @param calendar  calendar rows with `date`, `month`, `quarter`, `year` and the per-month row count `so_ngay`
    * @return one row per (plan group, matching calendar row). `So_luong_KH` is the group's monthly
    *         `So_luong` total divided by `so_ngay`. Because the join is a left join, plan groups
    *         whose month/year has no calendar row are kept with null `date` and `So_luong_KH`.
    */
  private def dailyPlan(planSheet: DataFrame, category: String, calendar: DataFrame): DataFrame = {
    val monthly = planSheet
      .filter(col("Ten_chi_tieu") === category && col("Loai_KH") === "Thang")
      .withColumn("Don_vi", normalizeDepartment(col("Don_vi")))
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      .groupBy("Month", "Quarter", "Year", "Don_vi", "Vung_CN", "Ten_doi")
      .agg(sum("So_luong").as("So_luong_KH"))

    monthly
      .join(
        calendar,
        monthly("Month") === calendar("month") && monthly("Year") === calendar("year"),
        "left"
      )
      .withColumn("So_luong_KH", col("So_luong_KH") / col("so_ngay"))
  }

  /** Full-outer-joins daily actuals with a daily plan on date, department, organization and team.
    *
    * Keys are compared after `coalesce(_, "")`, so a null key matches a null or empty key on the
    * other side. Each output key column takes the actual-side value when it is non-null, otherwise
    * the plan-side value.
    *
    * @param actual     actuals with columns `dateColumn`, `SalesDepartment`, `OrganizationName`,
    *                   `Team_Name` and `Perform_Quantity`
    * @param dateColumn name of the date column in `actual`; also used as the output date column name
    * @param plan       a plan shaped like the output of `dailyPlan` (`date`, `Don_vi`, `Vung_CN`,
    *                   `Ten_doi`, `So_luong_KH`)
    * @return columns (dateColumn, SalesDepartment, OrganizationName, Team_Name, Perform_Quantity, Plan_Quantity)
    */
  private def joinActualWithPlan(actual: DataFrame, dateColumn: String, plan: DataFrame): DataFrame = {
    def key(c: Column): Column = coalesce(c, lit(""))

    val matches =
      key(actual(dateColumn)) === key(plan("date")) &&
        key(actual("SalesDepartment")) === key(plan("Don_vi")) &&
        key(actual("OrganizationName")) === key(plan("Vung_CN")) &&
        key(actual("Team_Name")) === key(plan("Ten_doi"))

    actual.join(plan, matches, "full").select(
      coalesce(actual(dateColumn), plan("date")).alias(dateColumn),
      coalesce(actual("SalesDepartment"), plan("Don_vi")).alias("SalesDepartment"),
      coalesce(actual("OrganizationName"), plan("Vung_CN")).alias("OrganizationName"),
      coalesce(actual("Team_Name"), plan("Ten_doi")).alias("Team_Name"),
      actual("Perform_Quantity").alias("Perform_Quantity"),
      plan("So_luong_KH").alias("Plan_Quantity")
    )
  }
}
Editor is loading...
Leave a Comment