Untitled
unknown
plain_text
a year ago
39 kB
6
Indexable
// Sale-out dashboard jobs: target products (MT), promotional products (KM) and product groups (NSP).
// Each job merges the orders recorded in DMS with the orders uploaded as Excel/CSV exports, aggregates
// them per day and full-outer-joins them with the monthly plan spread evenly over the days of the month.
//
// NOTE(review): this content looks like three source files pasted together. They are merged under one
// package clause and one import list here, because a compilation unit cannot repeat brace-less
// `package` clauses after definitions.
package vn.com.viettel.code.dashboard_date

import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}
import vn.com.viettel.AppConfig
import vn.com.viettel.AppConfig.outputDir
import vn.com.viettel.utils.SparkUtils.createBatchSession

/** Building blocks shared by the three sale-out jobs of this package. */
private[dashboard_date] object SaleOutShared {

  /** Store-code prefixes reported under sale department `P_BH1`. */
  private val Bh1Prefixes: Seq[String] = Seq("TV01", "TV02", "TV03")

  /** Store-code prefixes reported under sale department `P_BH2`. */
  private val Bh2Prefixes: Seq[String] = Seq("TN", "TG", "NT", "CT", "DN", "BH", "HCM")

  /** Every store-code prefix the dashboards know about. */
  val StorePrefixes: Seq[String] = Bh1Prefixes ++ Bh2Prefixes

  /** Store type ids kept by all three jobs (labelled as "Cấp 2" variants in the NSP job). */
  val C2StoreTypeIds: Seq[Int] = Seq(17, 18, 23)

  val MappingC2Path = "/raw_zone/excel_form/upload/mapping_to_doi/mapping_dms_c2td.csv"
  val TeamMappingPath = "/raw_zone/excel_form/upload/mapping_to_doi/to_doi_saleout.csv"
  private val ExcelOrdersGlob = "/raw_zone/excel_form/upload/rangdong_store/c2_*.csv"
  private val CalendarPath = "/raw_zone/full_load/date/"

  /** Adds the fixed 7 hour offset the jobs apply to warehouse timestamps. */
  def toLocalTime(ts: Column): Column = ts + expr("INTERVAL 7 HOURS")

  /** True when `code` starts with at least one of `prefixes` (null when `code` is null). */
  def hasAnyPrefix(code: Column, prefixes: Seq[String]): Column =
    prefixes.map(p => code.startsWith(p)).reduce(_ || _)

  /** Organization label: the matching store-code prefix, or null when none matches. */
  def organization(code: Column): Column =
    // The prefixes are mutually exclusive, so "first non-null" equals the original when-chain.
    coalesce(StorePrefixes.map(p => when(code.startsWith(p), lit(p))): _*)

  /** Sale department label (`P_BH1` / `P_BH2`), or null when the prefix is unknown. */
  def saleDepartment(code: Column): Column =
    when(hasAnyPrefix(code, Bh1Prefixes), lit("P_BH1"))
      .when(hasAnyPrefix(code, Bh2Prefixes), lit("P_BH2"))

  /** Reads a CSV file that has a header row. */
  def readCsv(spark: SparkSession, path: String, multiLine: Boolean = false): DataFrame =
    spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", multiLine)
      .load(path)

  /**
   * Loads the uploaded Excel order exports and derives two columns:
   * `Ngay_dat_hang` (order day as a date) and `OrderDate` (order timestamp as a string).
   */
  def loadExcelOrders(spark: SparkSession): DataFrame = {
    val dayText = col("Ngay_dat_hang")
    readCsv(spark, ExcelOrdersGlob, multiLine = true)
      .withColumn("Ngay_dat_hang", split(col("Ngày đặt hàng"), " ")(0))
      .withColumn(
        "Ngay_dat_hang",
        when(dayText.contains("/"), to_date(dayText, "M/d/y"))
          .when(dayText.contains("-"), to_date(dayText, "yyyy-MM-dd")))
      // NOTE(review): only the dash format is parsed here, so rows whose date uses slashes get a
      // null OrderDate and are dropped by the store-validity filters downstream - confirm intent.
      .withColumn("OrderDate", from_unixtime(unix_timestamp(col("Ngày đặt hàng"), "yyyy-MM-dd HH:mm:ss")))
  }

  /** One row per calendar day (`date`, `month`, `quarter`, `year`) plus `so_ngay`, the days in its month. */
  def dailyCalendar(spark: SparkSession): DataFrame = {
    val days = spark.read.format("avro").load(CalendarPath)
    val daysPerMonth = days.groupBy("month", "quarter", "year").agg(count("*").alias("so_ngay"))
    // Alias-qualified columns keep this self-join unambiguous although both sides share a source.
    days.as("t1")
      .join(daysPerMonth.as("t2"), col("t1.month") === col("t2.month") && col("t1.year") === col("t2.year"))
      .select(col("t1.date"), col("t1.month"), col("t1.quarter"), col("t1.year"), col("t2.so_ngay"))
  }

  /** Adds `Month`, `Quarter` and `Year`, derived from the plan period column `Thoi_gian_ke_hoach`. */
  def withPlanPeriod(plan: DataFrame): DataFrame =
    plan
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))

  /** Expands a monthly plan to one row per day and divides each measure by the days of the month. */
  def spreadPlanOverDays(plan: DataFrame, calendar: DataFrame, measures: Seq[String]): DataFrame = {
    val perDay = plan.join(
      calendar,
      plan("Month") === calendar("Month") and plan("Year") === calendar("Year"),
      "left")
    measures.foldLeft(perDay)((df, measure) => df.withColumn(measure, col(measure) / col("so_ngay")))
  }

  /** Join condition matching every `(left, right)` key pair, with nulls treated as empty strings. */
  def nullSafeKeyMatch(left: DataFrame, right: DataFrame, keys: Seq[(String, String)]): Column =
    keys
      .map { case (l, r) => coalesce(left(l), lit("")) === coalesce(right(r), lit("")) }
      .reduce(_ && _)

  /**
   * Builds an empty plan-upload template: the distinct dimension combinations seen in `daily`
   * between `fromDay` and `toDay`, with null period, quantity and revenue columns.
   *
   * @param indicator  value of the `Ten_chi_tieu` column
   * @param dimensions `(column in daily, column in template)` pairs, in output order
   */
  def blankPlanTemplate(
      daily: DataFrame,
      indicator: String,
      dimensions: Seq[(String, String)],
      fromDay: String,
      toDay: String): DataFrame = {
    val columns =
      Seq(lit(indicator).alias("Ten_chi_tieu"), lit(null).alias("Thoi_gian_ke_hoach")) ++
        dimensions.map { case (source, target) => col(source).alias(target) } ++
        Seq(lit(null).alias("So_luong"), lit(null).alias("Doanh_thu"))
    val blank = daily.filter(col("Day").between(fromDay, toDay)).select(columns: _*).distinct()
    val textColumns = Seq("Ten_chi_tieu", "Thoi_gian_ke_hoach") ++ dimensions.map(_._2)
    textColumns
      .foldLeft(blank)((df, name) => df.withColumn(name, col(name).cast(StringType)))
      .withColumn("So_luong", col("So_luong").cast(IntegerType))
      .withColumn("Doanh_thu", col("Doanh_thu").cast(DoubleType))
  }
}

