Untitled
unknown
plain_text
9 months ago
18 kB
3
Indexable
// KPI "actual export" (Thực xuất) report: actuals vs plan at month ("Thang"), quarter ("Quy") and
// year ("Nam") grain, plus the same period last year ("cung ky") and the previous period ("ky truoc").

// Product-class lookup table (ID <-> Class). It is joined once per product-class level (NSP1/NSP2/NSP3)
// and is also exposed to SQL as `productdetail`. A single read is reused; each join below gives it
// its own alias so that column references stay unambiguous.
val product_class_detail_nsp1 = spark.read.table("WHS.SalePlan_ProductClassDetail")
val product_class_detail_nsp2 = product_class_detail_nsp1
val product_class_detail_nsp3 = product_class_detail_nsp1

product_class_detail_nsp1.createOrReplaceTempView("productdetail")

// Read the sales plan ("ke hoach") and keep only the actual-export target rows.
val df_kh_tx = spark.read.format("csv").option("header", true)
  .load("/raw_zone/excel_form/upload/ke_hoach/salein_tx_thutien_nsp.csv")
  .filter(col("Ten_chi_tieu") === "Thực xuất")
  // Period keys derived from the plan date (date_format yields strings).
  .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
  .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
  .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
  .withColumn("ThanhTien", col("Doanh_thu").cast("double"))

// Attach the product-class ID for each of the plan's three class-name levels (left joins: names
// without a match get a NULL ID). Conditions reference the aliases t/t1/t2/t3 rather than
// `product_class_detail_nspN("Class")`. All three lookups come from the same table, so
// parent-DataFrame column references are ambiguous in this self-join.
val kh_tx = df_kh_tx.as("t")
  .join(product_class_detail_nsp1.as("t1"), col("t.Name_NSP1") === col("t1.Class"), "left")
  .join(product_class_detail_nsp2.as("t2"), col("t.Name_NSP2") === col("t2.Class"), "left")
  .join(product_class_detail_nsp3.as("t3"), col("t.Name_NSP3") === col("t3.Class"), "left")
  .select(
    col("t.*"),
    col("t1.ID").as("SP1"),
    col("t2.ID").as("SP2"),
    col("t3.ID").as("SP3")
  )

// Actual exports (tx) at month / quarter / year grain, tagged in the Loai column.
spark.read.format("avro").load("/gold_zone/full_load/kpi/tx").createOrReplaceTempView("tx")
kh_tx.createOrReplaceTempView("kh_tx")

// Monthly rows are taken as-is (no SUM).
// NOTE(review): this assumes `tx` already holds one row per month and dimension combination. Confirm;
// otherwise duplicate rows flow into every downstream join.
// The inner joins to productdetail drop tx rows whose N_SP1/N_SP2/N_SP3 has no matching ID.
val output_thang = spark.sql(
  """
  SELECT
    tx.Month, tx.Quarter, tx.Year,
    tx.Phong AS SalesDepartment,
    tx.Chi_nhanh AS OrganizationName,
    tx.Tinh AS ProvinceName,
    tx.Kenh AS SalesChannelName,
    tx.N_SP1, tx.N_SP2, tx.N_SP3,
    p1.Class AS Name_NSP1, p2.Class AS Name_NSP2, p3.Class AS Name_NSP3,
    tx.Team_Name,
    tx.ThanhTien AS ActualExport_Perform,
    'Thang' AS Loai
  FROM tx
  JOIN productdetail p1 ON tx.N_SP1 = p1.ID
  JOIN productdetail p2 ON tx.N_SP2 = p2.ID
  JOIN productdetail p3 ON tx.N_SP3 = p3.ID
  """)
output_thang.createOrReplaceTempView("output_thang")

// Quarterly totals (Month is NULL on these rows).
val output_quy = spark.sql(
  """
  SELECT
    NULL AS Month, tx.Quarter, tx.Year,
    tx.Phong AS SalesDepartment,
    tx.Chi_nhanh AS OrganizationName,
    tx.Tinh AS ProvinceName,
    tx.Kenh AS SalesChannelName,
    tx.N_SP1, tx.N_SP2, tx.N_SP3,
    p1.Class AS Name_NSP1, p2.Class AS Name_NSP2, p3.Class AS Name_NSP3,
    tx.Team_Name,
    SUM(tx.ThanhTien) AS ActualExport_Perform,
    'Quy' AS Loai
  FROM tx
  JOIN productdetail p1 ON tx.N_SP1 = p1.ID
  JOIN productdetail p2 ON tx.N_SP2 = p2.ID
  JOIN productdetail p3 ON tx.N_SP3 = p3.ID
  GROUP BY tx.Quarter, tx.Year, tx.Phong, tx.Chi_nhanh, tx.Tinh, tx.Kenh,
           tx.N_SP1, tx.N_SP2, tx.N_SP3, p1.Class, p2.Class, p3.Class, tx.Team_Name
  """)
