Untitled
unknown
plain_text
a year ago
39 kB
18
Indexable
/// Code that builds the SaleOut form (biểu mẫu) for target products (SP MT)
package vn.com.viettel.code.dashboard_date
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import vn.com.viettel.AppConfig.outputDir
import vn.com.viettel.utils.SparkUtils.createBatchSession
object saleout_sp_mt {

  /** Builds the daily SaleOut dataset for target products (SP MT) and writes it as Avro to
    * `outputDir`, overwriting any previous output.
    *
    * Actual sales come from two unioned sources:
    *   - DMS indirect sales orders;
    *   - the uploaded C2 order Excel files.
    * Both are summed per day, sales department, region, product group and team. The result is
    * full-joined with the monthly plan from `saleout_spmt.csv`, which is divided evenly over
    * the days of its month.
    *
    * @param argv command-line arguments (not used)
    */
  def main(argv: Array[String]): Unit = {
    // Needed by the casts near the end of this method; these names were never imported.
    import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}

    val spark: SparkSession = createBatchSession()

    // ------------------------------------------------------------------ Plan
    val df_kh_saleout_spmt = spark.read.format("csv").option("header", true)
      .load("/raw_zone/excel_form/upload/ke_hoach/saleout_spmt.csv")
      .filter(col("Ten_chi_tieu") === "SaleOut")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
    // Monthly ("Thang") plan totals per month and reporting dimensions.
    val df_kh_saleout_thang = df_kh_saleout_spmt
      .filter(col("Loai_KH") === "Thang")
      .groupBy("Month", "Quarter", "Year", "Don_vi", "Vung_CN", "Name_SPMT", "Ten_doi")
      .agg(sum("So_luong").as("SaleOut_SL_SP_MT_KH"), sum("Doanh_thu").as("SaleOut_DT_SP_MT_KH"))

    // Calendar data (generated for 2010 to 2050). The per-month day count is computed from a
    // second, separate read so the join below does not compare a DataFrame with itself.
    val my_date = spark.read.format("avro").load("/raw_zone/full_load/date/")
    val date_of_month = spark.read.format("avro").load("/raw_zone/full_load/date/")
      .groupBy("month", "quarter", "year")
      .agg(count("*").alias("so_ngay"))
    // One row per calendar date, annotated with the number of days in its month.
    val count_date_of_month = my_date.as("t1")
      .join(date_of_month.as("t2"),
        my_date("month") === date_of_month("month") && my_date("year") === date_of_month("year"))
      .select(col("t1.date"), col("t1.month"), col("t1.quarter"), col("t1.year"), col("t2.so_ngay"))
    // Spread the monthly plan quantity and revenue evenly over the days of the month.
    val kh_saleout_spmt_date = df_kh_saleout_thang
      .join(count_date_of_month,
        df_kh_saleout_thang("Month") === count_date_of_month("Month") and
          df_kh_saleout_thang("Year") === count_date_of_month("Year"),
        "left")
      .withColumn("SaleOut_SL_SP_MT_KH", col("SaleOut_SL_SP_MT_KH") / col("so_ngay"))
      .withColumn("SaleOut_DT_SP_MT_KH", col("SaleOut_DT_SP_MT_KH") / col("so_ngay"))

    // ------------------------------------------------------------------ Actuals
    val df_appuser = spark.read.table("whs.masterdata_appuser")
    val df_to_doi_c2 = spark.read.format("csv").option("multiLine", "true").option("header", "true")
      .load("/raw_zone/excel_form/upload/mapping_to_doi/to_doi_saleout.csv")
    val listBuyerStoreTypeID = Seq(17, 18, 23)
    // Indirect orders from the listed buyer store types with GeneralIndirectStateId 3;
    // OrderDate is shifted by +7 hours.
    val iso = spark.read.table("WHS.Order_IndirectSalesOrder")
      .withColumn("OrderDate", col("OrderDate") + expr("INTERVAL 7 HOURS"))
      .filter(col("BuyerStoreTypeId").isin(listBuyerStoreTypeID: _*))
      .filter(col("GeneralIndirectStateId") === 3)
    val isoc = spark.read.table("WHS.Order_IndirectSalesOrderContent")
    // Stores in the covered regions. Their validity window is shifted by +7 hours ONCE here,
    // so it is comparable with the shifted OrderDate above and with the Excel order times.
    val store = spark.read.table("WHS.MasterData_DMSStore")
      .withColumn("StartDate", col("StartDate") + expr("INTERVAL 7 HOURS"))
      .withColumn("EndDate", col("EndDate") + expr("INTERVAL 7 HOURS"))
      .where(
        col("Code").like("TV01%") || col("Code").like("TV02%") || col("Code").like("TV03%") ||
          col("Code").like("TN%") || col("Code").like("TG%") || col("Code").like("NT%") ||
          col("Code").like("CT%") || col("Code").like("DN%") || col("Code").like("BH%") ||
          col("Code").like("HCM%")
      )
    val mappingc2 = spark.read.format("csv").option("multiLine", "true").option("header", "true")
      .load("/raw_zone/excel_form/upload/mapping_to_doi/mapping_dms_c2td.csv")
    val item = spark.read.table("WHS.MasterData_Item").filter(col("DeletedAt").isNull)
    val ppgm = spark.read.table("WHS.MasterData_ProductProductGroupingMapping")
    // For each grouping Name keep only the most recently updated non-deleted row.
    val windowspec = Window.partitionBy("Name").orderBy(desc("UpdatedAt"))
    val productgrouping = spark.read.table("WHS.MasterData_ProductGrouping")
      .filter(col("DeletedAt").isNull)
      .withColumn("rn", row_number().over(windowspec))
      .filter(col("rn") === 1)

    // Target products (san pham muc tieu): item -> product grouping.
    val mapping = item.as("i")
      .join(ppgm.as("ppgm"), col("i.ProductId") === col("ppgm.ProductId"), "left")
      .join(productgrouping.as("pg"), col("ppgm.ProductGroupingId") === col("pg.Id"), "left")
      .select(
        col("i.Code").as("ItemCode"),
        col("i.Id").as("ItemId"),
        col("pg.Code"),
        col("pg.Name"),
        col("pg.UpdatedAt")
      )
      .distinct