/** Daily sale-out of target products (SP MT): actuals from DMS and Excel orders versus the plan. */
object saleout_sp_mt {
  import SaleOutShared._

  /**
   * Empty plan template for the dimension combinations present in `daily` (the dataset written by
   * [[main]]). `main` does not call this; it exists for ad-hoc template exports.
   */
  def planTemplate(daily: DataFrame, fromDay: String = "2024-01-01", toDay: String = "2024-12-31"): DataFrame =
    blankPlanTemplate(
      daily,
      "SaleOut",
      Seq(
        "SaleDepartment" -> "Don_vi",
        "Organization" -> "Vung_CN",
        "ProductGroupName" -> "Name_SPMT",
        "Team_Name" -> "Ten_doi"),
      fromDay,
      toDay)

  /** Builds the daily plan-versus-actual dataset and overwrites `outputDir` with it (avro). */
  def main(argv: Array[String]): Unit = {
    val spark: SparkSession = createBatchSession()

    // ---- Plan: monthly rows, spread evenly over the days of the month ----
    val monthlyPlan = withPlanPeriod(
      readCsv(spark, "/raw_zone/excel_form/upload/ke_hoach/saleout_spmt.csv")
        .filter(col("Ten_chi_tieu") === "SaleOut"))
      .filter(col("Loai_KH") === "Thang")
      .groupBy("Month", "Quarter", "Year", "Don_vi", "Vung_CN", "Name_SPMT", "Ten_doi")
      .agg(sum("So_luong").as("SaleOut_SL_SP_MT_KH"), sum("Doanh_thu").as("SaleOut_DT_SP_MT_KH"))
    val dailyPlan = spreadPlanOverDays(
      monthlyPlan,
      dailyCalendar(spark),
      Seq("SaleOut_SL_SP_MT_KH", "SaleOut_DT_SP_MT_KH"))

    // ---- Actuals: source tables ----
    val df_appuser = spark.read.table("whs.masterdata_appuser")
    val df_to_doi_c2 = readCsv(spark, TeamMappingPath, multiLine = true)
    val mappingc2 = readCsv(spark, MappingC2Path, multiLine = true)

    val iso = spark.read.table("WHS.Order_IndirectSalesOrder")
      .withColumn("OrderDate", toLocalTime(col("OrderDate")))
      .filter(col("BuyerStoreTypeId").isin(C2StoreTypeIds: _*))
      .filter(col("GeneralIndirectStateId") === 3)
    val isoc = spark.read.table("WHS.Order_IndirectSalesOrderContent")

    // Store validity dates are shifted once here; both order sources below use them as they are.
    val store = spark.read.table("WHS.MasterData_DMSStore")
      .withColumn("StartDate", toLocalTime(col("StartDate")))
      .withColumn("EndDate", toLocalTime(col("EndDate")))
      .where(hasAnyPrefix(col("Code"), StorePrefixes))

    val item = spark.read.table("WHS.MasterData_Item").filter(col("DeletedAt").isNull)
    val ppgm = spark.read.table("WHS.MasterData_ProductProductGroupingMapping")

    // Keep only the most recently updated product grouping per name.
    val latestPerName = Window.partitionBy("Name").orderBy(desc("UpdatedAt"))
    val productgrouping = spark.read.table("WHS.MasterData_ProductGrouping")
      .filter(col("DeletedAt").isNull)
      .withColumn("rn", row_number().over(latestPerName))
      .filter(col("rn") === 1)

    // Target products: item -> product grouping.
    val mapping = item.as("i")
      .join(ppgm.as("ppgm"), col("i.ProductId") === col("ppgm.ProductId"), "left")
      .join(productgrouping.as("pg"), col("ppgm.ProductGroupingId") === col("pg.Id"), "left")
      .select(
        col("i.Code").as("ItemCode"),
        col("i.Id").as("ItemId"),
        col("pg.Code"),
        col("pg.Name"),
        col("pg.UpdatedAt"))
      .distinct

    // ---- Actuals: Excel orders ----
    val excel = loadExcelOrders(spark)
    val don_hang = excel
      .join(mappingc2, mappingc2("User_Name") === excel("Username"), "left")
      // The last 7 characters of the user name are used as the store id.
      .withColumn("Id_excel", substring(excel("Username"), -7, 7).cast("bigint"))
      .join(store, store("Id") === col("Id_excel"), "left")
      .join(item, excel("BarCode") === item("Code"), "left")
      .join(mapping, item("Id") === mapping("ItemId"), "left")
      // FIX: `store` already carries the +7h shift; adding it again here moved the validity
      // window by 14 hours, unlike the sibling jobs which shift exactly once.
      .filter(excel("OrderDate").between(store("StartDate"), store("EndDate")))
      .filter(col("StoreTypeId").isin(17, 18, 23))
      .select(
        to_date(excel("Ngay_dat_hang")).alias("Day"),
        saleDepartment(store("Code")).alias("SaleDepartment"),
        organization(store("Code")).alias("Organization"),
        mapping("Name").alias("ProductGroupName"),
        col("Thành tiền 2").alias("SaleOut_DT_SP_MT"),
        col("Số lượng").alias("SaleOut_SL_SP_MT"),
        mappingc2("Doi").alias("Team_Name"))

    // ---- Actuals: DMS orders, merged with the Excel orders and summed per day ----
    val th = iso.as("t0")
      .join(df_appuser, df_appuser("Id") === col("t0.SaleEmployeeId"), "left")
      .join(df_to_doi_c2, df_appuser("Username") === df_to_doi_c2("Username"), "left")
      .join(
        store.as("t1"),
        col("t0.BuyerStoreId") === col("t1.Id") &&
          col("t0.OrderDate").between(col("t1.StartDate"), col("t1.EndDate")),
        "left")
      .join(isoc.as("t2"), col("t0.Id") === col("t2.IndirectSalesOrderId"), "left")
      .join(mapping.as("m"), col("t2.ItemId") === col("m.ItemId"), "left")
      .select(
        date_format(col("t0.OrderDate"), "yyyy-MM-dd").alias("Day"),
        // The department comes from the code stored on the order, the organization from the store.
        saleDepartment(col("t0.StoreCode")).as("SaleDepartment"),
        organization(col("t1.Code")).as("Organization"),
        col("t2.Amount").as("SaleOut_DT_SP_MT"),
        col("t2.Quantity").as("SaleOut_SL_SP_MT"),
        col("m.Name").as("ProductGroupName"),
        df_to_doi_c2("Team_Name"))
      .unionByName(don_hang)
      .groupBy("Day", "SaleDepartment", "Organization", "ProductGroupName", "Team_Name")
      .agg(
        sum("SaleOut_DT_SP_MT").as("SaleOut_DT_SP_MT"),
        sum("SaleOut_SL_SP_MT").as("SaleOut_SL_SP_MT"))

    // ---- Plan versus actual ----
    // NOTE(review): plain equality never matches null keys, whereas the KM and NSP jobs treat null
    // as "" when joining. Rows with a null team or product group stay unmatched here - confirm.
    val thang = th
      .join(
        dailyPlan,
        th("Day") === dailyPlan("date") and
          th("SaleDepartment") === dailyPlan("Don_vi") and
          th("Organization") === dailyPlan("Vung_CN") and
          th("ProductGroupName") === dailyPlan("Name_SPMT") and
          th("Team_Name") === dailyPlan("Ten_doi"),
        "full")
      .select(
        coalesce(th("Day"), dailyPlan("date")).as("Day"),
        coalesce(th("SaleDepartment"), dailyPlan("Don_vi")).as("SaleDepartment"),
        coalesce(th("Organization"), dailyPlan("Vung_CN")).as("Organization"),
        coalesce(th("ProductGroupName"), dailyPlan("Name_SPMT")).as("ProductGroupName"),
        coalesce(th("Team_Name"), dailyPlan("Ten_doi")).as("Team_Name"),
        th("SaleOut_DT_SP_MT"),
        th("SaleOut_SL_SP_MT"),
        dailyPlan("SaleOut_DT_SP_MT_KH"),
        dailyPlan("SaleOut_SL_SP_MT_KH"))

    thang.write.format("avro").mode("overwrite").save(outputDir)
  }
}

