Untitled
unknown
plain_text
a year ago
7.4 kB
5
Indexable
package vn.com.viettel.code.dashboard_date

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import vn.com.viettel.AppConfig.outputDir
import vn.com.viettel.utils.SparkUtils.createBatchSession

/**
 * Batch job that combines the daily C2 actuals (`c2_mm` and `c2_dc`) with the
 * monthly plan spread evenly over the days of each month, and writes the
 * union of both results as Avro to `outputDir`.
 *
 * Output columns: `Date`, `SaleDepartment`, `OrganizationName`, `Category`,
 * `Team_Name`, `Perform_Quantity` (actual) and `Plan_Quantity` (daily plan).
 */
object kpicustomer_open_customer {

  /** CSV holding the uploaded plan for both the `c2_mm` and `c2_dc` indicators. */
  private val PlanCsvPath = "/raw_zone/excel_form/upload/ke_hoach/c2_mm_dc.csv"

  /** Avro calendar table; the code reads its `date`, `month`, `quarter`, `year` columns. */
  private val CalendarPath = "/raw_zone/full_load/date/"

  def main(argv: Array[String]): Unit = {
    // Create Spark session
    val spark: SparkSession = createBatchSession()

    // One row per calendar day, carrying the number of days in its month.
    val calendar = loadCalendarWithMonthLength(spark)

    // c2_mm indicator: actuals joined with the daily plan.
    val resultC2mm = joinActualWithPlan(
      loadActual(spark, "/gold_zone/full_load/kpi_date/c2_mm", "So_luong_C2_MM", "mở mới"),
      spreadPlanOverDays(loadMonthlyPlan(spark, "c2_mm"), calendar)
    )

    // c2_dc indicator: actuals joined with the daily plan.
    val resultC2dc = joinActualWithPlan(
      loadActual(spark, "/gold_zone/full_load/kpi_date/c2_dc", "So_luong_C2_DC", "không hoạt động"),
      spreadPlanOverDays(loadMonthlyPlan(spark, "c2_dc"), calendar)
    )

    // The KHDV indicator is not loaded yet (currently left null):
    // val khdv = spark.read.format("avro").load("/raw_zone/excel_form/sale_plan/01.1.KHDV.avro")

    resultC2dc
      .unionByName(resultC2mm)
      .write.format("avro").mode("overwrite").save(outputDir)
  }

  /**
   * Loads one actuals table and aggregates it per day, department,
   * organization, category and team.
   *
   * @param spark       active session
   * @param path        Avro location of the actuals table
   * @param quantityCol name of the column summed into `Perform_Quantity`
   * @param statusLabel suffix appended to the store-type label to form `Category`
   * @return columns `OpenDate`, `SaleDepartment`, `OrganizationName`,
   *         `Category`, `Team_Name`, `Perform_Quantity`
   */
  private def loadActual(
      spark: SparkSession,
      path: String,
      quantityCol: String,
      statusLabel: String
  ): DataFrame =
    spark.read.format("avro").load(path)
      .withColumn(
        "Category",
        // No `otherwise`: any StoreTypeId other than 17, 18 or 23 gets a null Category.
        when(col("StoreTypeId") === 17, s"C2 $statusLabel")
          .when(col("StoreTypeId") === 18, s"C2 Trọng điểm $statusLabel")
          .when(col("StoreTypeId") === 23, s"C2 Siêu lớn $statusLabel")
      )
      .groupBy("OpenDate", "SaleDepartment", "OrganizationName", "Category", "Team_Name")
      .agg(sum(col(quantityCol)).as("Perform_Quantity"))

  /**
   * Loads the monthly ("Thang") plan rows of one indicator from the plan CSV
   * and sums `So_luong` per month, quarter, year, unit, region, agent level
   * and team.
   *
   * @param spark     active session
   * @param indicator value matched against `Ten_chi_tieu` (e.g. "c2_mm")
   * @return the grouping columns plus `So_luong_KH`
   */
  private def loadMonthlyPlan(spark: SparkSession, indicator: String): DataFrame =
    spark.read.format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      .load(PlanCsvPath)
      .filter(col("Ten_chi_tieu") === indicator and col("Loai_KH") === "Thang")
      .withColumn("Month", date_format(col("Thoi_gian_ke_hoach"), "M"))
      .withColumn("Quarter", date_format(col("Thoi_gian_ke_hoach"), "q"))
      .withColumn("Year", date_format(col("Thoi_gian_ke_hoach"), "y"))
      .groupBy("Month", "Quarter", "Year", "Don_vi", "Vung_CN", "Cap_dai_ly", "Ten_doi")
      .agg(sum("So_luong").as("So_luong_KH"))

