import java.time.{ZoneOffset, ZonedDateTime}

import io.delta.tables.DeltaTable
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// `fs` (a Hadoop FileSystem handle for the target bucket) and `partitionColumn`
// (the Delta partition column name) are assumed to be defined elsewhere in the job.
val remoteDataDir = "s3a://syntasa-aes-webdata-emr/sitecatalyst-data/aes/eventstores/dev/dev_discounting/HistoryProcessor_RT_NON_NA/history_rt_non_na_tbl/data"

def postFileCompactionProcess(sourceDf: DataFrame, spark: SparkSession): StreamingQuery = {
  import spark.implicits._

  sourceDf.select(partitionColumn, "batch_persist_time")
    .withWatermark("batch_persist_time", "0 seconds")
    .dropDuplicates(partitionColumn, "batch_persist_time")
    .writeStream
    .foreachBatch((d: Dataset[Row], batchId: Long) => {
      try {
        // Compact each distinct partition date seen in this micro-batch.
        val dates = d.map(_.getDate(0).toString).distinct().collect()
        dates.foreach(date => runCompaction(remoteDataDir, partitionColumn, date, spark))
        // Drop files that are older than 48 hours and no longer referenced
        // by the Delta log.
        if (fs.exists(new Path(remoteDataDir))) {
          DeltaTable.forPath(spark, remoteDataDir).vacuum(48)
        }
      } catch {
        case e: Throwable => e.printStackTrace()
      }
    })
    .trigger(Trigger.ProcessingTime(780000)) // run every 13 minutes
    .start()
}
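
// A minimal sketch (not part of the original snippet) of how this query might
// be fed: stream the same Delta table back in and hand it to
// postFileCompactionProcess. `ignoreChanges` makes the Delta streaming source
// skip the rewritten files that compaction itself commits, so the query does
// not re-trigger on its own output.
val compactionSource = spark.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load(remoteDataDir)
val compactionQuery = postFileCompactionProcess(compactionSource, spark)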
def runCompaction(remoteDataDir: String,
                  partitionColumn: String,
                  partition: String,
                  activeSparkSession: SparkSession): Unit = {
  val partitionDecorator = s"${partitionColumn}=${partition}"
  val sparkPartitionDecorator = s"${partitionColumn}='${partition}'"
  val partitionFQPath = s"${remoteDataDir}/${partitionDecorator}/"
  if (fs.exists(new Path(partitionFQPath))) {
    // Size the rewrite so each output file lands near the 128 MB target.
    val files = fs.listStatus(new Path(partitionFQPath))
    val oneMbInBytes = 1024.0 * 1024.0
    val fileSizeInMb = files.map(f => f.getLen / oneMbInBytes).sum
    val numPartitions = Math.max(Math.ceil(fileSizeInMb / 128.0).toInt, 1)
    println(s"[${ZonedDateTime.now(ZoneOffset.UTC)}] Total file size [${fileSizeInMb}] MB date=[${partition}] num partitions = [${numPartitions}]")
    val dataDf = activeSparkSession.read
      .format("delta")
      .load(remoteDataDir)
    if (dataDf.schema.fields.nonEmpty) {
      // Rewrite only this partition; dataChange=false marks the commit as a
      // compaction so streaming readers of the table do not reprocess it.
      dataDf
        .where(sparkPartitionDecorator)
        .repartition(numPartitions)
        .write
        .option("dataChange", "false")
        .format("delta")
        .mode("overwrite")
        .option("replaceWhere", sparkPartitionDecorator)
        .save(remoteDataDir)
    }
  }
}
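
// Worked example of the sizing math above: a partition holding 3,300 MB of
// small files gives ceil(3300 / 128) = 26 output files of roughly 128 MB each,
// while a partition under 128 MB collapses to a single file via the
// Math.max(..., 1) floor.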
// `tableName` and `columns` are assumed to be defined elsewhere in the job.
val createTableSql =
  s"""CREATE EXTERNAL TABLE IF NOT EXISTS ${tableName} (${columns.mkString(", ")})
     |PARTITIONED BY (${partitionColumn} STRING)
     |STORED AS PARQUET
     |LOCATION 's3a://syntasa-aes-webdata-emr/sitecatalyst-data/aes/eventstores/dev/dev_discounting/HistoryProcessor_RT_NON_NA/history_rt_non_na/data'""".stripMargin
spark.sql(createTableSql)
// Register the partition directories already present at the location.
spark.sql(s"MSCK REPAIR TABLE ${tableName}")
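
// Hypothetical sanity check after the repair: list the partitions the
// metastore now knows about.
spark.sql(s"SHOW PARTITIONS ${tableName}").show(false)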