// Paste-site metadata (not part of the program):
// Untitled | unknown | plain_text | a year ago | 12 kB | 7 | Indexable
package vn.com.viettel.code.dashboard_date
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import vn.com.viettel.AppConfig
import vn.com.viettel.utils.SparkUtils.createBatchSession
/**
 * Batch job that builds the "customer info checking" KPI dataset.
 *
 * For each of three indicators (photos taken, store issues, store information
 * updates) the job:
 *   1. loads the actual (performed) quantities from the gold zone,
 *   2. loads the monthly plan from the uploaded plan sheet and spreads it evenly
 *      over the days of the month found in the calendar table,
 *   3. full-outer-joins actuals with the daily plan so that days with only a plan
 *      or only actuals are both kept,
 *   4. aggregates per date / department / organization / team and writes the union
 *      of the three indicators as Avro to `AppConfig.outputDir` (overwrite mode).
 */
object kpicustomer_checking_customer_info {
  // Local import so the helper signatures below do not depend on file-level imports.
  import org.apache.spark.sql.{Column, DataFrame}

  private val PlanSheetPath = "/raw_zone/excel_form/upload/ke_hoach/so_hinh_anh_chup_ve.csv"
  private val CalendarPath = "/raw_zone/full_load/date/"

  /**
   * Maps the short department codes "BH1"/"BH2" to "P_BH1"/"P_BH2" and leaves every
   * other value untouched, so actuals and plan use the same department key.
   */
  private def normalizeDepartment(department: Column): Column =
    when(department === "BH1", lit("P_BH1"))
      .when(department === "BH2", lit("P_BH2"))
      .otherwise(department)

  /**
   * Adds to every calendar row the number of calendar rows that share its
   * month/year (`so_ngay`).
   *
   * @param calendar date table with at least `date`, `month`, `quarter`, `year`
   * @return columns `date`, `month`, `quarter`, `year`, `so_ngay`
   */
  private def calendarWithDaysInMonth(calendar: DataFrame): DataFrame = {
    val daysPerMonth = calendar
      .groupBy("month", "quarter", "year")
      .agg(count("*").alias("so_ngay"))

    // Both sides derive from the same DataFrame, so the join keys must be referenced
    // through the aliases; `calendar("month") === daysPerMonth("month")` would be an
    // ambiguous (trivially true) self-join condition.
    calendar.as("t1")
      .join(
        daysPerMonth.as("t2"),
        col("t1.month") === col("t2.month") && col("t1.year") === col("t2.year")
      )
      .select(col("t1.date"), col("t1.month"), col("t1.quarter"), col("t1.year"), col("t2.so_ngay"))
  }

  /**
   * Builds the daily plan for one indicator.
   *
   * Keeps only monthly plan rows (`Loai_KH == "Thang"`) of the given indicator,
   * sums `So_luong` per month / department / organization / team, and divides that
   * monthly total by the number of days of the month so each calendar day carries
   * an equal share.
   *
   * @param planSheet  uploaded plan sheet (CSV with header)
   * @param calendar   output of [[calendarWithDaysInMonth]]
   * @param indicator  value of `Ten_chi_tieu` to select
   * @return columns `date`, `Don_vi`, `Vung_CN`, `Ten_doi`, `So_luong_KH`
   */
  private def dailyPlan(planSheet: DataFrame, calendar: DataFrame, indicator: String): DataFrame = {
    val monthlyPlan = planSheet
      .filter(col("Ten_chi_tieu") === indicator and col("Loai_KH") === "Thang")
      .withColumn("Don_vi", normalizeDepartment(col("Don_vi")))
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      .groupBy("Month", "Quarter", "Year", "Don_vi", "Vung_CN", "Ten_doi")
      .agg(sum("So_luong").as("So_luong_KH"))

    // Left join: a plan row whose month is missing from the calendar is kept with a
    // null date and a null daily quantity.
    monthlyPlan
      .join(
        calendar,
        monthlyPlan("Month") === calendar("month") && monthlyPlan("Year") === calendar("year"),
        "left"
      )
      .select(
        calendar("date"),
        monthlyPlan("Don_vi"),
        monthlyPlan("Vung_CN"),
        monthlyPlan("Ten_doi"),
        (monthlyPlan("So_luong_KH") / calendar("so_ngay")).alias("So_luong_KH")
      )
  }

