Untitled
scala
a month ago
849 B
3
Indexable
Never
/**
 * Writes the intermediate state held in `dataFrame` to ScyllaDB.
 *
 * The frame is first passed through `prepareDataFrameToSave`. The result is
 * then processed partition by partition: for every row the columns
 * `feature_name`, `id`, `state_value` and `timestamp` are read, and the state
 * value and timestamp are stored through an `IntermediateStateScyllaDB`
 * built for that row's (feature name, id) pair.
 *
 * One `ScyllaDBSessionHandler` is created per non-empty partition and is
 * always closed when the partition finishes, whether it completes normally
 * or a row fails.
 *
 * @param dataFrame    rows to persist; after `prepareDataFrameToSave` it must
 *                     expose the four columns listed above
 * @param sparkSession implicit Spark session; not referenced directly in this
 *                     body, kept so existing call sites continue to compile
 */
def saveIntermediateStateToScyllaDB(
  dataFrame: DataFrame
)(implicit sparkSession: SparkSession): Unit = {
  val prepDataFrame = dataFrame.transform(prepareDataFrameToSave(_))

  prepDataFrame.foreachPartition { partition: Iterator[Row] =>
    // Skip empty partitions entirely: there is nothing to write, so there is
    // no reason to construct a handler only to close it straight away.
    if (partition.hasNext) {
      val scyllaDBSessionHandler = new ScyllaDBSessionHandler
      try {
        partition.foreach { row =>
          val featureName = row.getAs[String]("feature_name")
          val id = row.getAs[String]("id")
          val stateValue = row.getAs[String]("state_value")
          val timestamp = row.getAs[Long]("timestamp")

          val intermediateScyllaDB =
            new IntermediateStateScyllaDB(featureName, id, scyllaDBSessionHandler.getSession)
          intermediateScyllaDB.saveState(stateValue)
          intermediateScyllaDB.saveTimestamp(timestamp)
        }
      } finally {
        // Release the handler even when a row fails, so a failed task does
        // not leave it open on the executor.
        scyllaDBSessionHandler.close()
      }
    }
  }
}