  /**
   * Builds the calendar enriched with `so_ngay`, the number of calendar rows
   * that share the same month and year.
   *
   * @return columns `date`, `month`, `quarter`, `year`, `so_ngay`
   */
  private def loadCalendarWithMonthLength(spark: SparkSession): DataFrame = {
    // The calendar is read twice so the two sides of the join below are
    // independent relations, exactly as the job has always done.
    val days = spark.read.format("avro").load(CalendarPath)
    val daysPerMonth = spark.read.format("avro").load(CalendarPath)
      .groupBy("month", "quarter", "year")
      .agg(count("*").alias("so_ngay"))

    days.as("t1")
      .join(
        daysPerMonth.as("t2"),
        col("t1.month") === col("t2.month") && col("t1.year") === col("t2.year")
      )
      .select(col("t1.date"), col("t1.month"), col("t1.quarter"), col("t1.year"), col("t2.so_ngay"))
  }

  /**
   * Splits the monthly plan across each day: every plan row is repeated for
   * each calendar day of its month and `So_luong_KH` is divided by `so_ngay`.
   * Plan rows without a matching calendar month are kept (left join) with a
   * null `date` and a null `So_luong_KH`.
   */
  private def spreadPlanOverDays(monthlyPlan: DataFrame, calendar: DataFrame): DataFrame =
    monthlyPlan
      .join(
        calendar,
        monthlyPlan("Month") === calendar("Month") and monthlyPlan("Year") === calendar("Year"),
        "left"
      )
      .withColumn("So_luong_KH", col("So_luong_KH") / col("so_ngay"))

  /** Equality in which a null on either side is compared as the empty string. */
  private def eqNullAsEmpty(left: Column, right: Column): Column =
    coalesce(left, lit("")) === coalesce(right, lit(""))

  /**
   * Full-outer-joins actuals with the daily plan on date, department,
   * organization and team, keeping rows that exist on only one side.
   *
   * NOTE(review): `Category` / `Cap_dai_ly` are coalesced in the projection
   * but are not part of the join condition, so one plan row can pair with
   * several actual categories of the same day/unit/team. Confirm whether the
   * two columns share a value domain before adding them to the condition.
   *
   * @return columns `Date`, `SaleDepartment`, `OrganizationName`, `Category`,
   *         `Team_Name`, `Perform_Quantity`, `Plan_Quantity`
   */
  private def joinActualWithPlan(actual: DataFrame, dailyPlan: DataFrame): DataFrame =
    actual
      .join(
        dailyPlan,
        eqNullAsEmpty(actual("OpenDate"), dailyPlan("date")) &&
          eqNullAsEmpty(actual("SaleDepartment"), dailyPlan("Don_vi")) &&
          eqNullAsEmpty(actual("OrganizationName"), dailyPlan("Vung_CN")) &&
          eqNullAsEmpty(actual("Team_Name"), dailyPlan("Ten_doi")),
        "full"
      )
      .select(
        coalesce(actual("OpenDate"), dailyPlan.col("date")).alias("Date"),
        coalesce(actual("SaleDepartment"), dailyPlan.col("Don_vi")).alias("SaleDepartment"),
        coalesce(actual("OrganizationName"), dailyPlan.col("Vung_CN")).alias("OrganizationName"),
        coalesce(actual("Category"), dailyPlan.col("Cap_dai_ly")).alias("Category"),
        coalesce(actual("Team_Name"), dailyPlan.col("Ten_doi")).alias("Team_Name"),
        actual("Perform_Quantity").alias("Perform_Quantity"),
        dailyPlan("So_luong_KH").alias("Plan_Quantity")
      )
}
Editor is loading...
Leave a Comment