// Paste-site metadata (not part of the script): Untitled | unknown | plain_text | a year ago | 18 kB | 7 | Indexable
// Register the product-class lookup table for the SQL queries further down.
spark.read.table("WHS.SalePlan_ProductClassDetail").createOrReplaceTempView("productdetail")

// Three separate handles on the same lookup table, one per product-class level
// (NSP1..NSP3), used when the plan is joined to the lookup once per level.
val product_class_detail_nsp1 = spark.read.table("WHS.SalePlan_ProductClassDetail")
val product_class_detail_nsp2 = spark.read.table("WHS.SalePlan_ProductClassDetail")
val product_class_detail_nsp3 = spark.read.table("WHS.SalePlan_ProductClassDetail")

// Load the plan CSV (header row, no explicit schema) and keep only the rows whose
// Ten_chi_tieu equals "Thực xuất". Month / Year / Quarter are formatted out of
// Thoi_gian_ke_hoach (date_format yields strings) and Doanh_thu is exposed as the
// numeric column ThanhTien.
val df_kh_tx = {
  val planDate = col("Thoi_gian_ke_hoach")

  spark.read
    .option("header", "true")
    .csv("/raw_zone/excel_form/upload/ke_hoach/salein_tx_thutien_nsp.csv")
    .where(col("Ten_chi_tieu") === "Thực xuất")
    .withColumn("Month", date_format(planDate, "M"))
    .withColumn("Year", date_format(planDate, "y"))
    .withColumn("Quarter", date_format(planDate, "q"))
    .withColumn("ThanhTien", col("Doanh_thu").cast("double"))
}
// Join the plan with the product-class lookup to resolve the product-class ID of each
// level: Name_NSP1..3 are matched against the lookup's Class column and the matching ID
// is exposed as SP1..SP3. Left joins keep plan rows whose class name has no match
// (the corresponding SPx is then NULL).
//
// The same lookup table is joined three times, so every condition is written against
// the join aliases (t, t1, t2, t3) rather than against the DataFrame handles: handles
// that wrap the same table can resolve to the same underlying attribute, which makes
// the second and third conditions ambiguous. The select below already relied on the
// aliases; the conditions now agree with it.
val kh_tx = df_kh_tx.as("t")
  .join(product_class_detail_nsp1.as("t1"), col("t.Name_NSP1") === col("t1.Class"), "left")
  .join(product_class_detail_nsp2.as("t2"), col("t.Name_NSP2") === col("t2.Class"), "left")
  .join(product_class_detail_nsp3.as("t3"), col("t.Name_NSP3") === col("t3.Class"), "left")
  .select(
    col("t.*"),
    col("t1.ID").as("SP1"),
    col("t2.ID").as("SP2"),
    col("t3.ID").as("SP3")
  )
// ETL of the actuals (tx) at month, quarter and year level.
// Register the actuals (avro) and the plan as temp views for the SQL below.
spark.read.format("avro").load("/gold_zone/full_load/kpi/tx").createOrReplaceTempView("tx")
kh_tx.createOrReplaceTempView("kh_tx")