  /**
   * Full-outer-joins actual quantities with the daily plan on date, department,
   * organization and team.
   *
   * @param actual     columns `dateColumn`, `SalesDepartment`, `OrganizationName`,
   *                   `Team_Name`, `Perform_Quantity`
   * @param plan       output of [[dailyPlan]]
   * @param dateColumn name of the date column in `actual`; also the name of the
   *                   date column in the result
   * @return columns `dateColumn`, `SalesDepartment`, `OrganizationName`, `Team_Name`,
   *         `Perform_Quantity`, `Plan_Quantity`
   */
  private def joinActualWithPlan(actual: DataFrame, plan: DataFrame, dateColumn: String): DataFrame = {
    // Keys are compared after replacing NULL with "" so rows with a missing key on
    // both sides still match each other.
    def sameKey(actualColumn: String, planColumn: String): Column =
      coalesce(actual(actualColumn), lit("")) === coalesce(plan(planColumn), lit(""))

    actual
      .join(
        plan,
        sameKey(dateColumn, "date") &&
          sameKey("SalesDepartment", "Don_vi") &&
          sameKey("OrganizationName", "Vung_CN") &&
          sameKey("Team_Name", "Ten_doi"),
        "full"
      )
      .select(
        // After a full join either side may be missing; take whichever key is present.
        coalesce(actual(dateColumn), plan("date")).alias(dateColumn),
        coalesce(actual("SalesDepartment"), plan("Don_vi")).alias("SalesDepartment"),
        coalesce(actual("OrganizationName"), plan("Vung_CN")).alias("OrganizationName"),
        coalesce(actual("Team_Name"), plan("Ten_doi")).alias("Team_Name"),
        actual("Perform_Quantity").alias("Perform_Quantity"),
        plan("So_luong_KH").alias("Plan_Quantity")
      )
  }

  /**
   * Job entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val spark = createBatchSession()

    // ---- Actual (performed) quantities -------------------------------------------
    val photosTaken = spark.read.format("avro")
      .load("/gold_zone/full_load/kpi_date/so_hinh_anh_chup_ve")
      .select(
        col("CreatedDate"),
        // Same normalization as the plan side; departments other than BH1/BH2 keep
        // their value instead of silently becoming NULL.
        normalizeDepartment(col("Phong")).alias("SalesDepartment"),
        col("Vung").alias("OrganizationName"),
        col("Team_Name"),
        col("So_Hinh_Anh").alias("Perform_Quantity")
      )

    val storeIssues = spark.read.format("avro")
      .load("/gold_zone/full_load/kpi_date/tt_van_de_th")
      .groupBy(
        col("Day"),
        col("SaleDepartment").alias("SalesDepartment"),
        col("Organization").alias("OrganizationName"),
        col("Team_Name")
      )
      .agg(sum("ProblemCount") as "Perform_Quantity")

    val storeInfoUpdates = spark.read.format("avro")
      .load("/gold_zone/full_load/kpi_date/cap_nhap_thong_tin_diem_ban")
      .select(
        col("UpdatedAt"),
        col("SaleDepartment").alias("SalesDepartment"),
        col("Organization").alias("OrganizationName"),
        col("Team_Name"),
        col("cap_nhap_thong_tin_diem_ban").alias("Perform_Quantity")
      )

    // ---- Plan, split per day -----------------------------------------------------
    val planSheet = spark.read.format("csv").option("header", true).load(PlanSheetPath)
    val calendar = calendarWithDaysInMonth(spark.read.format("avro").load(CalendarPath))

    // ---- Actual vs. plan, exposed as temp views for the SQL below ----------------
    joinActualWithPlan(photosTaken, dailyPlan(planSheet, calendar, "Số Hình Ảnh"), "CreatedDate")
      .createOrReplaceTempView("so_hinh_anh_chup_ve")
    joinActualWithPlan(storeIssues, dailyPlan(planSheet, calendar, "Thông tin vấn đề"), "Day")
      .createOrReplaceTempView("tt_van_de_th")
    joinActualWithPlan(storeInfoUpdates, dailyPlan(planSheet, calendar, "Cập nhật thông tin"), "UpdatedAt")
      .createOrReplaceTempView("cap_nhap_thong_tin_diem_ban")

    // Each branch is already grouped and carries a distinct Category literal, so no
    // row can occur in two branches: UNION ALL yields the same rows as UNION without
    // the extra de-duplication step.
    val kpiReport = spark.sql(
      """
        select
          CreatedDate EventDate,
          SalesDepartment,
          OrganizationName,
          "Số Hình Ảnh" Category,
          Team_Name,
          sum(Perform_Quantity) Perform_Quantity,
          sum(Plan_Quantity) Plan_Quantity
        from so_hinh_anh_chup_ve
        group by EventDate,SalesDepartment,OrganizationName,Category, Team_Name
        union all
        select
          Day,
          SalesDepartment,
          OrganizationName,
          "Thông tin vấn đề" Category,
          Team_Name,
          sum(Perform_Quantity) Perform_Quantity,
          sum(Plan_Quantity) Plan_Quantity
        from tt_van_de_th
        group by Day,SalesDepartment,OrganizationName,Category, Team_Name
        union all
        select
          UpdatedAt,
          SalesDepartment,
          OrganizationName,
          "Cập nhật thông tin" Category,
          Team_Name,
          sum(Perform_Quantity) Perform_Quantity,
          sum(Plan_Quantity) Plan_Quantity
        from cap_nhap_thong_tin_diem_ban
        group by UpdatedAt,SalesDepartment,OrganizationName,Category, Team_Name
      """)

    kpiReport.write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }
}
// Paste-site footer (not part of the program): "Leave a Comment"