Untitled

 avatar
unknown
plain_text
15 days ago
2.7 kB
4
Indexable
package ch.epfl.dias.cs460.rel.early.volcano.late

import ch.epfl.dias.cs460.helpers.builder.skeleton
import ch.epfl.dias.cs460.helpers.builder.skeleton.logical.LogicalFetch
import ch.epfl.dias.cs460.helpers.rel.RelOperator.{LateTuple, NilLateTuple, Tuple}
import ch.epfl.dias.cs460.helpers.rel.late.volcano.naive.Operator
import ch.epfl.dias.cs460.helpers.store.late.LateStandaloneColumnStore
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.RexNode

import scala.jdk.CollectionConverters.CollectionHasAsScala

class Fetch protected (
                              input: ch.epfl.dias.cs460.helpers.rel.late.volcano.naive.Operator,
                              fetchType: RelDataType,
                              column: LateStandaloneColumnStore,
                              projects: Option[java.util.List[_ <: RexNode]],
                            ) extends skeleton.Fetch[
  ch.epfl.dias.cs460.helpers.rel.late.volcano.naive.Operator
](input, fetchType, column, projects)
  with ch.epfl.dias.cs460.helpers.rel.late.volcano.naive.Operator {

  lazy val evaluator: Tuple => Tuple = eval(projects.get.asScala.toIndexedSeq, fetchType)
  private var it: Iterator[LateTuple] = Iterator.empty

  /**
    * @inheritdoc
    */
  override def open(): Unit = {
    input.open()
    it = input.iterator
  }

  /**
    * @inheritdoc
    */
  override def next(): Option[LateTuple] = {
    println("Fetch next")
//    while (it.hasNext) {
//      val LateTuple(vid, tuple) = it.next()
//      val colEntry = column.getElement(vid)
//      if (colEntry.isDefined) {
//        if (projects.nonEmpty) {
//          val newValue = tuple ++ evaluator(IndexedSeq(colEntry.get))
//          return Some(LateTuple(vid, newValue))
//        }
//        Some(LateTuple(vid, tuple :+ colEntry.get))
//      }
//    }
//    NilLateTuple


    val tuple = input.next()
    if (tuple.isEmpty) {
      None
    } else {
      val colEntry = column.getElement(tuple.get.vid)
      if (colEntry.isEmpty) {
        return next()
      } else {
        if (projects.isEmpty) {
            Some(LateTuple(tuple.get.vid, tuple.get.value :+ colEntry.get))
        } else {
          val newValue = tuple.get.value ++ evaluator(IndexedSeq(colEntry.get))
          Some(LateTuple(tuple.get.vid, newValue))
        }
      }
    }
  }

  /**
    * @inheritdoc
    */
  override def close(): Unit = {
    input.close()
    it = Iterator.empty
  }
}

object Fetch {
  def create(
              input: Operator,
              fetchType: RelDataType,
              column: LateStandaloneColumnStore,
              projects: Option[java.util.List[_ <: RexNode]]
            ): LogicalFetch = {
    new Fetch(input, fetchType, column, projects)
  }
}

Editor is loading...
Leave a Comment