// Monthly ('Thang') actuals: tx rows are passed through without aggregation, renamed to
// the output column names, and each product-class ID (N_SP1..N_SP3) is resolved to its
// Class name via inner joins (tx rows with an ID missing from productdetail are dropped).
// The string literal is single-quoted and the lookup column is written as `Class`, the
// same way the quarterly and yearly queries do, so the query does not depend on
// double-quoted literals or case-insensitive column resolution.
// NOTE: createOrReplaceTempView returns Unit, so `output_thang` does not hold a
// DataFrame; the result is only reachable through the temp view "output_thang".
val output_thang = spark.sql(
"""
SELECT
tx.Month,
tx.Quarter,
tx.Year,
tx.Phong AS SalesDepartment,
tx.Chi_nhanh AS OrganizationName,
tx.Tinh AS ProvinceName,
tx.Kenh AS SalesChannelName,
tx.N_SP1,
tx.N_SP2,
tx.N_SP3,
p1.Class AS Name_NSP1,
p2.Class AS Name_NSP2,
p3.Class AS Name_NSP3,
tx.Team_Name,
tx.ThanhTien AS ActualExport_Perform,
'Thang' AS Loai
FROM tx
JOIN productdetail p1 ON tx.N_SP1 = p1.ID
JOIN productdetail p2 ON tx.N_SP2 = p2.ID
JOIN productdetail p3 ON tx.N_SP3 = p3.ID
""").createOrReplaceTempView("output_thang")
// Quarterly ('Quy') actuals: sums tx.ThanhTien per Quarter/Year and per SalesDepartment,
// OrganizationName, ProvinceName, SalesChannelName, product-class IDs and names, and
// Team_Name. Month is emitted as NULL because the row represents a whole quarter.
// The inner joins resolve each product-class ID (N_SP1..N_SP3) to its Class name, so tx
// rows whose ID is missing from productdetail are dropped.
// NOTE: createOrReplaceTempView returns Unit, so `output_quy` does not hold a DataFrame;
// the result is only reachable through the temp view "output_quy".
val output_quy = spark.sql(
"""
SELECT
NULL AS Month,
tx.Quarter,
tx.Year,
tx.Phong AS SalesDepartment,
tx.Chi_nhanh AS OrganizationName,
tx.Tinh AS ProvinceName,
tx.Kenh AS SalesChannelName,
tx.N_SP1,
tx.N_SP2,
tx.N_SP3,
p1.Class AS Name_NSP1,
p2.Class AS Name_NSP2,
p3.Class AS Name_NSP3,
tx.Team_Name,
SUM(tx.ThanhTien) AS ActualExport_Perform,
'Quy' AS Loai
FROM tx
JOIN productdetail p1 ON tx.N_SP1 = p1.ID
JOIN productdetail p2 ON tx.N_SP2 = p2.ID
JOIN productdetail p3 ON tx.N_SP3 = p3.ID
GROUP BY
tx.Quarter,
tx.Year,
tx.Phong,
tx.Chi_nhanh,
tx.Tinh,
tx.Kenh,
tx.N_SP1,
tx.N_SP2,
tx.N_SP3,
p1.Class,
p2.Class,
p3.Class,
tx.Team_Name
""").createOrReplaceTempView("output_quy")
// Yearly ('Nam') actuals: sums tx.ThanhTien per Year and per SalesDepartment,
// OrganizationName, ProvinceName, SalesChannelName, product-class IDs and names, and
// Team_Name. Month and Quarter are emitted as NULL because the row represents a whole year.
// The inner joins resolve each product-class ID (N_SP1..N_SP3) to its Class name, so tx
// rows whose ID is missing from productdetail are dropped.
// NOTE: createOrReplaceTempView returns Unit, so `output_nam` does not hold a DataFrame;
// the result is only reachable through the temp view "output_nam".
val output_nam = spark.sql(
"""
SELECT
NULL AS Month,
NULL AS Quarter,
tx.Year,
tx.Phong AS SalesDepartment,
tx.Chi_nhanh AS OrganizationName,
tx.Tinh AS ProvinceName,
tx.Kenh AS SalesChannelName,
tx.N_SP1,
tx.N_SP2,
tx.N_SP3,
p1.Class AS Name_NSP1,
p2.Class AS Name_NSP2,
p3.Class AS Name_NSP3,
tx.Team_Name,
SUM(tx.ThanhTien) AS ActualExport_Perform,
'Nam' AS Loai
FROM tx
JOIN productdetail p1 ON tx.N_SP1 = p1.ID
JOIN productdetail p2 ON tx.N_SP2 = p2.ID
JOIN productdetail p3 ON tx.N_SP3 = p3.ID
GROUP BY
tx.Year,
tx.Phong,
tx.Chi_nhanh,
tx.Tinh,
tx.Kenh,
tx.N_SP1,
tx.N_SP2,
tx.N_SP3,
p1.Class,
p2.Class,
p3.Class,
tx.Team_Name
""").createOrReplaceTempView("output_nam")
//
// spark.read.format("avro").load("/gold_zone/full_load/kpi/tx").createOrReplaceTempView("tx")
// val output = spark.sql(
// """
// select
// Month , Quarter, Year , Phong SalesDepartment, Chi_nhanh OrganizationName,Tinh ProvinceName , Kenh SalesChannelName,
// N_SP1 , N_SP2 , N_SP3, Team_Name ,ThanhTien ActualExport_Perform , "Thang" Loai
// from tx
// union all
// select
// null Month ,Quarter , Year , Phong SalesDepartment, Chi_nhanh OrganizationName,Tinh ProvinceName , Kenh SalesChannelName,
// N_SP1 , N_SP2 , N_SP3, Team_Name ,sum(ThanhTien) ActualExport_Perform , "Quy" Loai
// from tx
// group by Quarter , Year , Phong , Chi_nhanh , Kenh , N_SP1 , N_SP2 , N_SP3,Tinh, Team_Name
// union all
// select
// null Month ,null Quarter , Year , Phong SalesDepartment, Chi_nhanh OrganizationName,Tinh ProvinceName , Kenh SalesChannelName ,
// N_SP1 , N_SP2 , N_SP3, Team_Name ,sum(ThanhTien) ActualExport_Perform , "Nam" Loai
// from tx
// group by Year , Phong , Chi_nhanh , Kenh , N_SP1 , N_SP2 , N_SP3,Tinh, Team_Name
// """).withColumn("ActualExport_Plan", col("ActualExport_Perform") * 1.3)
// .withColumn("SalesChannelName", col("SalesChannelName").cast("string"))
// Join the monthly, quarterly and yearly plan with the monthly, quarterly and yearly actuals.
// Each actuals view (output_thang / output_quy / output_nam) is LEFT JOINed to kh_tx and
// the three results are concatenated with UNION ALL, so actuals without a plan row are
// kept with ActualExport_Plan = NULL (kh_tx.ThanhTien is the plan value).
// Period keys used per branch: monthly matches on Month, Quarter and Year; quarterly on
// Quarter and Year; yearly on Year only. Every branch additionally matches department,
// organization, province, sales channel, product-class IDs and names, team, and
// Loai = kh_tx.Loai_KH.
// SalesChannelName is cast to string after the union.
val output = spark.sql(
"""
select output_thang.Month,
output_thang.Quarter,
output_thang.Year,
output_thang.SalesDepartment,
output_thang.OrganizationName,
output_thang.ProvinceName,
output_thang.SalesChannelName,
output_thang.Name_NSP1,
output_thang.Name_NSP2,
output_thang.Name_NSP3,
output_thang.N_SP1,
output_thang.N_SP2,
output_thang.N_SP3,
output_thang.Team_Name,
output_thang.ActualExport_Perform,
output_thang.Loai,
kh_tx.ThanhTien AS ActualExport_Plan
from output_thang
left join
kh_tx
on output_thang.Month = kh_tx.Month and output_thang.Quarter = kh_tx.Quarter and output_thang.Year = kh_tx.Year and output_thang.SalesDepartment = kh_tx.Don_vi and output_thang.OrganizationName = kh_tx.Vung_CN
and output_thang.ProvinceName = kh_tx.Tinh_Thanh_Pho and output_thang.SalesChannelName = kh_tx.Kenh_ban and output_thang.N_SP1 = kh_tx.SP1 and output_thang.N_SP2 = kh_tx.SP2
and output_thang.N_SP3 = kh_tx.SP3 and output_thang.Name_NSP1 = kh_tx.Name_NSP1 and output_thang.Name_NSP2 = kh_tx.Name_NSP2 and output_thang.Name_NSP3 = kh_tx.Name_NSP3 and output_thang.Team_Name = kh_tx.Ten_doi and output_thang.Loai = kh_tx.Loai_KH
union all
select
null Month,
output_quy.Quarter,
output_quy.Year,
output_quy.SalesDepartment,
output_quy.OrganizationName,
output_quy.ProvinceName,
output_quy.SalesChannelName,
output_quy.Name_NSP1,
output_quy.Name_NSP2,
output_quy.Name_NSP3,
output_quy.N_SP1,
output_quy.N_SP2,
output_quy.N_SP3,
output_quy.Team_Name,
output_quy.ActualExport_Perform,
output_quy.Loai,
kh_tx.ThanhTien AS ActualExport_Plan
from output_quy
left join
kh_tx
on output_quy.Quarter = kh_tx.Quarter and output_quy.Year = kh_tx.Year and output_quy.SalesDepartment = kh_tx.Don_vi and output_quy.OrganizationName = kh_tx.Vung_CN
and output_quy.ProvinceName = kh_tx.Tinh_Thanh_Pho and output_quy.SalesChannelName = kh_tx.Kenh_ban and output_quy.N_SP1 = kh_tx.SP1 and output_quy.N_SP2 = kh_tx.SP2
and output_quy.N_SP3 = kh_tx.SP3 and output_quy.Name_NSP1 = kh_tx.Name_NSP1 and output_quy.Name_NSP2 = kh_tx.Name_NSP2 and output_quy.Name_NSP3 = kh_tx.Name_NSP3 and output_quy.Team_Name = kh_tx.Ten_doi and output_quy.Loai = kh_tx.Loai_KH
union all
select
null Month,
null Quarter,
output_nam.Year,
output_nam.SalesDepartment,
output_nam.OrganizationName,
output_nam.ProvinceName,
output_nam.SalesChannelName,
output_nam.Name_NSP1,
output_nam.Name_NSP2,
output_nam.Name_NSP3,
output_nam.N_SP1,
output_nam.N_SP2,
output_nam.N_SP3,
output_nam.Team_Name,
output_nam.ActualExport_Perform,
output_nam.Loai,
kh_tx.ThanhTien AS ActualExport_Plan
from output_nam
left join
kh_tx
on output_nam.Year = kh_tx.Year and output_nam.SalesDepartment = kh_tx.Don_vi and output_nam.OrganizationName = kh_tx.Vung_CN
and output_nam.ProvinceName = kh_tx.Tinh_Thanh_Pho and output_nam.SalesChannelName = kh_tx.Kenh_ban and output_nam.N_SP1 = kh_tx.SP1 and output_nam.N_SP2 = kh_tx.SP2
and output_nam.N_SP3 = kh_tx.SP3 and output_nam.Name_NSP1 = kh_tx.Name_NSP1 and output_nam.Name_NSP2 = kh_tx.Name_NSP2 and output_nam.Name_NSP3 = kh_tx.Name_NSP3 and output_nam.Team_Name = kh_tx.Ten_doi and output_nam.Loai = kh_tx.Loai_KH
""").withColumn("SalesChannelName", col("SalesChannelName").cast("string"))
// Previous-period view of `output`: every row is relabelled with the period that FOLLOWS
// it (month -> next month, quarter -> next quarter, year -> next year), so that joining
// it to `output` on the period columns pairs each period with the one before it.
// The three columns are rewritten in the order Year, Quarter, Month; the Year and Quarter
// rules read Month/Quarter before those columns are overwritten.
// The former trailing `Thang && Month === 12` Quarter branch was unreachable (the same
// case is already handled by the roll-over rule) and has been dropped; the two identical
// "Year + 1" branches are merged. Results are unchanged.
val ky_truoc_kpi_thuc_xuat = {
  val isYearly = col("Loai") === "Nam"
  val isDecember = col("Loai") === "Thang" && col("Month") === 12
  val isLastQuarter = col("Loai") === "Quy" && col("Quarter") === 4
  val isQuarterEndMonth =
    col("Loai") === "Thang" && (col("Month") === 3 || col("Month") === 6 || col("Month") === 9)

  output
    // The year advances for yearly rows and for the last month / last quarter of a year.
    .withColumn("Year",
      when(isYearly || isDecember || isLastQuarter, (col("Year") + 1).cast("int"))
        .otherwise(col("Year").cast("int")))
    // Quarter stays NULL on yearly rows, wraps to 1 at year end, advances for quarterly
    // rows and for monthly rows sitting on a quarter's last month; otherwise unchanged.
    .withColumn("Quarter",
      when(col("Quarter").isNull, col("Quarter").cast("int"))
        .when(isLastQuarter || isDecember, 1)
        .when(col("Loai") === "Quy" && col("Quarter") < 4, (col("Quarter") + 1).cast("int"))
        .when(isQuarterEndMonth, (col("Quarter") + 1).cast("int"))
        .otherwise(col("Quarter").cast("int")))
    // Month stays NULL on quarterly/yearly rows, otherwise advances and wraps 12 -> 1.
    .withColumn("Month",
      when(col("Month").isNull, col("Month").cast("int"))
        .when(col("Month") < 12, (col("Month") + 1).cast("int"))
        .otherwise(1))
}
// Same-period-last-year view of `output`: Year is shifted forward by one so that a row
// of year Y lines up with the row of year Y + 1 it is compared against.
val cung_ky_kpi_thucxuat = output.withColumn("Year", (col("Year") + 1).cast("int"))