output_quy.createOrReplaceTempView("output_quy")

// Yearly totals (Month and Quarter are NULL on these rows).
val output_nam = spark.sql(
  """
  SELECT
    NULL AS Month, NULL AS Quarter, tx.Year,
    tx.Phong AS SalesDepartment,
    tx.Chi_nhanh AS OrganizationName,
    tx.Tinh AS ProvinceName,
    tx.Kenh AS SalesChannelName,
    tx.N_SP1, tx.N_SP2, tx.N_SP3,
    p1.Class AS Name_NSP1, p2.Class AS Name_NSP2, p3.Class AS Name_NSP3,
    tx.Team_Name,
    SUM(tx.ThanhTien) AS ActualExport_Perform,
    'Nam' AS Loai
  FROM tx
  JOIN productdetail p1 ON tx.N_SP1 = p1.ID
  JOIN productdetail p2 ON tx.N_SP2 = p2.ID
  JOIN productdetail p3 ON tx.N_SP3 = p3.ID
  GROUP BY tx.Year, tx.Phong, tx.Chi_nhanh, tx.Tinh, tx.Kenh,
           tx.N_SP1, tx.N_SP2, tx.N_SP3, p1.Class, p2.Class, p3.Class, tx.Team_Name
  """)
output_nam.createOrReplaceTempView("output_nam")

// Earlier version, superseded by the plan join below (it faked the plan as Perform * 1.3):
// spark.read.format("avro").load("/gold_zone/full_load/kpi/tx").createOrReplaceTempView("tx")
// val output = spark.sql(
// """
// select
// Month , Quarter, Year , Phong SalesDepartment, Chi_nhanh OrganizationName,Tinh ProvinceName , Kenh SalesChannelName,
// N_SP1 , N_SP2 , N_SP3, Team_Name ,ThanhTien ActualExport_Perform , "Thang" Loai
// from tx
// union all
// select
// null Month ,Quarter , Year , Phong SalesDepartment, Chi_nhanh OrganizationName,Tinh ProvinceName , Kenh SalesChannelName,
// N_SP1 , N_SP2 , N_SP3, Team_Name ,sum(ThanhTien) ActualExport_Perform , "Quy" Loai
// from tx
// group by Quarter , Year , Phong , Chi_nhanh , Kenh , N_SP1 , N_SP2 , N_SP3,Tinh, Team_Name
// union all
// select
// null Month ,null Quarter , Year , Phong SalesDepartment, Chi_nhanh OrganizationName,Tinh ProvinceName , Kenh SalesChannelName ,
// N_SP1 , N_SP2 , N_SP3, Team_Name ,sum(ThanhTien) ActualExport_Perform , "Nam" Loai
// from tx
// group by Year , Phong , Chi_nhanh , Kenh , N_SP1 , N_SP2 , N_SP3,Tinh, Team_Name
// """).withColumn("ActualExport_Plan", col("ActualExport_Perform") * 1.3)
// .withColumn("SalesChannelName", col("SalesChannelName").cast("string"))