    // Orders uploaded from the C2 Excel files.
    val excel = spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      .load("/raw_zone/excel_form/upload/rangdong_store/c2_*.csv")
      // Date part of the order timestamp, accepted as either M/d/y or yyyy-MM-dd.
      .withColumn("Ngay_dat_hang", split(col("Ngày đặt hàng"), " ")(0))
      .withColumn(
        "Ngay_dat_hang",
        when(col("Ngay_dat_hang").contains("/"), to_date(col("Ngay_dat_hang"), "M/d/y"))
          .when(col("Ngay_dat_hang").contains("-"), to_date(col("Ngay_dat_hang"), "yyyy-MM-dd")))
      .withColumn("OrderDate", from_unixtime(unix_timestamp(col("Ngày đặt hàng"), "yyyy-MM-dd HH:mm:ss")).alias("OrderDate"))
    val don_hang = excel
      .join(mappingc2, mappingc2("User_Name") === excel("Username"), "left")
      // The last 7 characters of the Excel username are used as the DMS store Id.
      .withColumn("Id_excel", substring(excel("Username"), -7, 7).cast("bigint"))
      .join(store, store("Id") === col("Id_excel"), "left")
      .join(item, excel("BarCode") === item("Code"), "left")
      .join(mapping, item("Id") === mapping("ItemId"), "left")
      // `store` already carries the +7h shift (see its definition). Adding another 7 hours
      // here would move the validity window by 14 hours in total.
      .filter(excel("OrderDate").between(store("StartDate"), store("EndDate")))
      .filter(col("StoreTypeId").isin(listBuyerStoreTypeID: _*))
      .withColumn("Organization",
        when(store("Code").startsWith("TV01"), lit("TV01"))
          .when(store("Code").startsWith("TV02"), lit("TV02"))
          .when(store("Code").startsWith("TV03"), lit("TV03"))
          .when(store("Code").startsWith("TN"), lit("TN"))
          .when(store("Code").startsWith("NT"), lit("NT"))
          .when(store("Code").startsWith("TG"), lit("TG"))
          .when(store("Code").startsWith("CT"), lit("CT"))
          .when(store("Code").startsWith("DN"), lit("DN"))
          .when(store("Code").startsWith("BH"), lit("BH"))
          .when(store("Code").startsWith("HCM"), lit("HCM")))
      .withColumn("SaleDepartment",
        when(store("Code").startsWith("TV01") or store("Code").startsWith("TV02") or store("Code").startsWith("TV03"), lit("P_BH1"))
          .when(store("Code").startsWith("TN") or store("Code").startsWith("TG") or store("Code").startsWith("NT") or
            store("Code").startsWith("CT") or store("Code").startsWith("DN") or store("Code").startsWith("BH") or
            store("Code").startsWith("HCM"), lit("P_BH2")))
      .select(
        to_date(excel("Ngay_dat_hang")).alias("Day"),
        col("SaleDepartment"),
        col("Organization"),
        mapping("Name").alias("ProductGroupName"),
        col("Thành tiền 2").alias("SaleOut_DT_SP_MT"),
        col("Số lượng").alias("SaleOut_SL_SP_MT"),
        mappingc2("Doi").alias("Team_Name")
      )

    // DMS orders, unioned with the Excel orders and summed per reporting key.
    val th = iso.as("t0")
      .join(df_appuser, df_appuser("Id") === col("t0.SaleEmployeeId"), "left")
      .join(df_to_doi_c2, df_appuser("Username") === df_to_doi_c2("Username"), "left")
      .join(
        store.as("t1"),
        col("t0.BuyerStoreId") === col("t1.Id") && col("t0.OrderDate").between(col("t1.StartDate"), col("t1.EndDate")),
        "left"
      )
      .join(isoc.as("t2"), col("t0.Id") === col("t2.IndirectSalesOrderId"), "left")
      .join(mapping.as("m"), col("t2.ItemId") === col("m.ItemId"), "left")
      .select(
        date_format(col("t0.OrderDate"), "yyyy-MM-dd").alias("Day"),
        when(col("t0.StoreCode").like("TV01%") || col("t0.StoreCode").like("TV02%") || col("t0.StoreCode").like("TV03%"), "P_BH1")
          .when(col("t0.StoreCode").like("TN%") || col("t0.StoreCode").like("TG%") || col("t0.StoreCode").like("NT%") ||
            col("t0.StoreCode").like("CT%") || col("t0.StoreCode").like("DN%") || col("t0.StoreCode").like("BH%") ||
            col("t0.StoreCode").like("HCM%"), "P_BH2")
          .as("SaleDepartment"),
        when(col("t1.Code").like("TV01%"), "TV01")
          .when(col("t1.Code").like("TV02%"), "TV02")
          .when(col("t1.Code").like("TV03%"), "TV03")
          .when(col("t1.Code").like("TN%"), "TN")
          .when(col("t1.Code").like("TG%"), "TG")
          .when(col("t1.Code").like("NT%"), "NT")
          .when(col("t1.Code").like("CT%"), "CT")
          .when(col("t1.Code").like("DN%"), "DN")
          .when(col("t1.Code").like("BH%"), "BH")
          .when(col("t1.Code").like("HCM%"), "HCM")
          .as("Organization"),
        col("t2.Amount").as("SaleOut_DT_SP_MT"),
        col("t2.Quantity").as("SaleOut_SL_SP_MT"),
        col("m.Name").as("ProductGroupName"),
        df_to_doi_c2("Team_Name")
      )
      .unionByName(don_hang)
      .groupBy(col("Day"), col("SaleDepartment"), col("Organization"), col("ProductGroupName"), col("Team_Name"))
      .agg(
        sum("SaleOut_DT_SP_MT").as("SaleOut_DT_SP_MT"),
        sum("SaleOut_SL_SP_MT").as("SaleOut_SL_SP_MT")
      )
    val th_thang = th.select(
      "Day",
      "SaleDepartment",
      "Organization",
      "ProductGroupName",
      "SaleOut_DT_SP_MT",
      "SaleOut_SL_SP_MT",
      "Team_Name"
    )