/** Daily sale-out of promotional products (SP KM): actuals from DMS and Excel orders versus the plan. */
object saleout_sp_km {
  import SaleOutShared._

  /**
   * Empty plan template for the dimension combinations present in `daily` (the dataset written by
   * [[main]]). `main` does not call this; it exists for ad-hoc template exports.
   */
  def planTemplate(daily: DataFrame, fromDay: String = "2024-01-01", toDay: String = "2024-12-31"): DataFrame =
    blankPlanTemplate(
      daily,
      "SaleOut",
      Seq(
        "SaleDepartment" -> "Don_vi",
        "Organization" -> "Vung_CN",
        "SPKM_BH1" -> "SPKM_BH1",
        "SPKM_BH2" -> "SPKM_BH2",
        "Team_Name" -> "Ten_doi"),
      fromDay,
      toDay)

  /** Builds the daily plan-versus-actual dataset and overwrites `AppConfig.outputDir` with it (avro). */
  def main(args: Array[String]): Unit = {
    val spark = createBatchSession()

    // ---- Source tables (unused reads and temp views of the previous version were dropped) ----
    val indirectsalesorder = spark.read.table("WHS.Order_IndirectSalesOrder")
      .withColumn("OrderDate", toLocalTime(col("OrderDate")))
    val indirectsaleordercontent = spark.read.table("WHS.Order_IndirectSalesOrderContent")
    val store = spark.read.table("WHS.MasterData_DMSStore")
    val saleplan_product_class = spark.read.table("WHS.SalePlan_ProductClass")
    val item = spark.read.table("WHS.MasterData_Item")
    val mappingc2 = readCsv(spark, MappingC2Path, multiLine = true)
    val df_appuser = spark.read.table("whs.masterdata_appuser")
    val df_to_doi_c2 = readCsv(spark, TeamMappingPath, multiLine = true)

    // Views referenced by the SQL statement below.
    indirectsalesorder.createOrReplaceTempView("indirectsalesorder")
    store.createOrReplaceTempView("store")
    saleplan_product_class.createOrReplaceTempView("saleplan_product_class")
    indirectsaleordercontent.createOrReplaceTempView("indirectsaleordercontent")
    item.createOrReplaceTempView("item")
    df_to_doi_c2.createOrReplaceTempView("df_to_doi_c2")
    df_appuser.createOrReplaceTempView("df_appuser")

    // ---- Actuals: Excel orders ----
    val excel = loadExcelOrders(spark)
    val don_hang = excel
      .join(mappingc2, mappingc2("User_Name") === excel("Username"), "left")
      // The last 7 characters of the user name are used as the store id.
      .withColumn("Id_excel", substring(excel("Username"), -7, 7).cast("bigint"))
      .join(store, store("Id") === col("Id_excel"), "left")
      .join(saleplan_product_class, excel("Barcode") === saleplan_product_class("ItemCodeSAP"), "left")
      .filter(excel("OrderDate").between(toLocalTime(store("StartDate")), toLocalTime(store("EndDate"))))
      .filter(col("StoreTypeId").isin(17, 18, 23))
      .select(
        to_date(excel("Ngay_dat_hang")).alias("Day"),
        (col("Số lượng") * col("Đơn Giá") - coalesce(col("Giảm giá"), lit("0"))).alias("Amount"),
        saleDepartment(store("Code")).alias("SaleDepartment"),
        organization(store("Code")).alias("Organization"),
        col("Số lượng").as("Quantity"),
        saleplan_product_class("SPKM_BH1"),
        saleplan_product_class("SPKM_BH2"),
        mappingc2("Doi").alias("Team_Name"))

    // ---- Actuals: DMS orders merged with the Excel orders ----
    // NOTE(review): iso.OrderDate is shifted by +7h while s.StartDate / s.EndDate are not, unlike
    // saleout_sp_mt which shifts both sides - confirm which comparison is intended.
    val saleout_sp_km_temp = spark.sql(
      """
        |SELECT date_format(OrderDate,"yyyy-MM-dd") Day,
        |  isoc.Amount,
        |  CASE
        |    WHEN iso.StoreCode LIKE 'TV01%' THEN 'P_BH1'
        |    WHEN iso.StoreCode LIKE 'TV02%' THEN 'P_BH1'
        |    WHEN iso.StoreCode LIKE 'TV03%' THEN 'P_BH1'
        |    WHEN iso.StoreCode LIKE 'TN%' THEN 'P_BH2'
        |    WHEN iso.StoreCode LIKE 'TG%' THEN 'P_BH2'
        |    WHEN iso.StoreCode LIKE 'NT%' THEN 'P_BH2'
        |    WHEN iso.StoreCode LIKE 'CT%' THEN 'P_BH2'
        |    WHEN iso.StoreCode LIKE 'DN%' THEN 'P_BH2'
        |    WHEN iso.StoreCode LIKE 'BH%' THEN 'P_BH2'
        |    WHEN iso.StoreCode LIKE 'HCM%' THEN 'P_BH2'
        |  END AS SaleDepartment,
        |  CASE
        |    when s.Code like 'TV01%' then "TV01"
        |    when s.Code like 'TV02%' then "TV02"
        |    when s.Code like 'TV03%' then "TV03"
        |    when s.Code like 'TN%' then "TN"
        |    when s.Code like 'TG%' then "TG"
        |    when s.Code like 'NT%' then "NT"
        |    when s.Code like 'CT%' then "CT"
        |    when s.Code like 'DN%' then "DN"
        |    when s.Code like 'BH%' then "BH"
        |    when s.Code like 'HCM%' then "HCM"
        |  END AS Organization,
        |  isoc.Quantity,
        |  spc.SPKM_BH1,
        |  spc.SPKM_BH2,
        |  dtdc2.Team_Name
        |FROM indirectsaleordercontent AS isoc
        |LEFT JOIN indirectsalesorder as iso on isoc.IndirectSalesOrderId = iso.ID
        |LEFT JOIN store AS s ON s.Id = iso.buyerstoreID and iso.OrderDate BETWEEN s.StartDate and s.EndDate
        |LEFT JOIN item as i on i.ID = isoc.ItemID
        |LEFT JOIN saleplan_product_class as spc on i.Code = spc.ItemCodeSAP
        |LEFT JOIN df_appuser as da on da.ID = iso.SaleEmployeeId
        |LEFT JOIN df_to_doi_c2 as dtdc2 on dtdc2.Username = da.Username
        |WHERE iso.generalindirectstateID = 3 and iso.BuyerStoreTypeID in (17,18,23)
        |""".stripMargin).unionByName(don_hang)

    val saleout_sp_km_month = saleout_sp_km_temp
      .groupBy("Day", "SPKM_BH1", "SPKM_BH2", "SaleDepartment", "Organization", "Team_Name")
      .agg(sum("Amount").as("SaleOut_DT_SP_KM"), sum("Quantity").as("SaleOut_SL_SP_KM"))

    // ---- Plan: monthly rows, spread evenly over the days of the month ----
    val monthlyPlan = withPlanPeriod(
      readCsv(spark, "/raw_zone/excel_form/upload/ke_hoach/saleout_spkm.csv")
        .filter(col("Ten_chi_tieu") === "SaleOut" and col("Loai_KH") === "Thang"))
      .groupBy("Month", "Quarter", "Year", "Don_vi", "SPKM_BH1", "SPKM_BH2", "Loai_KH", "Vung_CN", "Ten_doi")
      .agg(sum("Doanh_thu").as("Doanh_thu_KH"), sum("So_luong").as("So_luong_KH"))
    val dailyPlan = spreadPlanOverDays(monthlyPlan, dailyCalendar(spark), Seq("Doanh_thu_KH", "So_luong_KH"))

    // ---- Plan versus actual (null keys match each other) ----
    val joinKeys = Seq(
      "Day" -> "date",
      "SaleDepartment" -> "Don_vi",
      "Organization" -> "Vung_CN",
      "Team_Name" -> "Ten_doi",
      "SPKM_BH1" -> "SPKM_BH1",
      "SPKM_BH2" -> "SPKM_BH2")
    val saleout_sp_km = saleout_sp_km_month
      .join(dailyPlan, nullSafeKeyMatch(saleout_sp_km_month, dailyPlan, joinKeys), "full")
      .select(
        coalesce(saleout_sp_km_month("Day"), dailyPlan("date")).alias("Day"),
        coalesce(saleout_sp_km_month("SaleDepartment"), dailyPlan("Don_vi")).alias("SaleDepartment"),
        coalesce(saleout_sp_km_month("Organization"), dailyPlan("Vung_CN")).alias("Organization"),
        coalesce(saleout_sp_km_month("SPKM_BH1"), dailyPlan("SPKM_BH1")).alias("SPKM_BH1"),
        coalesce(saleout_sp_km_month("SPKM_BH2"), dailyPlan("SPKM_BH2")).alias("SPKM_BH2"),
        saleout_sp_km_month("SaleOut_DT_SP_KM"),
        saleout_sp_km_month("SaleOut_SL_SP_KM"),
        dailyPlan("Doanh_thu_KH").alias("SaleOut_DT_SP_KM_KH"),
        dailyPlan("So_luong_KH").alias("SaleOut_SL_SP_KM_KH"),
        coalesce(saleout_sp_km_month("Team_Name"), dailyPlan("Ten_doi")).alias("Team_Name"))

    saleout_sp_km.write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }
}

