Untitled
unknown
scala
2 years ago
849 B
9
Indexable
def saveIntermediateStateToScyllaDB
(
dataFrame: DataFrame
)(implicit sparkSession: SparkSession): Unit = {
val prepDataFrame = dataFrame.transform(prepareDataFrameToSave(_))
prepDataFrame.foreachPartition { partition: Iterator[Row] =>
lazy val scyllaDBSessionHandler = new ScyllaDBSessionHandler
partition.foreach ( row => {
val featureName = row.getAs[String]("feature_name")
val id = row.getAs[String]("id")
val stateValue = row.getAs[String]("state_value")
val timestamp = row.getAs[Long]("timestamp")
val intermediateScyllaDB = new IntermediateStateScyllaDB(featureName, id, scyllaDBSessionHandler.getSession)
intermediateScyllaDB.saveState(stateValue)
intermediateScyllaDB.saveTimestamp(timestamp)
})
scyllaDBSessionHandler.close()
}
}
Editor is loading...