    // Actual vs daily plan. The keys use null-safe equality (<=>). Dimensions such as
    // Team_Name can be null after the left joins above, and plain === would never pair those
    // rows with the plan rows that leave the same field empty.
    val thang = th_thang
      .join(kh_saleout_spmt_date,
        th_thang("Day") <=> kh_saleout_spmt_date("date") and
          th_thang("SaleDepartment") <=> kh_saleout_spmt_date("Don_vi") and
          th_thang("Organization") <=> kh_saleout_spmt_date("Vung_CN") and
          th_thang("ProductGroupName") <=> kh_saleout_spmt_date("Name_SPMT") and
          th_thang("Team_Name") <=> kh_saleout_spmt_date("Ten_doi"),
        "full")
      .select(
        coalesce(th_thang("Day"), kh_saleout_spmt_date("date")).as("Day"),
        coalesce(th_thang("SaleDepartment"), kh_saleout_spmt_date("Don_vi")).as("SaleDepartment"),
        coalesce(th_thang("Organization"), kh_saleout_spmt_date("Vung_CN")).as("Organization"),
        coalesce(th_thang("ProductGroupName"), kh_saleout_spmt_date("Name_SPMT")).as("ProductGroupName"),
        coalesce(th_thang("Team_Name"), kh_saleout_spmt_date("Ten_doi")).as("Team_Name"),
        th_thang("SaleOut_DT_SP_MT"),
        th_thang("SaleOut_SL_SP_MT"),
        kh_saleout_spmt_date("SaleOut_DT_SP_MT_KH"),
        kh_saleout_spmt_date("SaleOut_SL_SP_MT_KH")
      )

    // Template rows in the plan-file layout: distinct 2024 dimension combinations with empty
    // Thoi_gian_ke_hoach / So_luong / Doanh_thu.
    val res = thang
      .filter(col("Day").between("2024-01-01", "2024-12-31"))
      .select(
        lit("SaleOut").alias("Ten_chi_tieu"),
        lit(null).alias("Thoi_gian_ke_hoach"),
        col("SaleDepartment").alias("Don_vi"),
        col("Organization").alias("Vung_CN"),
        col("ProductGroupName").alias("Name_SPMT"),
        col("Team_Name").alias("Ten_doi"),
        lit(null).alias("So_luong"),
        lit(null).alias("Doanh_thu")
      )
    val re = res.distinct()
    val reCast = re
      .withColumn("Ten_chi_tieu", col("Ten_chi_tieu").cast(StringType))
      .withColumn("Thoi_gian_ke_hoach", col("Thoi_gian_ke_hoach").cast(StringType))
      .withColumn("Don_vi", col("Don_vi").cast(StringType))
      .withColumn("Vung_CN", col("Vung_CN").cast(StringType))
      .withColumn("Name_SPMT", col("Name_SPMT").cast(StringType))
      .withColumn("Ten_doi", col("Ten_doi").cast(StringType))
      .withColumn("So_luong", col("So_luong").cast(IntegerType))
      .withColumn("Doanh_thu", col("Doanh_thu").cast(DoubleType))

    // NOTE(review): reCast is built but never written; the job writes `thang`. If this job is
    // meant to emit the form template, write reCast instead. Confirm with the job owners.
    thang.write.format("avro").mode("overwrite").save(outputDir)
  }
}
/// Code that builds the SaleOut form (biểu mẫu) for SP KM products (SPKM_BH1 / SPKM_BH2)
package vn.com.viettel.code.dashboard_date
import org.apache.spark.sql.functions._
import vn.com.viettel.AppConfig
import vn.com.viettel.utils.SparkUtils.createBatchSession
object saleout_sp_km {

