// Paste metadata (not part of the program): Untitled · unknown · plain_text · a year ago · 7.4 kB · 9 · Indexable
package vn.com.viettel.code.dashboard_date
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import vn.com.viettel.AppConfig.outputDir
import vn.com.viettel.utils.SparkUtils.createBatchSession
/**
 * Batch job that combines performed C2 quantities with the uploaded monthly plan
 * and writes the result to `outputDir` in Avro format (overwrite mode).
 *
 * For each of the two metrics (`c2_mm` and `c2_dc`) the job:
 *   1. loads the performed rows, labels them with a `Category` derived from
 *      `StoreTypeId`, and sums the quantity per date / department / organization /
 *      category / team;
 *   2. loads the monthly plan rows (`Loai_KH === "Thang"`) for that metric from the
 *      plan CSV and sums `So_luong` per month / quarter / year / unit / region /
 *      agent level / team;
 *   3. joins the plan to the date table on month and year and divides the plan
 *      quantity by `so_ngay` (the row count of that month in the date table);
 *   4. full-outer-joins performed and planned rows on date, department,
 *      organization and team.
 *
 * The two per-metric results are then unioned by column name and written out.
 *
 * Output columns: `Date`, `SaleDepartment`, `OrganizationName`, `Category`,
 * `Team_Name`, `Perform_Quantity`, `Plan_Quantity`.
 */
object kpicustomer_open_customer {
  // Local import: the file-level imports only bring SparkSession into scope.
  import org.apache.spark.sql.{Column, DataFrame}

  private val ActualC2MmPath = "/gold_zone/full_load/kpi_date/c2_mm"
  private val ActualC2DcPath = "/gold_zone/full_load/kpi_date/c2_dc"
  private val PlanCsvPath = "/raw_zone/excel_form/upload/ke_hoach/c2_mm_dc.csv"
  private val CalendarPath = "/raw_zone/full_load/date/"

  /**
   * Entry point.
   *
   * @param argv command-line arguments (not used)
   */
  def main(argv: Array[String]): Unit = {
    // Create Spark session
    val spark: SparkSession = createBatchSession()

    // KHDV plan (/raw_zone/excel_form/sale_plan/01.1.KHDV.avro) is not loaded:
    // per the original note it is currently left null.

    // One row per date, carrying the number of date rows in that date's month.
    val calendar = loadCalendarWithDayCount(spark)

    // c2_mm metric
    val result_c2mm = joinActualWithPlan(
      loadActual(spark, ActualC2MmPath, "So_luong_C2_MM", "mở mới"),
      spreadPlanOverDays(loadMonthlyPlan(spark, "c2_mm"), calendar)
    )

    // c2_dc metric
    val result_c2dc = joinActualWithPlan(
      loadActual(spark, ActualC2DcPath, "So_luong_C2_DC", "không hoạt động"),
      spreadPlanOverDays(loadMonthlyPlan(spark, "c2_dc"), calendar)
    )

    val combined = result_c2dc.unionByName(result_c2mm)
    combined.write.format("avro").mode("overwrite").save(outputDir)
  }

  /**
   * Builds the `Category` label from `StoreTypeId`.
   *
   * Store types 17, 18 and 23 map to "C2", "C2 Trọng điểm" and "C2 Siêu lớn"
   * followed by `suffix`; any other store type yields null because the chain has
   * no `otherwise` branch.
   */
  private def categoryColumn(suffix: String): Column =
    when(col("StoreTypeId") === 17, s"C2 $suffix")
      .when(col("StoreTypeId") === 18, s"C2 Trọng điểm $suffix")
      .when(col("StoreTypeId") === 23, s"C2 Siêu lớn $suffix")

  /**
   * Loads performed rows from the Avro dataset at `path` and aggregates them.
   *
   * @param spark          active session
   * @param path           Avro input location
   * @param quantityCol    name of the quantity column to sum
   * @param categorySuffix suffix appended to the store-type label in `Category`
   * @return one row per OpenDate / SaleDepartment / OrganizationName / Category /
   *         Team_Name with the summed quantity in `Perform_Quantity`
   */
  private def loadActual(
      spark: SparkSession,
      path: String,
      quantityCol: String,
      categorySuffix: String
  ): DataFrame =
    spark.read.format("avro").load(path)
      .withColumn("Category", categoryColumn(categorySuffix))
      .groupBy("OpenDate", "SaleDepartment", "OrganizationName", "Category", "Team_Name")
      .agg(sum(col(quantityCol)).as("Perform_Quantity"))

