Untitled
unknown
plain_text
a year ago
10 kB
7
Indexable
package vn.com.viettel.code.dashboard_date

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{FloatType, TimestampType}
import vn.com.viettel.AppConfig.{date, outputDir, rootPath}
import vn.com.viettel.utils.SparkUtils.createBatchSession
import org.apache.spark.sql.expressions._

/**
 * Daily dashboard job for the "tmdt_mr" indicator (expanded e-commerce, "TMĐT mở rộng").
 *
 * Pipeline:
 *  1. daily actual revenue from the Rang Dong store C3 order exports, classified by product
 *     class (levels 1-3) and by branch/team;
 *  2. the monthly plan ("Thang" rows of the tmdt_mr plan file) spread evenly over the days of
 *     each month;
 *  3. a full outer join of actuals and plan on date, order type, product class, branch and team,
 *     written as Avro (mode overwrite) to `AppConfig.outputDir`.
 */
object dt_tmdt_mr {

  /** Runs the batch job. `argv` is not read. */
  def main(argv: Array[String]): Unit = {
    val spark: SparkSession = createBatchSession()

    val teamMapping = loadTeamMapping(spark)
    val orders = loadOrders(spark)
    val productClass = loadProductClass(spark)

    // Aggregate actuals by day, and spread the monthly plan over days.
    val dailyActual = buildDailyActual(orders, productClass, teamMapping)
    val dailyPlan = buildDailyPlan(spark, productClass)

    // Join daily actuals with the daily plan and publish.
    joinActualWithPlan(dailyActual, dailyPlan)
      .write.format("avro").mode("overwrite").save(outputDir)
  }

  /** Reads the C3 customer -> team mapping, exposing `Ma_KH`, `Team_Name` and `Chi_nhanh`. */
  private def loadTeamMapping(spark: SparkSession): DataFrame =
    spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      .load("/raw_zone/excel_form/upload/mapping_to_doi/Mapping_C3_to_doi.csv")
      .withColumnRenamed("Đội xung kích", "Team_Name")
      .withColumnRenamed("Mã KH", "Ma_KH")
      // The branch is taken from the mapping file's region ("Vùng") column.
      .withColumn("Chi_nhanh", col("Vùng"))

  /**
   * Reads the C3 order exports for RDMBC3/RDMNC3 accounts.
   *
   * @return columns `Username`, `Day` ("yyyy-MM-dd" string), `EcommerceOrderType`,
   *         `Doanh_thu` (the exported line total, still a string) and `Barcode`.
   */
  private def loadOrders(spark: SparkSession): DataFrame =
    spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      .load("/raw_zone/excel_form/upload/rangdong_store/c3_*.csv")
      // "nan" presumably marks an empty amount in the export - TODO confirm with the exporter.
      .filter(col("Username").rlike("^(RDMBC3|RDMNC3)") and col("Thành tiền") =!= "nan")
      // Drop any time part, then parse either M/d/y or ISO dates; other shapes become null.
      .withColumn("Ngày đặt hàng", split(col("Ngày đặt hàng"), " ")(0))
      .withColumn(
        "Ngày đặt hàng",
        when(col("Ngày đặt hàng").contains("/"), to_date(col("Ngày đặt hàng"), "M/d/y"))
          .when(col("Ngày đặt hàng").contains("-"), to_date(col("Ngày đặt hàng"), "yyyy-MM-dd"))
      )
      .select(
        col("Username"),
        date_format(col("Ngày đặt hàng"), "yyyy-MM-dd").as("Day"),
        when(col("Username").startsWith("RDMBC3"), "TMĐT mở rộng BH1")
          .when(col("Username").startsWith("RDMNC3"), "TMĐT mở rộng BH2")
          .as("EcommerceOrderType"),
        // Revenue is the exported line total rather than unit price * quantity - discount.
        col("Thành tiền").as("Doanh_thu"),
        col("Barcode")
      )

