val remoteDataDir = "s3a://syntasa-aes-webdata-emr/sitecatalyst-data/aes/eventstores/dev/dev_discounting/HistoryProcessor_RT_NON_NA/history_rt_non_na_tbl/data" def postFileCompactionProcess(sourceDf: DataFrame, spark: SparkSession): StreamingQuery = { sourceDf.select(partitionColumn, "batch_persist_time") .withWatermark("batch_persist_time", "0 seconds") .dropDuplicates(partitionColumn, "batch_persist_time") .writeStream .foreachBatch((d: Dataset[Row], index: Long) => { try { val dates = d.map(r => { r.getDate(0).toString }).collect() dates.foreach(date => { runCompaction(remoteDataDir, partitionColumn, date, spark) }) if (fs.exists(new Path(remoteDataDir))) { val deltaTable = DeltaTable.forPath(spark, s"${remoteDataDir}") deltaTable.vacuum(48) } }catch { case e: Throwable => e.printStackTrace() } }) .trigger(Trigger.ProcessingTime(780000)) .start() } def runCompaction(remoteDataDir: String, partitionColumn: String, partition: String, activeSparkSession: SparkSession): Unit = { val hadoopConfiguration = activeSparkSession.sparkContext.hadoopConfiguration val partitionDecorator = s"${partitionColumn}=${partition}" val sparkPartitionDecorator = s"${partitionColumn}='${partition}'" val partitionFQPath = s"${remoteDataDir}${partitionDecorator}/" if (fs.exists(new Path(partitionFQPath))) { val files = fs.listStatus(new Path(partitionFQPath)) val oneMbInBytes = 1024.0 * 1024.0 val fileSizeInMb = files.map(f => f.getLen / oneMbInBytes).sum val numPartitions = Math.max(Math.ceil(fileSizeInMb / 128.0).toInt, 1) println(s"[${ZonedDateTime.now(ZoneOffset.UTC)}] Total file size [${fileSizeInMb}] date=[${partition}] num partitions = [${numPartitions}]") val dataDf = activeSparkSession.read .format("delta") .load(remoteDataDir) if (!dataDf.schema.fields.isEmpty) { dataDf .where(sparkPartitionDecorator) .repartition(numPartitions) .write .option("dataChange", "false") .format("delta") .mode("overwrite") .option("replaceWhere", sparkPartitionDecorator) .save(remoteDataDir) } } } val createTableSql = s"CREATE EXTERNAL TABLE IF NOT EXISTS ${tableName} (${columns.mkString(", ")}) partitioned by (${partitionColumn} string) stored as parquet location 's3a://syntasa-aes-webdata-emr/sitecatalyst-data/aes/eventstores/dev/dev_discounting/HistoryProcessor_RT_NON_NA/history_rt_non_na/data'" spark.sql(createTableSql) spark.sql(s"MSCK REPAIR TABLE ${tableName}")