/** Daily sale-out revenue per product group (NSP): actuals from DMS and Excel orders versus the plan. */
object saleout_dt_sp_led_phich {
  import SaleOutShared._

  /** Dealer level label for a store type id; null for any id other than 17, 18 or 23. */
  private def customerType(storeTypeId: Column): Column =
    when(storeTypeId === 17, lit("Cấp 2"))
      .when(storeTypeId === 18, lit("Cấp 2 Trọng điểm"))
      .when(storeTypeId === 23, lit("Cấp C2 siêu lớn"))

  /**
   * Empty plan template for the dimension combinations present in `daily` (the dataset written by
   * [[main]]). `main` does not call this; it exists for ad-hoc template exports.
   */
  def planTemplate(daily: DataFrame, fromDay: String = "2024-01-01", toDay: String = "2024-12-31"): DataFrame =
    blankPlanTemplate(
      daily,
      "saleout_nsp",
      Seq(
        "SaleDepartment" -> "Don_vi",
        "Organization" -> "Vung_CN",
        "CustomerType" -> "Cap_dai_ly",
        "Team_Name" -> "Ten_doi",
        "Name_NSP1" -> "Name_NSP1",
        "Name_NSP2" -> "Name_NSP2",
        "Name_NSP3" -> "Name_NSP3"),
      fromDay,
      toDay)