  /**
   * Builds the product catalog with level 1-3 class IDs (`N_SP1..3`) and names
   * (`Name_NSP1..3`), one row per `SalePlan_ProductClass` row.
   */
  private def loadProductClass(spark: SparkSession): DataFrame = {
    val classDetail = spark.read.table("WHS.SalePlan_ProductClassDetail")

    // One lookup per level with uniquely aliased columns. Each alias is a fresh attribute, so the
    // three joins against the same detail table cannot resolve to the wrong copy (ambiguous
    // self-join).
    def classLookup(level: Int): DataFrame =
      classDetail.select(col("ID").as(s"ID_NSP$level"), col("Class").as(s"Name_NSP$level"))

    val products = spark.read.table("WHS.SalePlan_ProductClass")
    val level1 = classLookup(1)
    val level2 = classLookup(2)
    val level3 = classLookup(3)

    // All three are left joins: a product missing a level-2/3 class stays in the catalog
    // (levels 2 and 3 were previously inner joins, which dropped such products entirely).
    products
      .join(level1, products("N_SP1") === level1("ID_NSP1"), "left")
      .join(level2, products("N_SP2") === level2("ID_NSP2"), "left")
      .join(level3, products("N_SP3") === level3("ID_NSP3"), "left")
      .select(
        products("ItemName"),
        products("ItemCodeSAP"),
        products("N_SP1"),
        products("N_SP2"),
        products("N_SP3"),
        products("SPKM_BH1"),
        products("SPKM_BH2"),
        level1("Name_NSP1"),
        level2("Name_NSP2"),
        level3("Name_NSP3")
      )
  }