  /** Builds the daily SaleOut dataset for the SPKM_BH1 / SPKM_BH2 product classes and writes
    * it as Avro to `AppConfig.outputDir`, overwriting any previous output.
    *
    * Actual sales are DMS indirect orders unioned with the uploaded C2 Excel orders. They are
    * summed per day, department, region, SPKM_BH1/SPKM_BH2 value and team. The result is
    * full-joined with the monthly plan from `saleout_spkm.csv`, divided evenly over the days
    * of its month.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    // Needed by the casts near the end of this method; these names were never imported.
    import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}

    val spark = createBatchSession()
    // OrderDate is shifted by +7 hours. Store StartDate/EndDate are NOT shifted here; they are
    // shifted by the same amount wherever they are compared with an order time.
    val indirectsalesorder = spark.read.table("WHS.Order_IndirectSalesOrder")
      .withColumn("OrderDate", col("OrderDate") + expr("INTERVAL 7 HOURS"))
    val indirectsaleordercontent = spark.read.table("WHS.Order_IndirectSalesOrderContent")
    val store = spark.read.table("WHS.MasterData_DMSStore")
    val saleplan_product_class = spark.read.table("WHS.SalePlan_ProductClass")
    val saleplan_product_class_detail = spark.read.table("WHS.SalePlan_ProductClassDetail")
    val item = spark.read.table("WHS.MasterData_Item")
    val khsp_bh1_bh2 = spark.read.format("avro").load("/gold_zone/full_load/whs/saleplan/khsp_bh1_bh2")
    // val date = spark.read.format("avro").load(f"${AppConfig.rootPath}/date")
    val organization = spark.read.table("WHS.SaleChannel_BM02_Organization_RD")
    val mappingc2 = spark.read.format("csv").option("multiLine", "true").option("header", "true")
      .load("/raw_zone/excel_form/upload/mapping_to_doi/mapping_dms_c2td.csv")
    val df_appuser = spark.read.table("whs.masterdata_appuser")
    val df_to_doi_c2 = spark.read.format("csv").option("multiLine", "true").option("header", "true")
      .load("/raw_zone/excel_form/upload/mapping_to_doi/to_doi_saleout.csv")

    // Temp views used by the SQL queries below.
    indirectsalesorder.createOrReplaceTempView("indirectsalesorder")
    store.createOrReplaceTempView("store")
    saleplan_product_class.createOrReplaceTempView("saleplan_product_class")
    saleplan_product_class_detail.createOrReplaceTempView("saleplan_product_class_detail")
    indirectsaleordercontent.createOrReplaceTempView("indirectsaleordercontent")
    item.createOrReplaceTempView("item")
    // date.createOrReplaceTempView("date")
    organization.createOrReplaceTempView("organization")
    df_to_doi_c2.createOrReplaceTempView("df_to_doi_c2")
    df_appuser.createOrReplaceTempView("df_appuser")

    // Orders uploaded from the C2 Excel files.
    val excel = spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      // .load("/raw_zone/excel_form/Order.csv")
      .load("/raw_zone/excel_form/upload/rangdong_store/c2_*.csv")
      // Date part of the order timestamp, accepted as either M/d/y or yyyy-MM-dd.
      .withColumn("Ngay_dat_hang", split(col("Ngày đặt hàng"), " ")(0))
      .withColumn(
        "Ngay_dat_hang",
        when(col("Ngay_dat_hang").contains("/"), to_date(col("Ngay_dat_hang"), "M/d/y"))
          .when(col("Ngay_dat_hang").contains("-"), to_date(col("Ngay_dat_hang"), "yyyy-MM-dd")))
      .withColumn("OrderDate", from_unixtime(unix_timestamp(col("Ngày đặt hàng"), "yyyy-MM-dd HH:mm:ss")).alias("OrderDate"))
    val don_hang = excel
      .join(mappingc2, mappingc2("User_Name") === excel("Username"), "left")
      // The last 7 characters of the Excel username are used as the DMS store Id.
      .withColumn("Id_excel", substring(excel("Username"), -7, 7).cast("bigint"))
      .join(store, store("Id") === col("Id_excel"), "left")
      .join(saleplan_product_class, excel("Barcode") === saleplan_product_class("ItemCodeSAP"), "left")
      .filter(excel("OrderDate").between(store("StartDate") + expr("INTERVAL 7 HOURS"), store("EndDate") + expr("INTERVAL 7 HOURS")))
      .filter(col("StoreTypeId").isin(17, 18, 23))
      .withColumn("Organization",
        when(store("Code").startsWith("TV01"), lit("TV01"))
          .when(store("Code").startsWith("TV02"), lit("TV02"))
          .when(store("Code").startsWith("TV03"), lit("TV03"))
          .when(store("Code").startsWith("TN"), lit("TN"))
          .when(store("Code").startsWith("NT"), lit("NT"))
          .when(store("Code").startsWith("TG"), lit("TG"))
          .when(store("Code").startsWith("CT"), lit("CT"))
          .when(store("Code").startsWith("DN"), lit("DN"))
          .when(store("Code").startsWith("BH"), lit("BH"))
          .when(store("Code").startsWith("HCM"), lit("HCM")))
      .withColumn("SaleDepartment",
        when(store("Code").startsWith("TV01") or store("Code").startsWith("TV02") or store("Code").startsWith("TV03"), lit("P_BH1"))
          .when(store("Code").startsWith("TN") or store("Code").startsWith("TG") or store("Code").startsWith("NT") or
            store("Code").startsWith("CT") or store("Code").startsWith("DN") or store("Code").startsWith("BH") or
            store("Code").startsWith("HCM"), lit("P_BH2")))
      .select(
        to_date(excel("Ngay_dat_hang")).alias("Day"),
        // Line amount = quantity * unit price - discount (missing discount treated as 0).
        (col("Số lượng") * col("Đơn Giá") - coalesce(col("Giảm giá"), lit("0"))).alias("Amount"),
        col("SaleDepartment"),
        col("Organization"),
        col("Số lượng").as("Quantity"),
        saleplan_product_class("SPKM_BH1"),
        saleplan_product_class("SPKM_BH2"),
        mappingc2("Doi").alias("Team_Name")
      )

    // DMS order lines unioned with the Excel orders. The store validity window is shifted by
    // +7h, as in the Excel filter above, so it is compared with the already-shifted OrderDate
    // in the same time base.
    val saleout_sp_km_temp = spark.sql(
      s"""
      SELECT date_format(OrderDate,"yyyy-MM-dd") Day,
      isoc.Amount,
      CASE
      WHEN iso.StoreCode LIKE 'TV01%' THEN 'P_BH1'
      WHEN iso.StoreCode LIKE 'TV02%' THEN 'P_BH1'
      WHEN iso.StoreCode LIKE 'TV03%' THEN 'P_BH1'
      WHEN iso.StoreCode LIKE 'TN%' THEN 'P_BH2'
      WHEN iso.StoreCode LIKE 'TG%' THEN 'P_BH2'
      WHEN iso.StoreCode LIKE 'NT%' THEN 'P_BH2'
      WHEN iso.StoreCode LIKE 'CT%' THEN 'P_BH2'
      WHEN iso.StoreCode LIKE 'DN%' THEN 'P_BH2'
      WHEN iso.StoreCode LIKE 'BH%' THEN 'P_BH2'
      WHEN iso.StoreCode LIKE 'HCM%' THEN 'P_BH2'
      END AS SaleDepartment,
      CASE
      when s.Code like 'TV01%' then "TV01"
      when s.Code like 'TV02%' then "TV02"
      when s.Code like 'TV03%' then "TV03"
      when s.Code like 'TN%' then "TN"
      when s.Code like 'TG%'then "TG"
      when s.Code like 'NT%'then "NT"
      when s.Code like 'CT%' then "CT"
      when s.Code like 'DN%' then "DN"
      when s.Code like 'BH%' then "BH"
      when s.Code like 'HCM%' then "HCM"
      END AS Organization,
      isoc.Quantity,
      spc.SPKM_BH1,
      spc.SPKM_BH2,
      dtdc2.Team_Name
      FROM indirectsaleordercontent AS isoc
      LEFT JOIN indirectsalesorder as iso on isoc.IndirectSalesOrderId = iso.ID
      LEFT JOIN store AS s ON s.Id = iso.buyerstoreID
        and iso.OrderDate BETWEEN s.StartDate + INTERVAL 7 HOURS and s.EndDate + INTERVAL 7 HOURS
      LEFT JOIN item as i on i.ID = isoc.ItemID
      LEFT JOIN saleplan_product_class as spc on i.Code = spc.ItemCodeSAP
      LEFT JOIN df_appuser as da on da.ID = iso.SaleEmployeeId
      LEFT JOIN df_to_doi_c2 as dtdc2 on dtdc2.Username = da.Username
      WHERE iso.generalindirectstateID = 3 and iso.BuyerStoreTypeID in (17,18,23)
      """.stripMargin).unionByName(don_hang)
    saleout_sp_km_temp.createOrReplaceTempView("saleout_sp_km_temp")

    // Actual totals per day and reporting key.
    val saleout_sp_km_month = spark.sql(
      s"""
      SELECT Day,
      SaleDepartment, Organization, SPKM_BH1,SPKM_BH2,
      sum(Amount) as SaleOut_DT_SP_KM,
      sum(Quantity) as SaleOut_SL_SP_KM,
      Team_Name
      FROM saleout_sp_km_temp
      group by Day, SPKM_BH1,SPKM_BH2, SaleDepartment , Organization, Team_Name
      """.stripMargin)

    // Monthly ("Thang") SaleOut plan per month and reporting key.
    val df_saleout_sp_km_kh = spark.read.format("csv").option("header", true)
      .load("/raw_zone/excel_form/upload/ke_hoach/saleout_spkm.csv")
      .filter(col("Ten_chi_tieu") === "SaleOut" and col("Loai_KH") === "Thang")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      .groupBy("Month", "Quarter", "Year", "Don_vi", "SPKM_BH1", "SPKM_BH2", "Loai_KH", "Vung_CN", "Ten_doi")
      .agg(sum("Doanh_thu").as("Doanh_thu_KH"), sum("So_luong").as("So_luong_KH"))

    // Split the plan by day. The per-month day count comes from a second, separate read so the
    // join below does not compare a DataFrame with itself.
    val my_date = spark.read.format("avro").load("/raw_zone/full_load/date/")
    val date_of_month = spark.read.format("avro").load("/raw_zone/full_load/date/")
      .groupBy("month", "quarter", "year")
      .agg(count("*").alias("so_ngay"))
    val count_date_of_month = my_date.as("t1")
      .join(date_of_month.as("t2"),
        my_date("month") === date_of_month("month") && my_date("year") === date_of_month("year"))
      .select(col("t1.date"), col("t1.month"), col("t1.quarter"), col("t1.year"), col("t2.so_ngay"))
    val df_saleout_sp_km_kh_date = df_saleout_sp_km_kh
      .join(count_date_of_month,
        df_saleout_sp_km_kh("Month") === count_date_of_month("Month") and
          df_saleout_sp_km_kh("Year") === count_date_of_month("Year"),
        "left")
      .withColumn("Doanh_thu_KH", col("Doanh_thu_KH") / col("so_ngay"))
      .withColumn("So_luong_KH", col("So_luong_KH") / col("so_ngay"))

    // Actual vs daily plan. Keys are wrapped in coalesce(_, "") so null dimensions match.
    val saleout_sp_km = saleout_sp_km_month
      .join(
        df_saleout_sp_km_kh_date,
        coalesce(saleout_sp_km_month("Day"), lit("")) === coalesce(df_saleout_sp_km_kh_date("date"), lit("")) &&
          coalesce(saleout_sp_km_month("SaleDepartment"), lit("")) === coalesce(df_saleout_sp_km_kh_date("Don_vi"), lit("")) &&
          coalesce(saleout_sp_km_month("Organization"), lit("")) === coalesce(df_saleout_sp_km_kh_date("Vung_CN"), lit("")) &&
          coalesce(saleout_sp_km_month("Team_Name"), lit("")) === coalesce(df_saleout_sp_km_kh_date("Ten_doi"), lit("")) &&
          coalesce(saleout_sp_km_month("SPKM_BH1"), lit("")) === coalesce(df_saleout_sp_km_kh_date("SPKM_BH1"), lit("")) &&
          coalesce(saleout_sp_km_month("SPKM_BH2"), lit("")) === coalesce(df_saleout_sp_km_kh_date("SPKM_BH2"), lit("")),
        "full"
      )
      .select(
        coalesce(saleout_sp_km_month("Day"), df_saleout_sp_km_kh_date.col("date")).alias("Day"),
        coalesce(saleout_sp_km_month("SaleDepartment"), df_saleout_sp_km_kh_date.col("Don_vi")).alias("SaleDepartment"),
        coalesce(saleout_sp_km_month("Organization"), df_saleout_sp_km_kh_date.col("Vung_CN")).alias("Organization"),
        coalesce(saleout_sp_km_month("SPKM_BH1"), df_saleout_sp_km_kh_date.col("SPKM_BH1")).alias("SPKM_BH1"),
        coalesce(saleout_sp_km_month("SPKM_BH2"), df_saleout_sp_km_kh_date.col("SPKM_BH2")).alias("SPKM_BH2"),
        saleout_sp_km_month("SaleOut_DT_SP_KM"),
        saleout_sp_km_month("SaleOut_SL_SP_KM"),
        df_saleout_sp_km_kh_date("Doanh_thu_KH").alias("SaleOut_DT_SP_KM_KH"),
        df_saleout_sp_km_kh_date("So_luong_KH").alias("SaleOut_SL_SP_KM_KH"),
        coalesce(saleout_sp_km_month("Team_Name"), df_saleout_sp_km_kh_date.col("Ten_doi")).alias("Team_Name")
      )

    // Template rows in the plan-file layout: distinct 2024 dimension combinations with empty
    // Thoi_gian_ke_hoach / So_luong / Doanh_thu.
    val res = saleout_sp_km
      .filter(col("Day").between("2024-01-01", "2024-12-31"))
      .select(
        lit("SaleOut").alias("Ten_chi_tieu"),
        lit(null).alias("Thoi_gian_ke_hoach"),
        col("SaleDepartment").alias("Don_vi"),
        col("Organization").alias("Vung_CN"),
        col("SPKM_BH1").alias("SPKM_BH1"),
        col("SPKM_BH2").alias("SPKM_BH2"),
        col("Team_Name").alias("Ten_doi"),
        lit(null).alias("So_luong"),
        lit(null).alias("Doanh_thu")
      )
    val re = res.distinct()
    val reCast = re
      .withColumn("Ten_chi_tieu", col("Ten_chi_tieu").cast(StringType))
      .withColumn("Thoi_gian_ke_hoach", col("Thoi_gian_ke_hoach").cast(StringType))
      .withColumn("Don_vi", col("Don_vi").cast(StringType))
      .withColumn("Vung_CN", col("Vung_CN").cast(StringType))
      .withColumn("SPKM_BH1", col("SPKM_BH1").cast(StringType))
      .withColumn("SPKM_BH2", col("SPKM_BH2").cast(StringType))
      .withColumn("Ten_doi", col("Ten_doi").cast(StringType))
      .withColumn("So_luong", col("So_luong").cast(IntegerType))
      .withColumn("Doanh_thu", col("Doanh_thu").cast(DoubleType))

    // NOTE(review): reCast is built but never written; the job writes `saleout_sp_km`. If the
    // form template is the intended output, write reCast instead. Confirm with the job owners.
    saleout_sp_km.write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }
}
// Code that builds the SaleOut form (biểu mẫu) by product class (NSP)
package vn.com.viettel.code.dashboard_date
import org.apache.spark.sql.functions._
import vn.com.viettel.AppConfig
import vn.com.viettel.utils.SparkUtils.createBatchSession
object saleout_dt_sp_led_phich {

  /** Builds the daily SaleOut revenue dataset by product class (NSP1/NSP2/NSP3) and writes it
    * as Avro to `AppConfig.outputDir`, overwriting any previous output.
    *
    * Actual revenue is DMS indirect orders unioned with the uploaded C2 Excel orders. It is
    * summed per day, department, region, customer type, product class and team. The result is
    * full-joined with the monthly plan from `saleout_nsp.csv`, divided evenly over the days of
    * its month.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    // Needed by the casts near the end of this method; these names were never imported.
    import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}

    val spark = createBatchSession()
    val doi = spark.read.format("csv").option("multiLine", "true").option("header", "true")
      .load("/raw_zone/excel_form/upload/mapping_to_doi/to_doi_saleout.csv")
    val appuser = spark.read.table("whs.masterdata_appuser")
    val mappingc2 = spark.read.format("csv").option("multiLine", "true").option("header", "true")
      .load("/raw_zone/excel_form/upload/mapping_to_doi/mapping_dms_c2td.csv")

    // Product classes. The detail table is read three times so each of the N_SP1/N_SP2/N_SP3
    // lookups joins against an independent copy.
    val product_class = spark.read.table("WHS.SalePlan_ProductClass")
    val product_class_detail = spark.read.table("WHS.SalePlan_ProductClassDetail")
    val product_class_detail1 = spark.read.table("WHS.SalePlan_ProductClassDetail")
    val product_class_detail2 = spark.read.table("WHS.SalePlan_ProductClassDetail")
    // OrderDate is shifted by +7 hours; store validity dates are shifted where compared.
    val indirectsalesorder = spark.read.table("WHS.Order_IndirectSalesOrder")
      .withColumn("OrderDate", col("OrderDate") + expr("INTERVAL 7 HOURS"))
    //.filter(col("GeneralIndirectStateId").isin(3) && col("BuyerStoreTypeId").isin(17,18,23))
    val store = spark.read.table("WHS.MasterData_DMSStore")
    val storetype = spark.read.table("WHS.MasterData_StoreType")
    val province = spark.read.table("WHS.MasterData_Province")
    val indirectsalesordercontent = spark.read.table("WHS.Order_IndirectSalesOrderContent")
    val item = spark.read.table("WHS.MasterData_Item")
    val product = spark.read.table("WHS.Product_Product")

    // One row per SalePlan_ProductClass row (item), with the names of its three class levels.
    val temp_product = product_class
      .join(product_class_detail, product_class("N_SP1") === product_class_detail("ID"), "left")
      .join(product_class_detail1.as("product_class_detail1"), product_class("N_SP2") === product_class_detail1("ID"), "left")
      .join(product_class_detail2.as("product_class_detail2"), product_class("N_SP3") === product_class_detail2("ID"), "left")
      .select(
        product_class("ItemName"),
        product_class("ItemCodeSAP"),
        product_class("N_SP1"),
        product_class("N_SP2"),
        product_class("N_SP3"),
        product_class("SPKM_BH1"),
        product_class("SPKM_BH2"),
        product_class_detail("Class").alias("Name_NSP1"),
        col("product_class_detail1.Class").alias("Name_NSP2"),
        col("product_class_detail2.Class").alias("Name_NSP3")
      )

    // Orders uploaded from the C2 Excel files.
    val excel = spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      // .load("/raw_zone/excel_form/Order.csv")
      .load("/raw_zone/excel_form/upload/rangdong_store/c2_*.csv")
      // Date part of the order timestamp, accepted as either M/d/y or yyyy-MM-dd.
      .withColumn("Ngay_dat_hang", split(col("Ngày đặt hàng"), " ")(0))
      .withColumn(
        "Ngay_dat_hang",
        when(col("Ngay_dat_hang").contains("/"), to_date(col("Ngay_dat_hang"), "M/d/y"))
          .when(col("Ngay_dat_hang").contains("-"), to_date(col("Ngay_dat_hang"), "yyyy-MM-dd")))
      .withColumn("OrderDate", from_unixtime(unix_timestamp(col("Ngày đặt hàng"), "yyyy-MM-dd HH:mm:ss")).alias("OrderDate"))
    val don_hang = excel
      .join(mappingc2, mappingc2("User_Name") === excel("Username"), "left")
      // The last 7 characters of the Excel username are used as the DMS store Id.
      .withColumn("Id_excel", substring(excel("Username"), -7, 7).cast("bigint"))
      .join(store, store("Id") === col("Id_excel"), "left")
      .join(temp_product, excel("Barcode") === temp_product("ItemCodeSAP"), "left")
      .join(province, store("ProvinceId") === province("Id"), "left")
      .filter(excel("OrderDate").between(store("StartDate") + expr("INTERVAL 7 HOURS"), store("EndDate") + expr("INTERVAL 7 HOURS")))
      .filter(col("StoreTypeId").isin(17, 18, 23))
      .withColumn("Organization",
        when(store("Code").startsWith("TV01"), lit("TV01"))
          .when(store("Code").startsWith("TV02"), lit("TV02"))
          .when(store("Code").startsWith("TV03"), lit("TV03"))
          .when(store("Code").startsWith("TN"), lit("TN"))
          .when(store("Code").startsWith("NT"), lit("NT"))
          .when(store("Code").startsWith("TG"), lit("TG"))
          .when(store("Code").startsWith("CT"), lit("CT"))
          .when(store("Code").startsWith("DN"), lit("DN"))
          .when(store("Code").startsWith("BH"), lit("BH"))
          .when(store("Code").startsWith("HCM"), lit("HCM")))
      .withColumn("SaleDepartment",
        when(store("Code").startsWith("TV01") or store("Code").startsWith("TV02") or store("Code").startsWith("TV03"), lit("P_BH1"))
          .when(store("Code").startsWith("TN") or store("Code").startsWith("TG") or store("Code").startsWith("NT") or
            store("Code").startsWith("CT") or store("Code").startsWith("DN") or store("Code").startsWith("BH") or
            store("Code").startsWith("HCM"), lit("P_BH2")))
      .select(
        to_date(excel("Ngay_dat_hang")).alias("Day"),
        // Line amount = quantity * unit price - discount (missing discount treated as 0).
        (col("Số lượng") * col("Đơn Giá") - coalesce(col("Giảm giá"), lit("0"))).alias("Amount"),
        col("SaleDepartment"),
        col("Organization"),
        when(store("StoreTypeId") === 17, lit("Cấp 2"))
          .when(store("StoreTypeId") === 18, lit("Cấp 2 Trọng điểm"))
          .when(store("StoreTypeId") === 23, lit("Cấp C2 siêu lớn"))
          .alias("CustomerType"),
        temp_product("N_SP1"),
        temp_product("N_SP2"),
        temp_product("N_SP3"),
        temp_product("Name_NSP1"),
        temp_product("Name_NSP2"),
        temp_product("Name_NSP3"),
        mappingc2("Doi").alias("Team_Name")
      )

    // DMS order lines unioned with the Excel orders. The store validity window is shifted by
    // +7h, as in the Excel filter above, so it is compared with the already-shifted OrderDate
    // in the same time base.
    val saleout_th = indirectsalesordercontent
      .join(indirectsalesorder, indirectsalesordercontent("IndirectSalesOrderId") === indirectsalesorder("Id"), "right")
      .join(store,
        indirectsalesorder("BuyerStoreId") === store("Id") &&
          indirectsalesorder("OrderDate").between(
            store("StartDate") + expr("INTERVAL 7 HOURS"),
            store("EndDate") + expr("INTERVAL 7 HOURS")),
        "left")
      .join(province, store("ProvinceId") === province("Id"), "left")
      .join(item, indirectsalesordercontent("ItemId") === item("Id"), "left")
      .join(product, item("ProductId") === product("Id"), "left")
      .join(temp_product, product("Code") === temp_product("ItemCodeSAP"), "left")
      .filter(indirectsalesorder("BuyerStoreTypeID").isin(17, 18, 23) && indirectsalesorder("GeneralIndirectStateId") === 3)
      .join(appuser, indirectsalesorder("SaleEmployeeId") === appuser("Id"), "left")
      .join(doi, doi("Username") === appuser("Username"), "left")
      .select(
        date_format(indirectsalesorder("OrderDate"), "yyyy-MM-dd").alias("Day"),
        when(indirectsalesorder("StoreCode").startsWith("TV01") or indirectsalesorder("StoreCode").startsWith("TV02") or
          indirectsalesorder("StoreCode").startsWith("TV03"), lit("P_BH1"))
          .when(
            indirectsalesorder("StoreCode").startsWith("TN") ||
              indirectsalesorder("StoreCode").startsWith("TG") ||
              indirectsalesorder("StoreCode").startsWith("NT") ||
              indirectsalesorder("StoreCode").startsWith("CT") ||
              indirectsalesorder("StoreCode").startsWith("DN") ||
              indirectsalesorder("StoreCode").startsWith("BH") ||
              indirectsalesorder("StoreCode").startsWith("HCM"),
            lit("P_BH2")
          ).alias("SaleDepartment"),
        when(store("Code").startsWith("TV01"), lit("TV01"))
          .when(store("Code").startsWith("TV02"), lit("TV02"))
          .when(store("Code").startsWith("TV03"), lit("TV03"))
          .when(store("Code").startsWith("TN"), lit("TN"))
          .when(store("Code").startsWith("NT"), lit("NT"))
          .when(store("Code").startsWith("TG"), lit("TG"))
          .when(store("Code").startsWith("CT"), lit("CT"))
          .when(store("Code").startsWith("DN"), lit("DN"))
          .when(store("Code").startsWith("BH"), lit("BH"))
          .when(store("Code").startsWith("HCM"), lit("HCM"))
          .alias("Organization"),
        when(indirectsalesorder("BuyerStoreTypeID") === 17, lit("Cấp 2"))
          .when(indirectsalesorder("BuyerStoreTypeID") === 18, lit("Cấp 2 Trọng điểm"))
          .when(indirectsalesorder("BuyerStoreTypeID") === 23, lit("Cấp C2 siêu lớn"))
          .alias("CustomerType"),
        temp_product("N_SP1"),
        temp_product("N_SP2"),
        temp_product("N_SP3"),
        temp_product("Name_NSP1"),
        temp_product("Name_NSP2"),
        temp_product("Name_NSP3"),
        doi("Team_Name"),
        indirectsalesordercontent("Amount")
      )
      .unionByName(don_hang)

    // Actual revenue per day and reporting key.
    val Loai_thang = saleout_th
      .groupBy("Day", "SaleDepartment", "Organization", "CustomerType", "N_SP1", "N_SP2", "N_SP3",
        "Name_NSP1", "Name_NSP2", "Name_NSP3", "Team_Name")
      .agg(sum("Amount").alias("SaleOut_LP_TH"))

    // Monthly ("Thang") plan rows.
    val plan = spark.read.format("csv").option("header", true)
      .load("/raw_zone/excel_form/upload/ke_hoach/saleout_nsp.csv")
      .filter(col("Ten_chi_tieu") === "saleout_nsp" and col("Loai_KH") === "Thang")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))

    // Plan rows carry class names only; look up the class IDs. temp_product has one row per
    // product-class item, so several rows share the same NSP1/2/3 names. Joining the plan to
    // it directly repeated each plan row once per such item and inflated the summed Doanh_thu
    // below. Join against the distinct class combinations instead.
    val productClassKeys = temp_product
      .select("N_SP1", "N_SP2", "N_SP3", "Name_NSP1", "Name_NSP2", "Name_NSP3")
      .distinct()
    val planWithProduct = plan
      .join(productClassKeys,
        plan("Name_NSP1") === productClassKeys("Name_NSP1")
          && plan("Name_NSP2") === productClassKeys("Name_NSP2")
          && plan("Name_NSP3") === productClassKeys("Name_NSP3"),
        "left")
      .select(
        plan("Month"),
        plan("Quarter"),
        plan("Year"),
        plan("Don_vi"),
        plan("Loai_KH"),
        plan("Cap_dai_ly"),
        plan("Vung_CN"),
        plan("Ten_doi"),
        plan("Doanh_thu"),
        plan("Tinh_thanh_pho"),
        productClassKeys("N_SP1").as("N_SP1"),
        productClassKeys("N_SP2").as("N_SP2"),
        productClassKeys("N_SP3").as("N_SP3"),
        plan("Name_NSP1"),
        plan("Name_NSP2"),
        plan("Name_NSP3")
      )
    val df_saleout_dt_kh = planWithProduct
      .filter(col("Loai_KH") === "Thang")
      .groupBy("Month", "Quarter", "Year", "Don_vi", "Vung_CN", "Tinh_thanh_pho", "Cap_dai_ly",
        "N_SP1", "N_SP2", "N_SP3", "Name_NSP1", "Name_NSP2", "Name_NSP3", "Ten_doi")
      .agg(sum("Doanh_thu").as("Doanh_thu_KH"))

    // Split the plan by day. The per-month day count comes from a second, separate read so the
    // join below does not compare a DataFrame with itself.
    val my_date = spark.read.format("avro").load("/raw_zone/full_load/date/")
    val date_of_month = spark.read.format("avro").load("/raw_zone/full_load/date/")
      .groupBy("month", "quarter", "year")
      .agg(count("*").alias("so_ngay"))
    val count_date_of_month = my_date.as("t1")
      .join(date_of_month.as("t2"),
        my_date("month") === date_of_month("month") && my_date("year") === date_of_month("year"))
      .select(col("t1.date"), col("t1.month"), col("t1.quarter"), col("t1.year"), col("t2.so_ngay"))
    val df_saleout_dt_kh_date = df_saleout_dt_kh
      .join(count_date_of_month,
        df_saleout_dt_kh("Month") === count_date_of_month("Month") and
          df_saleout_dt_kh("Year") === count_date_of_month("Year"),
        "left")
      .withColumn("Doanh_thu_KH", col("Doanh_thu_KH") / col("so_ngay"))

    // Actual vs daily plan. Keys are wrapped in coalesce(_, "") so null dimensions match.
    val result = Loai_thang
      .join(
        df_saleout_dt_kh_date,
        coalesce(Loai_thang("Day"), lit("")) === coalesce(df_saleout_dt_kh_date("date"), lit("")) &&
          coalesce(Loai_thang("SaleDepartment"), lit("")) === coalesce(df_saleout_dt_kh_date("Don_vi"), lit("")) &&
          coalesce(Loai_thang("Organization"), lit("")) === coalesce(df_saleout_dt_kh_date("Vung_CN"), lit("")) &&
          coalesce(Loai_thang("Team_Name"), lit("")) === coalesce(df_saleout_dt_kh_date("Ten_doi"), lit("")) &&
          coalesce(Loai_thang("CustomerType"), lit("")) === coalesce(df_saleout_dt_kh_date("Cap_dai_ly"), lit("")) &&
          coalesce(Loai_thang("N_SP1"), lit("")) === coalesce(df_saleout_dt_kh_date("N_SP1"), lit("")) &&
          coalesce(Loai_thang("N_SP2"), lit("")) === coalesce(df_saleout_dt_kh_date("N_SP2"), lit("")) &&
          coalesce(Loai_thang("N_SP3"), lit("")) === coalesce(df_saleout_dt_kh_date("N_SP3"), lit("")) &&
          coalesce(Loai_thang("Name_NSP1"), lit("")) === coalesce(df_saleout_dt_kh_date("Name_NSP1"), lit("")) &&
          coalesce(Loai_thang("Name_NSP2"), lit("")) === coalesce(df_saleout_dt_kh_date("Name_NSP2"), lit("")) &&
          coalesce(Loai_thang("Name_NSP3"), lit("")) === coalesce(df_saleout_dt_kh_date("Name_NSP3"), lit("")),
        "full"
      )
      .select(
        coalesce(Loai_thang("Day"), df_saleout_dt_kh_date.col("date")).alias("Day"),
        coalesce(Loai_thang("SaleDepartment"), df_saleout_dt_kh_date.col("Don_vi")).alias("SaleDepartment"),
        coalesce(Loai_thang("Organization"), df_saleout_dt_kh_date.col("Vung_CN")).alias("Organization"),
        coalesce(Loai_thang("CustomerType"), df_saleout_dt_kh_date.col("Cap_dai_ly")).alias("CustomerType"),
        coalesce(Loai_thang("N_SP1"), df_saleout_dt_kh_date.col("N_SP1")).alias("ID_NSP1"),
        coalesce(Loai_thang("N_SP2"), df_saleout_dt_kh_date.col("N_SP2")).alias("ID_NSP2"),
        coalesce(Loai_thang("N_SP3"), df_saleout_dt_kh_date.col("N_SP3")).alias("ID_NSP3"),
        coalesce(Loai_thang("Name_NSP1"), df_saleout_dt_kh_date.col("Name_NSP1")).alias("Name_NSP1"),
        coalesce(Loai_thang("Name_NSP2"), df_saleout_dt_kh_date.col("Name_NSP2")).alias("Name_NSP2"),
        coalesce(Loai_thang("Name_NSP3"), df_saleout_dt_kh_date.col("Name_NSP3")).alias("Name_NSP3"),
        coalesce(Loai_thang("Team_Name"), df_saleout_dt_kh_date.col("Ten_doi")).alias("Team_Name"),
        Loai_thang("SaleOut_LP_TH").alias("SaleOut_LP_TH"),
        df_saleout_dt_kh_date("Doanh_thu_KH").alias("SaleOut_LP_KH")
      )
      // Group code: "L" followed by the last character of the NSP1 id.
      .withColumn("Ma_Nhom", concat(lit("L"), substring(col("ID_NSP1"), -1, 1)))

    // Template rows in the plan-file layout: distinct 2024 dimension combinations with empty
    // Thoi_gian_ke_hoach / So_luong / Doanh_thu.
    val filteredResult = result
      .filter(col("Day").between("2024-01-01", "2024-12-31"))
      .select(
        lit("saleout_nsp").alias("Ten_chi_tieu"),
        lit(null).alias("Thoi_gian_ke_hoach"),
        col("SaleDepartment").alias("Don_vi"),
        col("Organization").alias("Vung_CN"),
        col("CustomerType").alias("Cap_dai_ly"),
        col("Team_Name").alias("Ten_doi"),
        col("Name_NSP1").alias("Name_NSP1"),
        col("Name_NSP2").alias("Name_NSP2"),
        col("Name_NSP3").alias("Name_NSP3"),
        lit(null).alias("So_luong"),
        lit(null).alias("Doanh_thu")
      )
    val re = filteredResult.distinct()
    val reCast = re
      .withColumn("Ten_chi_tieu", col("Ten_chi_tieu").cast(StringType))
      .withColumn("Thoi_gian_ke_hoach", col("Thoi_gian_ke_hoach").cast(StringType))
      .withColumn("Don_vi", col("Don_vi").cast(StringType))
      .withColumn("Vung_CN", col("Vung_CN").cast(StringType))
      .withColumn("Cap_dai_ly", col("Cap_dai_ly").cast(StringType))
      .withColumn("Ten_doi", col("Ten_doi").cast(StringType))
      .withColumn("Name_NSP1", col("Name_NSP1").cast(StringType))
      .withColumn("Name_NSP2", col("Name_NSP2").cast(StringType))
      .withColumn("Name_NSP3", col("Name_NSP3").cast(StringType))
      .withColumn("So_luong", col("So_luong").cast(IntegerType))
      .withColumn("Doanh_thu", col("Doanh_thu").cast(DoubleType))

    // NOTE(review): reCast is built but never written; the job writes `result`. If the form
    // template is the intended output, write reCast instead. Confirm with the job owners.
    result.write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }
}
Editor is loading...
Leave a Comment