  /** Builds the daily plan-versus-actual dataset and overwrites `AppConfig.outputDir` with it (avro). */
  def main(args: Array[String]): Unit = {
    val spark = createBatchSession()

    // ---- Source tables (the unused store-type read of the previous version was dropped) ----
    val doi = readCsv(spark, TeamMappingPath, multiLine = true)
    val appuser = spark.read.table("whs.masterdata_appuser")
    val mappingc2 = readCsv(spark, MappingC2Path, multiLine = true)
    val product_class = spark.read.table("WHS.SalePlan_ProductClass")
    val product_class_detail = spark.read.table("WHS.SalePlan_ProductClassDetail")
    val indirectsalesorder = spark.read.table("WHS.Order_IndirectSalesOrder")
      .withColumn("OrderDate", toLocalTime(col("OrderDate")))
    val store = spark.read.table("WHS.MasterData_DMSStore")
    val province = spark.read.table("WHS.MasterData_Province")
    val indirectsalesordercontent = spark.read.table("WHS.Order_IndirectSalesOrderContent")
    val item = spark.read.table("WHS.MasterData_Item")
    val product = spark.read.table("WHS.Product_Product")

    // Product -> the three product-group levels; the detail table is joined once per level
    // under its own alias so each level resolves its own `ID` / `Class` columns.
    val temp_product = product_class
      .join(product_class_detail.as("d1"), product_class("N_SP1") === col("d1.ID"), "left")
      .join(product_class_detail.as("d2"), product_class("N_SP2") === col("d2.ID"), "left")
      .join(product_class_detail.as("d3"), product_class("N_SP3") === col("d3.ID"), "left")
      .select(
        product_class("ItemName"),
        product_class("ItemCodeSAP"),
        product_class("N_SP1"),
        product_class("N_SP2"),
        product_class("N_SP3"),
        product_class("SPKM_BH1"),
        product_class("SPKM_BH2"),
        col("d1.Class").alias("Name_NSP1"),
        col("d2.Class").alias("Name_NSP2"),
        col("d3.Class").alias("Name_NSP3"))

    // ---- Actuals: Excel orders ----
    val excel = loadExcelOrders(spark)
    val don_hang = excel
      .join(mappingc2, mappingc2("User_Name") === excel("Username"), "left")
      // The last 7 characters of the user name are used as the store id.
      .withColumn("Id_excel", substring(excel("Username"), -7, 7).cast("bigint"))
      .join(store, store("Id") === col("Id_excel"), "left")
      .join(temp_product, excel("Barcode") === temp_product("ItemCodeSAP"), "left")
      // No province column is selected below; the join is kept so row counts stay as before.
      .join(province, store("ProvinceId") === province("Id"), "left")
      .filter(excel("OrderDate").between(toLocalTime(store("StartDate")), toLocalTime(store("EndDate"))))
      .filter(col("StoreTypeId").isin(17, 18, 23))
      .select(
        to_date(excel("Ngay_dat_hang")).alias("Day"),
        (col("Số lượng") * col("Đơn Giá") - coalesce(col("Giảm giá"), lit("0"))).alias("Amount"),
        saleDepartment(store("Code")).alias("SaleDepartment"),
        organization(store("Code")).alias("Organization"),
        customerType(store("StoreTypeId")).alias("CustomerType"),
        temp_product("N_SP1"),
        temp_product("N_SP2"),
        temp_product("N_SP3"),
        temp_product("Name_NSP1"),
        temp_product("Name_NSP2"),
        temp_product("Name_NSP3"),
        mappingc2("Doi").alias("Team_Name"))

    // ---- Actuals: DMS orders merged with the Excel orders ----
    // NOTE(review): OrderDate is shifted by +7h while the store validity dates are not, unlike
    // saleout_sp_mt which shifts both sides - confirm which comparison is intended.
    val Indirectsalesordercontent = indirectsalesordercontent
      .join(
        indirectsalesorder,
        indirectsalesordercontent("IndirectSalesOrderId") === indirectsalesorder("Id"),
        "right")
      .join(
        store,
        indirectsalesorder("BuyerStoreId") === store("Id") &&
          indirectsalesorder("OrderDate").between(store("StartDate"), store("EndDate")),
        "left")
      .join(province, store("ProvinceId") === province("Id"), "left")
      .join(item, indirectsalesordercontent("ItemId") === item("Id"), "left")
      .join(product, item("ProductId") === product("Id"), "left")
      .join(temp_product, product("Code") === temp_product("ItemCodeSAP"), "left")
      .filter(
        indirectsalesorder("BuyerStoreTypeID").isin(17, 18, 23) &&
          indirectsalesorder("GeneralIndirectStateId") === 3)
      .join(appuser, indirectsalesorder("SaleEmployeeId") === appuser("Id"), "left")
      .join(doi, doi("Username") === appuser("Username"), "left")
      .select(
        date_format(indirectsalesorder("OrderDate"), "yyyy-MM-dd").alias("Day"),
        // The department comes from the code stored on the order, the organization from the store.
        saleDepartment(indirectsalesorder("StoreCode")).alias("SaleDepartment"),
        organization(store("Code")).alias("Organization"),
        customerType(indirectsalesorder("BuyerStoreTypeID")).alias("CustomerType"),
        temp_product("N_SP1"),
        temp_product("N_SP2"),
        temp_product("N_SP3"),
        temp_product("Name_NSP1"),
        temp_product("Name_NSP2"),
        temp_product("Name_NSP3"),
        doi("Team_Name"),
        indirectsalesordercontent("Amount"))
      .unionByName(don_hang)

    val Loai_thang = Indirectsalesordercontent
      .groupBy(
        "Day", "SaleDepartment", "Organization", "CustomerType", "N_SP1", "N_SP2", "N_SP3",
        "Name_NSP1", "Name_NSP2", "Name_NSP3", "Team_Name")
      .agg(sum("Amount").alias("SaleOut_LP_TH"))

    // ---- Plan: monthly rows, spread evenly over the days of the month ----
    val plan = withPlanPeriod(
      readCsv(spark, "/raw_zone/excel_form/upload/ke_hoach/saleout_nsp.csv")
        .filter(col("Ten_chi_tieu") === "saleout_nsp" and col("Loai_KH") === "Thang"))

    // FIX: `temp_product` has one row per item, so joining the plan to it repeated every plan row
    // once per item of the class and inflated the summed plan revenue. Only the group ids are
    // needed here, so the lookup is reduced to one row per distinct class triple first.
    val classLookup = temp_product
      .select("N_SP1", "N_SP2", "N_SP3", "Name_NSP1", "Name_NSP2", "Name_NSP3")
      .distinct()
    val planWithProduct = plan
      .join(
        classLookup,
        plan("Name_NSP1") === classLookup("Name_NSP1") &&
          plan("Name_NSP2") === classLookup("Name_NSP2") &&
          plan("Name_NSP3") === classLookup("Name_NSP3"),
        "left")
      .select(
        plan("Month"),
        plan("Quarter"),
        plan("Year"),
        plan("Don_vi"),
        plan("Loai_KH"),
        plan("Cap_dai_ly"),
        plan("Vung_CN"),
        plan("Ten_doi"),
        plan("Doanh_thu"),
        plan("Tinh_thanh_pho"),
        classLookup("N_SP1").as("N_SP1"),
        classLookup("N_SP2").as("N_SP2"),
        classLookup("N_SP3").as("N_SP3"),
        plan("Name_NSP1"),
        plan("Name_NSP2"),
        plan("Name_NSP3"))

    val df_saleout_dt_kh = planWithProduct
      .filter(col("Loai_KH") === "Thang")
      .groupBy(
        "Month", "Quarter", "Year", "Don_vi", "Vung_CN", "Tinh_thanh_pho", "Cap_dai_ly",
        "N_SP1", "N_SP2", "N_SP3", "Name_NSP1", "Name_NSP2", "Name_NSP3", "Ten_doi")
      .agg(sum("Doanh_thu").as("Doanh_thu_KH"))
    val dailyPlan = spreadPlanOverDays(df_saleout_dt_kh, dailyCalendar(spark), Seq("Doanh_thu_KH"))

    // ---- Plan versus actual (null keys match each other) ----
    val joinKeys = Seq(
      "Day" -> "date",
      "SaleDepartment" -> "Don_vi",
      "Organization" -> "Vung_CN",
      "Team_Name" -> "Ten_doi",
      "CustomerType" -> "Cap_dai_ly",
      "N_SP1" -> "N_SP1",
      "N_SP2" -> "N_SP2",
      "N_SP3" -> "N_SP3",
      "Name_NSP1" -> "Name_NSP1",
      "Name_NSP2" -> "Name_NSP2",
      "Name_NSP3" -> "Name_NSP3")
    val result = Loai_thang
      .join(dailyPlan, nullSafeKeyMatch(Loai_thang, dailyPlan, joinKeys), "full")
      .select(
        coalesce(Loai_thang("Day"), dailyPlan("date")).alias("Day"),
        coalesce(Loai_thang("SaleDepartment"), dailyPlan("Don_vi")).alias("SaleDepartment"),
        coalesce(Loai_thang("Organization"), dailyPlan("Vung_CN")).alias("Organization"),
        coalesce(Loai_thang("CustomerType"), dailyPlan("Cap_dai_ly")).alias("CustomerType"),
        coalesce(Loai_thang("N_SP1"), dailyPlan("N_SP1")).alias("ID_NSP1"),
        coalesce(Loai_thang("N_SP2"), dailyPlan("N_SP2")).alias("ID_NSP2"),
        coalesce(Loai_thang("N_SP3"), dailyPlan("N_SP3")).alias("ID_NSP3"),
        coalesce(Loai_thang("Name_NSP1"), dailyPlan("Name_NSP1")).alias("Name_NSP1"),
        coalesce(Loai_thang("Name_NSP2"), dailyPlan("Name_NSP2")).alias("Name_NSP2"),
        coalesce(Loai_thang("Name_NSP3"), dailyPlan("Name_NSP3")).alias("Name_NSP3"),
        coalesce(Loai_thang("Team_Name"), dailyPlan("Ten_doi")).alias("Team_Name"),
        Loai_thang("SaleOut_LP_TH").alias("SaleOut_LP_TH"),
        dailyPlan("Doanh_thu_KH").alias("SaleOut_LP_KH"))
      // Group code: "L" followed by the last character of the level-1 group id.
      .withColumn("Ma_Nhom", concat(lit("L"), substring(col("ID_NSP1"), -1, 1)))

    result.write.format("avro").mode("overwrite").save(AppConfig.outputDir)
  }
}
Editor is loading...
Leave a Comment