  /**
   * Loads the monthly plan rows for one metric from the plan CSV.
   *
   * @param spark  active session
   * @param metric value of `Ten_chi_tieu` to keep (e.g. "c2_mm")
   * @return plan quantity `So_luong_KH` summed per Month / Quarter / Year / Don_vi /
   *         Vung_CN / Cap_dai_ly / Ten_doi
   */
  private def loadMonthlyPlan(spark: SparkSession, metric: String): DataFrame = {
    // NOTE(review): the CSV is read without a schema, so Thoi_gian_ke_hoach is a
    // string; this presumably relies on it being in a format Spark can cast to a
    // timestamp - confirm against the upload template.
    val planTime = col("Thoi_gian_ke_hoach")
    spark.read.format("csv").option("header", "true").option("multiLine", "true")
      .load(PlanCsvPath)
      .filter(col("Ten_chi_tieu") === metric && col("Loai_KH") === "Thang")
      .withColumn("Month", date_format(planTime, "M"))
      .withColumn("Quarter", date_format(planTime, "q"))
      .withColumn("Year", date_format(planTime, "y"))
      .groupBy("Month", "Quarter", "Year", "Don_vi", "Vung_CN", "Cap_dai_ly", "Ten_doi")
      .agg(sum("So_luong").as("So_luong_KH"))
  }

  /**
   * Loads the date table and attaches to every date row the number of date rows
   * (`so_ngay`) that share its month and year.
   *
   * @return columns `date`, `month`, `quarter`, `year`, `so_ngay`
   */
  private def loadCalendarWithDayCount(spark: SparkSession): DataFrame = {
    // The date table is read once for the per-date rows and once for the
    // per-month counts, exactly as before, so the two join sides have
    // independent lineage.
    val my_date = spark.read.format("avro").load(CalendarPath)
    val date_of_month = spark.read.format("avro").load(CalendarPath)
      .groupBy("month", "quarter", "year")
      .agg(count("*").alias("so_ngay"))

    my_date.as("t1")
      .join(
        date_of_month.as("t2"),
        my_date("month") === date_of_month("month") && my_date("year") === date_of_month("year")
      )
      .select(col("t1.date"), col("t1.month"), col("t1.quarter"), col("t1.year"), col("t2.so_ngay"))
  }

  /**
   * Splits the monthly plan across dates: each plan row is left-joined to every
   * calendar date of its month/year and `So_luong_KH` is divided by `so_ngay`.
   * Plan rows with no matching calendar month keep a null `date` and a null
   * `So_luong_KH` (division by a null `so_ngay`).
   */
  private def spreadPlanOverDays(monthlyPlan: DataFrame, calendar: DataFrame): DataFrame =
    monthlyPlan
      .join(
        calendar,
        monthlyPlan("Month") === calendar("Month") && monthlyPlan("Year") === calendar("Year"),
        "left"
      )
      .withColumn("So_luong_KH", col("So_luong_KH") / col("so_ngay"))

  /** Equality that treats null like the empty string on both sides. */
  private def sameOrBothBlank(left: Column, right: Column): Column =
    coalesce(left, lit("")) === coalesce(right, lit(""))

  /**
   * Full-outer-joins performed rows with per-day plan rows on date, department,
   * organization and team, keeping rows that exist on only one side.
   *
   * NOTE(review): `Category` / `Cap_dai_ly` is not part of the join key, so a
   * team with several plan levels on the same day is paired with every category
   * row of that day - confirm this is intended.
   *
   * @return columns Date, SaleDepartment, OrganizationName, Category, Team_Name,
   *         Perform_Quantity, Plan_Quantity
   */
  private def joinActualWithPlan(actual: DataFrame, planByDay: DataFrame): DataFrame = {
    val joinCondition =
      sameOrBothBlank(actual("OpenDate"), planByDay("date")) &&
        sameOrBothBlank(actual("SaleDepartment"), planByDay("Don_vi")) &&
        sameOrBothBlank(actual("OrganizationName"), planByDay("Vung_CN")) &&
        sameOrBothBlank(actual("Team_Name"), planByDay("Ten_doi"))

    actual.join(planByDay, joinCondition, "full").select(
      // Prefer the performed side; fall back to the plan side for plan-only rows.
      coalesce(actual("OpenDate"), planByDay.col("date")).alias("Date"),
      coalesce(actual("SaleDepartment"), planByDay.col("Don_vi")).alias("SaleDepartment"),
      coalesce(actual("OrganizationName"), planByDay.col("Vung_CN")).alias("OrganizationName"),
      coalesce(actual("Category"), planByDay.col("Cap_dai_ly")).alias("Category"),
      coalesce(actual("Team_Name"), planByDay.col("Ten_doi")).alias("Team_Name"),
      actual("Perform_Quantity").alias("Perform_Quantity"),
      planByDay("So_luong_KH").alias("Plan_Quantity")
    )
  }
}
// Paste-site footer (not part of the program): Editor is loading... · Leave a Comment