// Attach the plan value (kh_tx.ThanhTien) to the actuals of each grain. The left joins keep actuals
// that have no plan (ActualExport_Plan = NULL). Each grain joins on its own period columns plus the
// dimensions, and on Loai = Loai_KH so that a grain only picks up plan rows of the same type.
// NOTE(review): kh_tx period keys are strings from date_format; confirm they compare as intended
// against tx's Month/Quarter/Year types.
val output = spark.sql(
  """
  select
    output_thang.Month, output_thang.Quarter, output_thang.Year,
    output_thang.SalesDepartment, output_thang.OrganizationName,
    output_thang.ProvinceName, output_thang.SalesChannelName,
    output_thang.Name_NSP1, output_thang.Name_NSP2, output_thang.Name_NSP3,
    output_thang.N_SP1, output_thang.N_SP2, output_thang.N_SP3,
    output_thang.Team_Name, output_thang.ActualExport_Perform, output_thang.Loai,
    kh_tx.ThanhTien AS ActualExport_Plan
  from output_thang
  left join kh_tx
    on  output_thang.Month = kh_tx.Month
    and output_thang.Quarter = kh_tx.Quarter
    and output_thang.Year = kh_tx.Year
    and output_thang.SalesDepartment = kh_tx.Don_vi
    and output_thang.OrganizationName = kh_tx.Vung_CN
    and output_thang.ProvinceName = kh_tx.Tinh_Thanh_Pho
    and output_thang.SalesChannelName = kh_tx.Kenh_ban
    and output_thang.N_SP1 = kh_tx.SP1
    and output_thang.N_SP2 = kh_tx.SP2
    and output_thang.N_SP3 = kh_tx.SP3
    and output_thang.Name_NSP1 = kh_tx.Name_NSP1
    and output_thang.Name_NSP2 = kh_tx.Name_NSP2
    and output_thang.Name_NSP3 = kh_tx.Name_NSP3
    and output_thang.Team_Name = kh_tx.Ten_doi
    and output_thang.Loai = kh_tx.Loai_KH
  union all
  select
    null Month, output_quy.Quarter, output_quy.Year,
    output_quy.SalesDepartment, output_quy.OrganizationName,
    output_quy.ProvinceName, output_quy.SalesChannelName,
    output_quy.Name_NSP1, output_quy.Name_NSP2, output_quy.Name_NSP3,
    output_quy.N_SP1, output_quy.N_SP2, output_quy.N_SP3,
    output_quy.Team_Name, output_quy.ActualExport_Perform, output_quy.Loai,
    kh_tx.ThanhTien AS ActualExport_Plan
  from output_quy
  left join kh_tx
    on  output_quy.Quarter = kh_tx.Quarter
    and output_quy.Year = kh_tx.Year
    and output_quy.SalesDepartment = kh_tx.Don_vi
    and output_quy.OrganizationName = kh_tx.Vung_CN
    and output_quy.ProvinceName = kh_tx.Tinh_Thanh_Pho
    and output_quy.SalesChannelName = kh_tx.Kenh_ban
    and output_quy.N_SP1 = kh_tx.SP1
    and output_quy.N_SP2 = kh_tx.SP2
    and output_quy.N_SP3 = kh_tx.SP3
    and output_quy.Name_NSP1 = kh_tx.Name_NSP1
    and output_quy.Name_NSP2 = kh_tx.Name_NSP2
    and output_quy.Name_NSP3 = kh_tx.Name_NSP3
    and output_quy.Team_Name = kh_tx.Ten_doi
    and output_quy.Loai = kh_tx.Loai_KH
  union all
  select
    null Month, null Quarter, output_nam.Year,
    output_nam.SalesDepartment, output_nam.OrganizationName,
    output_nam.ProvinceName, output_nam.SalesChannelName,
    output_nam.Name_NSP1, output_nam.Name_NSP2, output_nam.Name_NSP3,
    output_nam.N_SP1, output_nam.N_SP2, output_nam.N_SP3,
    output_nam.Team_Name, output_nam.ActualExport_Perform, output_nam.Loai,
    kh_tx.ThanhTien AS ActualExport_Plan
  from output_nam
  left join kh_tx
    on  output_nam.Year = kh_tx.Year
    and output_nam.SalesDepartment = kh_tx.Don_vi
    and output_nam.OrganizationName = kh_tx.Vung_CN
    and output_nam.ProvinceName = kh_tx.Tinh_Thanh_Pho
    and output_nam.SalesChannelName = kh_tx.Kenh_ban
    and output_nam.N_SP1 = kh_tx.SP1
    and output_nam.N_SP2 = kh_tx.SP2
    and output_nam.N_SP3 = kh_tx.SP3
    and output_nam.Name_NSP1 = kh_tx.Name_NSP1
    and output_nam.Name_NSP2 = kh_tx.Name_NSP2
    and output_nam.Name_NSP3 = kh_tx.Name_NSP3
    and output_nam.Team_Name = kh_tx.Ten_doi
    and output_nam.Loai = kh_tx.Loai_KH
  """).withColumn("SalesChannelName", col("SalesChannelName").cast("string"))

// Period and dimension columns identifying one KPI row, in the order the comparison outputs use.
val kpiPeriodAndDimCols = Seq(
  "Month", "Quarter", "Year",
  "SalesDepartment", "OrganizationName", "ProvinceName", "SalesChannelName",
  "N_SP1", "N_SP2", "N_SP3", "Name_NSP1", "Name_NSP2", "Name_NSP3",
  "Team_Name")
// Full comparison key: the row identity plus its grain.
val kpiKeyCols = kpiPeriodAndDimCols :+ "Loai"

// Null-safe (<=>) equality on every key column between two aliased sides. Plain === never matches
// NULL to NULL. Quarterly rows always have Month = NULL and yearly rows have Month = Quarter = NULL,
// so with === those rows could never be paired with their comparison period.
def kpiKeysMatch(left: String, right: String) =
  kpiKeyCols.map(c => col(s"$left.$c") <=> col(s"$right.$c")).reduce(_ && _)

