// Paste-site export metadata (not part of the program):
// Untitled | unknown | plain_text | a year ago | 7.1 kB | 14 | Indexable
package vn.com.viettel.code.dashboard_date
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{FloatType, TimestampType}
import vn.com.viettel.AppConfig.{date, outputDir, rootPath}
import vn.com.viettel.utils.SparkUtils.createBatchSession
import org.apache.spark.sql.expressions._
/**
 * Batch job for the `tmdt_mr_dt_nhom_ks` indicator: combines the monthly revenue
 * plan (spread evenly over the days of each month) with the daily revenue
 * computed from the C3 order exports, and writes the result as Avro to
 * `outputDir`.
 *
 * Output columns: `Date`, `EcommerceOrderType`, `NhomKhaoSat`, `Team_Name`,
 * `Doanh_thu` (actual revenue) and `Doanh_thu_KH` (planned revenue per day).
 */
object dt_tmdt_mr_nhom_ks_date {

  /**
   * Job entry point.
   *
   * @param argv command-line arguments; not read by this job
   */
  def main(argv: Array[String]): Unit = {
    val spark: SparkSession = createBatchSession()

    // ---------------------------------------------------------------------
    // Read the plan data: monthly planned revenue per unit / survey group / team.
    // ---------------------------------------------------------------------
    val monthlyPlan = spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      .load("/raw_zone/excel_form/upload/ke_hoach/tmdt_mr_dt_nhom_ks.csv")
      .filter(col("Cac_chi_tieu") === "tmdt_mr_dt_nhom_ks" and col("Loai_KH") === "Thang")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      .groupBy("Month", "Quarter", "Year", "Don_vi", "Nhom_khao_sat", "Ten_doi")
      .agg(sum("Doanh_thu").as("Doanh_thu_KH"))

    // ---------------------------------------------------------------------
    // Split the plan across the days of its month.
    // ---------------------------------------------------------------------
    // One scan of the calendar: every date row carries the number of calendar
    // rows that share its (year, month). The key columns are renamed so they
    // cannot clash with the plan's Month/Year columns (Spark resolves column
    // names case-insensitively by default).
    val daysInMonth = Window.partitionBy(col("year"), col("month"))
    val calendar = spark.read.format("avro").load("/raw_zone/full_load/date/")
      .select(
        col("date"),
        col("month").as("cal_month"),
        col("year").as("cal_year"),
        count(lit(1)).over(daysInMonth).as("so_ngay")
      )

    // Left join: a plan row whose month is missing from the calendar is kept
    // with a null date and a null daily amount.
    val dailyPlan = monthlyPlan
      .join(calendar, col("Month") === col("cal_month") and col("Year") === col("cal_year"), "left")
      .select(
        col("date"),
        col("Don_vi"),
        col("Nhom_khao_sat"),
        col("Ten_doi"),
        (col("Doanh_thu_KH") / col("so_ngay")).as("Doanh_thu_KH")
      )

    // ---------------------------------------------------------------------
    // Aggregate the actuals.
    // ---------------------------------------------------------------------
    // Customer code -> team name. Only the two columns used below are kept.
    val teamMapping = spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      .load("/raw_zone/excel_form/upload/mapping_to_doi/Mapping_C3_to_doi.csv")
      .select(
        col("Mã KH").as("Ma_KH"),
        col("Đội xung kích").as("Team_Name")
      )

    // Orders of the RDMBC3 / RDMNC3 accounts. Rows whose amount is the literal
    // string "nan" are excluded (presumably an export artefact - TODO confirm).
    val orders = spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      .load("/raw_zone/excel_form/upload/rangdong_store/c3_*.csv")
      .filter(col("Username").rlike("^(RDMBC3|RDMNC3)") and col("Thành tiền") =!= "nan")
      .select(
        col("Username"),
        date_format(col("Ngày đặt hàng"), "yyyy-MM-dd").as("Date"),
        when(col("Username").startsWith("RDMBC3"), "TMĐT mở rộng BH1")
          .when(col("Username").startsWith("RDMNC3"), "TMĐT mở rộng BH2")
          .as("EcommerceOrderType"),
        col("Thành tiền").as("Doanh_thu"),
        col("Barcode")
      )

    // Product classification with the names of its three class levels.
    // The class-detail table is joined three times; each lookup gets its own
    // column names so every join condition resolves by name. Referring to the
    // same DataFrame several times through `df("ID")` is ambiguous in Spark.
    val productClassDetail = spark.read.table("WHS.SalePlan_ProductClassDetail")
    def classLevelLookup(level: Int) =
      productClassDetail.select(
        col("ID").as(s"ID_NSP$level"),
        col("Class").as(s"Name_NSP$level")
      )

    // NOTE(review): level 1 is a left join while levels 2 and 3 are inner joins,
    // so products without a matching level-2/3 class are dropped here. Kept as
    // in the original; confirm whether all three were meant to be left joins.
    val productClass = spark.read.table("WHS.SalePlan_ProductClass")
      .join(classLevelLookup(1), col("N_SP1") === col("ID_NSP1"), "left")
      .join(classLevelLookup(2), col("N_SP2") === col("ID_NSP2"))
      .join(classLevelLookup(3), col("N_SP3") === col("ID_NSP3"))
      .select(
        "ItemName",
        "ItemCodeSAP",
        "N_SP1",
        "N_SP2",
        "N_SP3",
        "SPKM_BH1",
        "SPKM_BH2",
        "NhomKhaoSat",
        "Name_NSP1",
        "Name_NSP2",
        "Name_NSP3"
      )

    // Daily actual revenue per order type / survey group / team.
    val actuals = orders
      .join(productClass, orders("Barcode") === productClass("ItemCodeSAP"), "left")
      .join(teamMapping, teamMapping("Ma_KH") === orders("Username"), "left")
      .groupBy("Date", "EcommerceOrderType", "NhomKhaoSat", "Team_Name")
      .agg(sum("Doanh_thu").as("Doanh_thu"))

    // ---------------------------------------------------------------------
    // Join the daily actuals with the daily plan.
    // ---------------------------------------------------------------------
    // Full outer join: days with only a plan or only actuals are both kept,
    // and the key columns are taken from whichever side is present.
    val planVsActual = actuals
      .join(
        dailyPlan,
        actuals("Date") === dailyPlan("date") and
          actuals("EcommerceOrderType") === dailyPlan("Don_vi") and
          actuals("NhomKhaoSat") === dailyPlan("Nhom_khao_sat") and
          actuals("Team_Name") === dailyPlan("Ten_doi"),
        "full"
      )
      .select(
        coalesce(actuals("Date"), dailyPlan("date")).as("Date"),
        coalesce(actuals("EcommerceOrderType"), dailyPlan("Don_vi")).as("EcommerceOrderType"),
        coalesce(actuals("NhomKhaoSat"), dailyPlan("Nhom_khao_sat")).as("NhomKhaoSat"),
        coalesce(actuals("Team_Name"), dailyPlan("Ten_doi")).as("Team_Name"),
        actuals("Doanh_thu"),
        dailyPlan("Doanh_thu_KH")
      )

    planVsActual.write.format("avro").mode("overwrite").save(outputDir)
  }
}
// Paste-site page footer (not part of the program): "Editor is loading..." / "Leave a Comment"