// Full outer join of the current figures with the same-period figures, adding
// ActualExport_Perform_CungKy (the same-period actual). Rows present on only one side
// are kept; their key columns are taken from whichever side has them.
//
// The keys are compared with the null-safe operator <=>: Month is NULL on quarterly rows
// and Month/Quarter are NULL on yearly rows, and a plain === yields NULL for
// NULL = NULL, which would prevent every quarterly and yearly row from ever matching
// its counterpart.
val res_kpi_thucxuat_cung_ky = {
  val cur = "output"
  val prev = "cung_ky_kpi_thucxuat"

  // Key columns in the order they appear at the front of the result.
  val leadingKeys = Seq(
    "Month", "Quarter", "Year",
    "SalesDepartment", "OrganizationName", "ProvinceName", "SalesChannelName",
    "N_SP1", "N_SP2", "N_SP3",
    "Name_NSP1", "Name_NSP2", "Name_NSP3",
    "Team_Name")
  val joinKeys = leadingKeys :+ "Loai"

  val samePeriod = joinKeys
    .map(k => col(s"$cur.$k") <=> col(s"$prev.$k"))
    .reduce(_ && _)

  // Key value from the current side, falling back to the same-period side.
  def fromEitherSide(k: String) = coalesce(col(s"$cur.$k"), col(s"$prev.$k")).alias(k)

  val projection = leadingKeys.map(fromEitherSide) ++ Seq(
    col(s"$cur.ActualExport_Perform").alias("ActualExport_Perform"),
    col(s"$cur.ActualExport_Plan").alias("ActualExport_Plan"),
    fromEitherSide("Loai"),
    col(s"$prev.ActualExport_Perform").alias("ActualExport_Perform_CungKy"))

  output.as(cur)
    .join(cung_ky_kpi_thucxuat.as(prev), samePeriod, "fullouter")
    .select(projection: _*)
}
// Final KPI table: full outer join of the current + same-period figures with the
// previous-period figures, adding ActualExport_Perform_KyTruoc (the previous-period
// actual). Rows present on only one side are kept; their key columns are taken from
// whichever side has them. Ma_Nhom is "L" followed by the last character of N_SP1.
//
// The keys are compared with the null-safe operator <=>: Month is NULL on quarterly rows
// and Month/Quarter are NULL on yearly rows, and a plain === yields NULL for
// NULL = NULL, which would prevent every quarterly and yearly row from ever matching
// its previous period.
val res_kpi_thucxuat = {
  val cur = "res_kpi_thucxuat_cung_ky"
  val prev = "ky_truoc_kpi_thuc_xuat"

  // Key columns in the order they appear at the front of the result.
  val leadingKeys = Seq(
    "Month", "Quarter", "Year",
    "SalesDepartment", "OrganizationName", "ProvinceName", "SalesChannelName",
    "N_SP1", "N_SP2", "N_SP3",
    "Name_NSP1", "Name_NSP2", "Name_NSP3",
    "Team_Name")
  val joinKeys = leadingKeys :+ "Loai"

  val samePeriod = joinKeys
    .map(k => col(s"$cur.$k") <=> col(s"$prev.$k"))
    .reduce(_ && _)

  // Key value from the current side, falling back to the previous-period side.
  def fromEitherSide(k: String) = coalesce(col(s"$cur.$k"), col(s"$prev.$k")).alias(k)

  val projection = leadingKeys.map(fromEitherSide) ++ Seq(
    col(s"$cur.ActualExport_Perform").alias("ActualExport_Perform"),
    col(s"$cur.ActualExport_Plan").alias("ActualExport_Plan"),
    fromEitherSide("Loai"),
    col(s"$cur.ActualExport_Perform_CungKy").alias("ActualExport_Perform_CungKy"),
    col(s"$prev.ActualExport_Perform").alias("ActualExport_Perform_KyTruoc"))

  res_kpi_thucxuat_cung_ky.as(cur)
    .join(ky_truoc_kpi_thuc_xuat.as(prev), samePeriod, "fullouter")
    .select(projection: _*)
    // substring with position -1 and length 1 takes the last character of N_SP1.
    .withColumn("Ma_Nhom", concat(lit("L"), substring(col("N_SP1"), -1, 1)))
}
// Expose the final KPI table to SQL as "kpi_thucxuat" and define an ad-hoc aggregation
// over it: the total previous-period actual per N_SP1 for months 1 to 5 of 2024.
// (Paste-site residue that was fused onto the closing `""")` and trailed the file has
// been removed; it was not part of the script and prevented it from compiling.)
res_kpi_thucxuat.createOrReplaceTempView("kpi_thucxuat")
val res = spark.sql(
"""
select N_SP1, sum(ActualExport_Perform_KyTruoc)
from kpi_thucxuat where Year = 2024 and Month between 1 and 5
group by N_SP1
""")