// coalesce(left.c, right.c) AS c for each listed column. In a full outer join this lets a row keep
// its keys from whichever side is present.
def coalescedKeys(left: String, right: String, names: Seq[String]) =
  names.map(c => coalesce(col(s"$left.$c"), col(s"$right.$c")).alias(c))

// Previous period ("ky truoc"): move every row forward by one period of its own grain, so that joining
// on equal keys lines it up with the period that follows it.
//   Thang: Month + 1; December -> January of the next year. Quarter advances after Mar/Jun/Sep/Dec.
//   Quy:   Quarter + 1; Q4 -> Q1 of the next year.
//   Nam:   Year + 1.
// Columns are rewritten in the order Year -> Quarter -> Month. Each step therefore still reads the
// unshifted Month/Quarter values.
val isMonthly = col("Loai") === "Thang"
val isQuarterly = col("Loai") === "Quy"
val rollsIntoNextYear = (isMonthly && col("Month") === 12) || (isQuarterly && col("Quarter") === 4)

val ky_truoc_kpi_thuc_xuat = output
  .withColumn("Year",
    when(col("Loai") === "Nam", (col("Year") + 1).cast("int"))
      .when(rollsIntoNextYear, (col("Year") + 1).cast("int"))
      .otherwise(col("Year").cast("int")))
  .withColumn("Quarter",
    when(col("Quarter").isNull, col("Quarter").cast("int"))
      .when(rollsIntoNextYear, 1)
      .when(isQuarterly && col("Quarter") < 4, (col("Quarter") + 1).cast("int"))
      .when(isMonthly && (col("Month") === 3 || col("Month") === 6 || col("Month") === 9),
        (col("Quarter") + 1).cast("int"))
      .otherwise(col("Quarter").cast("int")))
  .withColumn("Month",
    when(col("Month").isNull, col("Month").cast("int"))
      .when(col("Month") < 12, (col("Month") + 1).cast("int"))
      .otherwise(1))

// Same period last year ("cung ky"): shift every row forward by one year.
val cung_ky_kpi_thucxuat = output.withColumn("Year", (col("Year") + 1).cast("int"))

// Actuals + plan, paired with last year's actuals for the same key. This is a full outer join, so
// rows that exist only in the shifted data keep their keys and have a NULL ActualExport_Perform.
val res_kpi_thucxuat_cung_ky = output.as("cur")
  .join(cung_ky_kpi_thucxuat.as("ly"), kpiKeysMatch("cur", "ly"), "fullouter")
  .select(
    (coalescedKeys("cur", "ly", kpiPeriodAndDimCols) ++
      Seq(
        col("cur.ActualExport_Perform").alias("ActualExport_Perform"),
        col("cur.ActualExport_Plan").alias("ActualExport_Plan")
      ) ++
      coalescedKeys("cur", "ly", Seq("Loai")) ++
      Seq(col("ly.ActualExport_Perform").alias("ActualExport_Perform_CungKy"))
    ): _*
  )

// Add the previous period's actuals the same way. Ma_Nhom is "L" followed by the last character of N_SP1.
val res_kpi_thucxuat = res_kpi_thucxuat_cung_ky.as("cur")
  .join(ky_truoc_kpi_thuc_xuat.as("prev"), kpiKeysMatch("cur", "prev"), "fullouter")
  .select(
    (coalescedKeys("cur", "prev", kpiPeriodAndDimCols) ++
      Seq(
        col("cur.ActualExport_Perform").alias("ActualExport_Perform"),
        col("cur.ActualExport_Plan").alias("ActualExport_Plan")
      ) ++
      coalescedKeys("cur", "prev", Seq("Loai")) ++
      Seq(
        col("cur.ActualExport_Perform_CungKy").alias("ActualExport_Perform_CungKy"),
        col("prev.ActualExport_Perform").alias("ActualExport_Perform_KyTruoc")
      )
    ): _*
  )
  .withColumn("Ma_Nhom", concat(lit("L"), substring(col("N_SP1"), -1, 1)))

res_kpi_thucxuat.createOrReplaceTempView("kpi_thucxuat")

// Ad-hoc check: previous-period actuals summed per N_SP1 for months 1-5 of 2024. Month is NULL on
// "Quy"/"Nam" rows, so only monthly rows are included.
val res = spark.sql(
  """
  select N_SP1, sum(ActualExport_Perform_KyTruoc)
  from kpi_thucxuat
  where Year = 2024 and Month between 1 and 5
  group by N_SP1
  """)
Editor is loading...
Leave a Comment