  /**
   * Reads the monthly "tmdt_mr" plan, attaches class IDs and spreads each month's target evenly
   * over the days of that month (`Doanh_thu_KH / so_ngay`), one row per calendar `date`.
   */
  private def buildDailyPlan(spark: SparkSession, productClass: DataFrame): DataFrame = {
    val plan = spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      .load("/raw_zone/excel_form/upload/ke_hoach/tmdt_mr.csv")
      .filter(col("Ten_chi_tieu") === "tmdt_mr")
      // Split the plan period into month, quarter and year.
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))

    // Plan rows are keyed by class *names*, while productClass has one row per product. Joining
    // the distinct class combinations attaches the IDs without repeating each plan row once per
    // product in the class, which would multiply the summed target.
    val classIds = productClass
      .select("N_SP1", "N_SP2", "N_SP3", "Name_NSP1", "Name_NSP2", "Name_NSP3")
      .distinct()

    val monthlyPlan = plan
      .join(
        classIds,
        plan("Name_NSP1") === classIds("Name_NSP1") &&
          plan("Name_NSP2") === classIds("Name_NSP2") &&
          plan("Name_NSP3") === classIds("Name_NSP3"),
        "left"
      )
      .select(
        plan("Month"),
        plan("Quarter"),
        plan("Year"),
        plan("Loai_KH"),
        plan("Doanh_thu"),
        plan("vung_cn"),
        plan("Ten_doi"),
        plan("EcommerceOrderType"),
        // Fresh aliases: productClass also feeds the actual side, so the final full join must
        // not see the same attributes on both of its inputs.
        classIds("N_SP1").as("N_SP1"),
        classIds("N_SP2").as("N_SP2"),
        classIds("N_SP3").as("N_SP3"),
        plan("Name_NSP1"),
        plan("Name_NSP2"),
        plan("Name_NSP3")
      )
      // Only monthly ("Thang") plan rows are spread over days.
      .filter(col("Loai_KH") === "Thang")
      .groupBy("Month", "Quarter", "Year", "EcommerceOrderType", "N_SP1", "N_SP2", "N_SP3",
        "Name_NSP1", "Name_NSP2", "Name_NSP3", "vung_cn", "Ten_doi")
      .agg(sum("Doanh_thu").as("Doanh_thu_KH"))

    // Calendar, plus the number of days per (month, quarter, year). The second read yields an
    // independent relation, which keeps the t1/t2 join below unambiguous.
    val calendar = spark.read.format("avro").load("/raw_zone/full_load/date/")
    val daysPerMonth = spark.read.format("avro").load("/raw_zone/full_load/date/")
      .groupBy("month", "quarter", "year")
      .agg(count("*").alias("so_ngay"))
    val calendarWithDays = calendar.as("t1")
      .join(
        daysPerMonth.as("t2"),
        calendar("month") === daysPerMonth("month") && calendar("year") === daysPerMonth("year")
      )
      .select(col("t1.date"), col("t1.month"), col("t1.quarter"), col("t1.year"), col("t2.so_ngay"))

    monthlyPlan
      .join(
        calendarWithDays,
        monthlyPlan("Month") === calendarWithDays("Month") &&
          monthlyPlan("Year") === calendarWithDays("Year"),
        "left"
      )
      // Even split of the monthly target across the days of the month.
      .withColumn("Doanh_thu_KH", col("Doanh_thu_KH") / col("so_ngay"))
  }

  /**
   * Classifies orders by product class and team and sums revenue per day.
   *
   * @return columns `Date`, `EcommerceOrderType`, `N_SP1..3`, `Name_NSP1..3`, `Doanh_thu`,
   *         `Chi_nhanh`, `Team_Name`.
   */
  private def buildDailyActual(
      orders: DataFrame,
      productClass: DataFrame,
      teamMapping: DataFrame
  ): DataFrame =
    orders
      .join(productClass, orders("Barcode") === productClass("ItemCodeSAP"), "left")
      .join(teamMapping, teamMapping("Ma_KH") === orders("Username"), "left")
      .groupBy("Day", "EcommerceOrderType", "N_SP1", "N_SP2", "N_SP3", "Name_NSP1", "Name_NSP2",
        "Name_NSP3", "Chi_nhanh", "Team_Name")
      .agg(sum("Doanh_thu").as("Doanh_thu"))
      .select(
        col("Day").as("Date"),
        col("EcommerceOrderType"),
        col("N_SP1"),
        col("N_SP2"),
        col("N_SP3"),
        col("Name_NSP1"),
        col("Name_NSP2"),
        col("Name_NSP3"),
        col("Doanh_thu"),
        col("Chi_nhanh"),
        col("Team_Name")
      )

  /**
   * Full outer join of daily actuals and daily plan. Each key is taken from whichever side is
   * present; `Doanh_thu` comes from the actuals and `Doanh_thu_KH` from the plan. `Ma_Nhom` is
   * "L" followed by the last character of the level-1 class ID.
   */
  private def joinActualWithPlan(actual: DataFrame, plan: DataFrame): DataFrame = {
    // (output name, actual column, plan column): a single list drives both the join condition
    // and the coalesced outputs, so the two cannot drift apart.
    val keys = Seq(
      ("Date", "Date", "Date"),
      ("EcommerceOrderType", "EcommerceOrderType", "EcommerceOrderType"),
      ("ID_NSP1", "N_SP1", "N_SP1"),
      ("ID_NSP2", "N_SP2", "N_SP2"),
      ("ID_NSP3", "N_SP3", "N_SP3"),
      ("Name_NSP1", "Name_NSP1", "Name_NSP1"),
      ("Name_NSP2", "Name_NSP2", "Name_NSP2"),
      ("Name_NSP3", "Name_NSP3", "Name_NSP3"),
      ("Chi_nhanh", "Chi_nhanh", "Vung_CN"),
      ("Team_Name", "Team_Name", "Ten_doi")
    )

    val condition = keys
      .map { case (_, actualCol, planCol) => actual(actualCol) === plan(planCol) }
      .reduce(_ && _)

    val outputCols =
      keys.map { case (out, actualCol, planCol) => coalesce(actual(actualCol), plan(planCol)).as(out) } ++
        Seq(actual("Doanh_thu").as("Doanh_thu"), plan("Doanh_thu_KH").as("Doanh_thu_KH"))

    actual
      .join(plan, condition, "full")
      .select(outputCols: _*)
      .withColumn("Ma_Nhom", concat(lit("L"), substring(col("ID_NSP1"), -1, 1)))
  }
}
Editor is loading...
Leave a Comment