Untitled

mail@pastecode.io avatarunknown
scala
a month ago
849 B
3
Indexable
Never
def saveIntermediateStateToScyllaDB
  (
    dataFrame: DataFrame
  )(implicit sparkSession: SparkSession): Unit = {
    val prepDataFrame = dataFrame.transform(prepareDataFrameToSave(_))

    prepDataFrame.foreachPartition { partition: Iterator[Row] =>

      lazy val scyllaDBSessionHandler = new ScyllaDBSessionHandler

      partition.foreach ( row => {
        val featureName = row.getAs[String]("feature_name")
        val id = row.getAs[String]("id")
        val stateValue = row.getAs[String]("state_value")
        val timestamp = row.getAs[Long]("timestamp")

        val intermediateScyllaDB = new IntermediateStateScyllaDB(featureName, id, scyllaDBSessionHandler.getSession)

        intermediateScyllaDB.saveState(stateValue)
        intermediateScyllaDB.saveTimestamp(timestamp)

      })
      scyllaDBSessionHandler.close()
    }
 }