Untitled
unknown
plain_text
a year ago
7.7 kB
7
Indexable
// code bieu mau ke hoach doanh thu salein nsp
package vn.com.viettel.code.dashboard_date
import vn.com.viettel.AppConfig
import vn.com.viettel.utils.SparkUtils.{DataFrameOps, createBatchSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
object salein_class {
def main(args: Array[String]): Unit = {
val spark = createBatchSession()
// salein class date
val df_product_class_detail = spark.read.table("WHS.SalePlan_ProductClassDetail")
val product_class = spark.read.table("WHS.SalePlan_ProductClass")
val product_class_detail = spark.read.table("WHS.SalePlan_ProductClassDetail")
val product_class_detail1 = spark.read.table("WHS.SalePlan_ProductClassDetail")
val product_class_detail2 = spark.read.table("WHS.SalePlan_ProductClassDetail")
// read data from gold_zone
val temp_product = product_class
.join(product_class_detail, product_class("N_SP1") === product_class_detail("ID"), "left")
.join(product_class_detail1.as("product_class_detail1"), product_class("N_SP2") === product_class_detail1("ID"), "left")
.join(product_class_detail2.as("product_class_detail2"), product_class("N_SP3") === product_class_detail2("ID"), "left")
.select(product_class("ItemName"),
product_class("ItemCodeSAP"),
product_class("N_SP1"),
product_class("N_SP2"),
product_class("N_SP3"),
product_class("SPKM_BH1"),
product_class("SPKM_BH2"),
product_class_detail("Class").alias("Name_NSP1"),
col("product_class_detail1.Class").alias("Name_NSP2"),
col("product_class_detail2.Class").alias("Name_NSP3")
)
val kh = spark.read.table("excel_form.test_KH_BH1_KH")
val kh_salein_class_product_class = spark.read.format("csv").option("header", true).load("/raw_zone/excel_form/upload/ke_hoach/salein_tx_thutien_nsp.csv").filter(col("Ten_chi_tieu") === "Doanh thu (SaleIn)" and col("Loai_KH") === "Thang")
.withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
.withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
.withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
.withColumnRenamed("Don_vi", "SalesDepartment")
.withColumnRenamed("Vung_CN", "OrganizationName")
.withColumnRenamed("Kenh_ban", "SalesChannelName")
.withColumnRenamed("Name_NSP1", "NSP1")
.withColumnRenamed("Name_NSP2", "NSP2")
.withColumnRenamed("Name_NSP3", "NSP3")
.withColumnRenamed("Ten_doi", "Team_Name")
.withColumn("Gia_tri_ke_hoach", col("Doanh_thu").cast("double"))
.withColumn("So_luong", col("So_luong").cast("int"))
.join(temp_product, col("NSP1") === temp_product("Name_NSP1")
&& col("NSP2") === temp_product("Name_NSP2")
&& col("NSP3") === temp_product("Name_NSP3"), "left")
.select(
col("*"),
temp_product("N_SP1").alias("ID_NSP1"),
temp_product("N_SP2").alias("ID_NSP2"),
temp_product("N_SP3").alias("ID_NSP3")
)
.withColumn("Ma_Nhom", concat(lit("L"), substring(col("ID_NSP1"), -1, 1)))
.withColumnRenamed("Doanh_thu","Revenue_Salein_Plan")
.withColumn("ItemCode",lit(null))
.withColumn("ItemName", lit(null))
.withColumn("CustomerName", lit(null))
.withColumn("CustomerCode", lit(null))
.withColumnRenamed("Thoi_gian_ke_hoach", "Day")
.withColumn("Revenue_Salein_Perform", lit(0))
// generate data from 2010 and 2050
val plan = kh_salein_class_product_class
.withColumnRenamed("Name_NSP1", "NSP1")
.withColumnRenamed("Name_NSP2", "NSP2")
.withColumnRenamed("Name_NSP3", "NSP3")
.select(
col("Day"),
col("SaleDepartment"),
col("OrganizationName"),
col("SalesChannelName"),
col("NSP1"),
col("NSP2"),
col("NSP3"),
col("Team_Name"),
col("Revenue_Salein_Plan"),
col("Revenue_Salein_Perform"),
col("Type")
)
.alias("plan")
.join(temp_product.alias("tp"),
col("plan.NSP1") === col("tp.Name_NSP1") &&
col("plan.NSP2") === col("tp.Name_NSP2") &&
col("plan.NSP3") === col("tp.Name_NSP3"), "left")
.select(
col("plan.*"),
col("tp.N_SP1").alias("N_SP1"),
col("tp.N_SP2").alias("N_SP2"),
col("tp.N_SP3").alias("N_SP3")
)
.withColumn("Ma_Nhom", concat(lit("L"), substring(col("N_SP1"), -1, 1)))
val df_salein_class = spark.read.format("avro").load("/gold_zone/full_load/kpi_date/salein_dt")
val df_output = df_salein_class
.withColumnRenamed("Organization", "OrganizationName")
.withColumnRenamed("Kenh", "SalesChannelName")
.withColumnRenamed("Name_NSP1", "NSP1")
.withColumnRenamed("Name_NSP2", "NSP2")
.withColumnRenamed("Name_NSP3", "NSP3")
.withColumnRenamed("Doanh_thu", "Revenue_Salein_Perform")
.groupBy(
col("Ngay_lapHD"),
col("SaleDepartment"),
col("OrganizationName"),
col("SalesChannelName"),
col("Ma_Nhom"),
col("N_SP1"),
col("N_SP2"),
col("N_SP3"),
col("NSP1"),
col("NSP2"),
col("NSP3"),
col("Team_Name")
)
.agg(
sum("Revenue_Salein_Perform").alias("Revenue_Salein_Perform")
)
.select(
col("Ngay_lapHD").alias("Day"),
col("SaleDepartment"),
col("OrganizationName"),
col("SalesChannelName"),
col("Ma_Nhom"),
col("N_SP1"),
col("N_SP2"),
col("N_SP3"),
col("NSP1"),
col("NSP2"),
col("NSP3"),
col("Team_Name"),
col("Revenue_Salein_Perform")
)
.withColumn("Type", lit(null))
.withColumn("Revenue_Salein_Plan", lit(0))
.alias("df_output")
val result = df_output.unionByName(plan)
val filteredResult = result
.filter(col("Day").between("2024-01-01", "2024-12-31"))
.select(
lit("Doanh thu (SaleIn)").alias("Ten_chi_tieu"),
lit(null).alias("Thoi_gian_ke_hoach"),
col("SaleDepartment").alias("Don_vi"),
col("OrganizationName").alias("Vung_CN"),
col("SalesChannelName").alias("Kenh_ban"),
col("Team_Name").alias("Ten_doi"),
col("NSP1").alias("Name_NSP1"),
col("NSP2").alias("Name_NSP2"),
col("NSP3").alias("Name_NSP3"),
lit(null).alias("So_luong"),
lit(null).alias("Doanh_thu"),
lit(null).alias("Loai_KH")
)
// Apply distinct to ensure unique rows
val re = filteredResult.distinct()
val reCast = re
.withColumn("Ten_chi_tieu", col("Ten_chi_tieu").cast(StringType))
.withColumn("Thoi_gian_ke_hoach", col("Thoi_gian_ke_hoach").cast(StringType))
.withColumn("Don_vi", col("Don_vi").cast(StringType))
.withColumn("Vung_CN", col("Vung_CN").cast(StringType))
.withColumn("Kenh_ban", col("Kenh_ban").cast(StringType))
.withColumn("Ten_doi", col("Ten_doi").cast(StringType))
.withColumn("Name_NSP1", col("Name_NSP1").cast(StringType))
.withColumn("Name_NSP2", col("Name_NSP2").cast(StringType))
.withColumn("Name_NSP3", col("Name_NSP3").cast(StringType))
.withColumn("So_luong", col("So_luong").cast(IntegerType))
.withColumn("Doanh_thu", col("Doanh_thu").cast(DoubleType))
.withColumn("Loai_KH", col("Loai_KH").cast(StringType))
result.write.format("avro").mode("overwrite").save(AppConfig.outputDir)
}
}
Editor is loading...
Leave a Comment