Untitled

mail@pastecode.io avatar
unknown
plain_text
2 years ago
2.9 kB
1
Indexable
Never
// Root path of the Delta table on S3 (s3a). NOTE(review): this has NO trailing
// slash — any code that string-concatenates a child directory onto it must add
// the "/" separator itself (see runCompaction).
val remoteDataDir = "s3a://syntasa-aes-webdata-emr/sitecatalyst-data/aes/eventstores/dev/dev_discounting/HistoryProcessor_RT_NON_NA/history_rt_non_na_tbl/data"

/**
 * Starts a streaming query that, on each micro-batch, compacts the Delta
 * partitions touched by the batch and then vacuums the table.
 *
 * @param sourceDf streaming DataFrame; must contain `partitionColumn` and a
 *                 timestamp column `batch_persist_time`
 * @param spark    active session, passed through to runCompaction/DeltaTable
 * @return the started StreamingQuery handle
 */
def postFileCompactionProcess(sourceDf: DataFrame, spark: SparkSession): StreamingQuery = {
    sourceDf.select(partitionColumn, "batch_persist_time")
      // 0-second watermark: dropDuplicates on a stream requires a watermark,
      // even a degenerate one, so stale dedup state can be evicted.
      .withWatermark("batch_persist_time", "0 seconds")
      .dropDuplicates(partitionColumn, "batch_persist_time")
      .writeStream
      .foreachBatch((batch: Dataset[Row], batchId: Long) => {
        try {
            // Distinct partition-date values present in this micro-batch
            // (column 0 is partitionColumn per the select above).
            val dates = batch.map(_.getDate(0).toString).collect()

            dates.foreach(date => runCompaction(remoteDataDir, partitionColumn, date, spark))

            // Vacuum after compaction so the superseded small files
            // (older than 48 hours) are physically removed.
            if (fs.exists(new Path(remoteDataDir))) {
                DeltaTable.forPath(spark, remoteDataDir).vacuum(48)
            }
        } catch {
            // NonFatal: log and keep the stream alive for recoverable errors,
            // but let fatal ones (OOM, InterruptedException) propagate instead
            // of being swallowed as the old `case e: Throwable` did.
            case scala.util.control.NonFatal(e) => e.printStackTrace()
        }
      })
      .trigger(Trigger.ProcessingTime(780000)) // 780,000 ms = 13 minutes
      .start()
}

/**
 * Rewrites one partition of the Delta table into ~128 MB files.
 *
 * Reads the partition, repartitions it to ceil(totalSizeMb / 128) output
 * files, and overwrites only that partition via `replaceWhere`, with
 * `dataChange=false` so downstream streaming readers do not reprocess it.
 *
 * @param remoteDataDir      Delta table root path (no trailing slash required)
 * @param partitionColumn    name of the partitioning column
 * @param partition          partition value (a date rendered as a string)
 * @param activeSparkSession session used for the read/write
 */
def runCompaction(remoteDataDir: String,
                partitionColumn: String,
                partition: String,
                activeSparkSession: SparkSession): Unit = {
    val partitionDecorator = s"${partitionColumn}=${partition}"       // HDFS-style dir name
    val sparkPartitionDecorator = s"${partitionColumn}='${partition}'" // SQL predicate form

    // BUG FIX: the original built s"${remoteDataDir}${partitionDecorator}/"
    // with no "/" between the two parts; since remoteDataDir has no trailing
    // slash the path was ".../datacreated_date=..." and fs.exists was always
    // false, so compaction never ran. Path(parent, child) inserts the
    // separator correctly.
    val partitionFQPath = new Path(remoteDataDir, partitionDecorator)

    if (fs.exists(partitionFQPath)) {
      val files = fs.listStatus(partitionFQPath)

      val oneMbInBytes = 1024.0 * 1024.0
      val fileSizeInMb = files.map(_.getLen / oneMbInBytes).sum

      // Target ~128 MB per output file; always write at least one partition.
      val numPartitions = Math.max(Math.ceil(fileSizeInMb / 128.0).toInt, 1)
      println(s"[${ZonedDateTime.now(ZoneOffset.UTC)}] Total file size [${fileSizeInMb}] date=[${partition}] num partitions = [${numPartitions}]")

      val dataDf = activeSparkSession.read
        .format("delta")
        .load(remoteDataDir)
      // Guard against an empty schema (nothing meaningful to rewrite).
      if (dataDf.schema.fields.nonEmpty) {
        dataDf
          .where(sparkPartitionDecorator)
          .repartition(numPartitions)
          .write
          .option("dataChange", "false") // compaction only — don't trigger downstream readers
          .format("delta")
          .mode("overwrite")
          .option("replaceWhere", sparkPartitionDecorator) // overwrite only this partition
          .save(remoteDataDir)
      }
    }
}


// Register an external Hive table over the parquet data so it is queryable by
// name. NOTE(review): this location ends in .../history_rt_non_na/data while
// remoteDataDir above ends in .../history_rt_non_na_tbl/data — confirm the two
// paths are intentionally different.
val createTableSql = s"CREATE EXTERNAL TABLE IF NOT EXISTS ${tableName} (${columns.mkString(", ")}) partitioned by (${partitionColumn} string) stored as parquet location 's3a://syntasa-aes-webdata-emr/sitecatalyst-data/aes/eventstores/dev/dev_discounting/HistoryProcessor_RT_NON_NA/history_rt_non_na/data'"

spark.sql(createTableSql)
// Discover partition directories already present on S3 and add them to the
// metastore; without this the freshly created table would appear empty.
spark.sql(s"MSCK REPAIR TABLE ${tableName}")