Untitled

mail@pastecode.io avatar
unknown
c_cpp
7 months ago
7.9 kB
5
Indexable
Never
//===----------------------------------------------------------------------===//
//
//                         BusTub
//
// seq_scan_executor.cpp
//
// Identification: src/execution/seq_scan_executor.cpp
//
// Copyright (c) 2015-2021, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//

#include "execution/executors/seq_scan_executor.h"
// #include <_types/_uint32_t.h>
// #include <sys/_types/_int64_t.h>
#include <cstddef>
#include <memory>
#include <utility>
#include "catalog/column.h"
#include "common/config.h"
#include "concurrency/transaction.h"
#include "concurrency/transaction_manager.h"
#include "storage/table/table_iterator.h"
#include "storage/table/tuple.h"
#include "type/type.h"

namespace bustub {

SeqScanExecutor::SeqScanExecutor(ExecutorContext *exec_ctx, const SeqScanPlanNode *plan)
    : AbstractExecutor(exec_ctx), plan_(plan) {
  table_info_ = exec_ctx->GetCatalog()->GetTable(plan_->GetTableOid());
}

void SeqScanExecutor::Init() {
  /*
  need to get the table heap iterator, then iterate over the table one by one.
  When return from next, need to set the rid and the tuple
  */

  // unique pointer to the table heap
  // auto table_heap_ = table_info_->table_;
  // std::cout << "IN INIT ~~~~~~~~~~~~~" << std::endl;

  table_iterator_ = std::make_unique<TableIterator>(table_info_->table_->MakeIterator());
}

auto SeqScanExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  // std::cout << "IN NEXT ==========" << std::endl;

  while (!table_iterator_->IsEnd()) {
    auto tmp_tuple_pair = table_iterator_->GetTuple();
    *rid = table_iterator_->GetRID();
    ++*table_iterator_;

    // skip tuples that don't satisfy the predicate
    if (plan_->filter_predicate_ != nullptr &&
        !plan_->filter_predicate_->Evaluate(&tmp_tuple_pair.second, table_info_->schema_).GetAs<bool>()) {
      continue;
    }

    // std::cout << "=== tuple = " << tmp_tuple_pair.second.ToString(&plan_->OutputSchema()) << std::endl;

    // new
    if (IsMostRecentVersion(tmp_tuple_pair.first.ts_)) {  // case 1: tuple in the heap is latest
      *tuple = tmp_tuple_pair.second;
      std::cout << "case 1" << std::endl;
      if (tmp_tuple_pair.first.is_deleted_) {
        continue;
      }
      return true;
    }
    if (IsTemporaryModify(tmp_tuple_pair.first.ts_)) {  // case 2
      // *tuple = RecoverPastVersion(tmp_tuple_pair.first.ts_, *rid);
      std::cout << "case 2" << std::endl;
      if (tmp_tuple_pair.first.is_deleted_) {
        continue;
      }
      *tuple = tmp_tuple_pair.second;

      return true;
    }

    auto p = IterateVersionChain(tmp_tuple_pair, *rid);
    if (p.second) {
      *tuple = p.first;
      std::cout << "case 3" << std::endl;
      // std::cout << p.first.ToString(&plan_->OutputSchema());
      return true;
    }

    std::cout << "case 4" << std::endl;
  }

  std::cout << "return false ~~~~~~~~~~" << std::endl;
  return false;
}

auto SeqScanExecutor::IsMostRecentVersion(const timestamp_t &timestamp) -> bool {
  // std::cout << "heap current timestamp = " << static_cast<int64_t>(timestamp) << std::endl;
  std::cout << "transaction read timestamp = " << exec_ctx_->GetTransaction()->GetReadTs() << std::endl;
  return timestamp <= exec_ctx_->GetTransaction()->GetReadTs();
}

auto SeqScanExecutor::IsTemporaryModify(const timestamp_t &timestamp) -> bool {
  // auto mask = 0x8000000000000000;
  // std::cout << "***************" << std::endl;
  // std::cout << exec_ctx_->GetTransaction()->GetTransactionTempTs() << std::endl;
  // std::cout << exec_ctx_->GetTransaction()->GetReadTs() << std::endl;
  // std::cout << timestamp << std::endl;
  // std::cout << "&&&&&&&&&&&" << std::endl;
  return (exec_ctx_->GetTransaction()->GetTransactionTempTs() == timestamp);
  // return (timestamp & mask) != 0;
}

auto SeqScanExecutor::RecoverPastVersion(const timestamp_t &timestamp, RID rid) -> Tuple {
  TransactionManager *txn_manager = exec_ctx_->GetTransactionManager();
  auto undo_link = txn_manager->GetUndoLink(rid);

  if (undo_link.has_value()) {
    auto undo_link_value = undo_link.value();
    auto undo_log = txn_manager->GetUndoLog(undo_link_value);
    Tuple new_tuple = undo_log.tuple_;
    return new_tuple;
  }

  return Tuple{};
}

auto SeqScanExecutor::IterateVersionChain(const std::pair<TupleMeta, Tuple> &p, RID rid) -> std::pair<Tuple, bool> {
  Tuple base_tuple = p.second;
  std::vector<Value> values;
  for (uint32_t i = 0; i < plan_->OutputSchema().GetColumnCount(); i++) {
    values.push_back(base_tuple.GetValue(&plan_->OutputSchema(), i));
  }

  bool deleted = p.first.is_deleted_;
  auto undo_link_optional = exec_ctx_->GetTransactionManager()->GetUndoLink(rid);
  std::cout << "rid in seq scan = " << rid << std::endl;
  if (undo_link_optional.has_value()) {
    std::cout << "undo_link_optional has value" << std::endl;
    auto undo_link = undo_link_optional.value();
    std::cout << undo_link.prev_log_idx_ << ", " << undo_link.prev_txn_ << std::endl;
    Tuple latest_tuple;
    bool found = false;
    while (undo_link.IsValid()) {
      auto undo_log_optional = exec_ctx_->GetTransactionManager()->GetUndoLogOptional(undo_link);
      std::cout << "before check undo_log_optional is optional" << std::endl;
      if (undo_log_optional.has_value()) {
        std::cout << "undo log optional has value" << std::endl;
        auto undo_log = undo_log_optional.value();
        deleted = undo_log.is_deleted_;

        std::cout << "[[[[[[[]]]]]]]] undo_log.ts = " << undo_log.ts_ << std::endl;

        int index = 0;
        auto modified = undo_log.modified_fields_;
        auto modified_tuple_data = undo_log.tuple_;
        int count_modified = 0;
        std::vector<Column> partial_column;
        for (uint32_t i = 0; i < modified.size(); i++) {
          if (modified[i]) {
            count_modified++;
            partial_column.push_back(plan_->OutputSchema().GetColumn(i));
          }
        }

        Schema partial_schema(partial_column);
        std::vector<Value> modified_values(count_modified);
        for (int i = 0; i < count_modified; i++) {
          modified_values[i] = modified_tuple_data.GetValue(&partial_schema, i);
        }

        // std::cout << "[[[[[[[]]]]]]] undo_log.tuple = " << undo_log.tuple_.ToString(&partial_schema) << std::endl;

        for (uint32_t i = 0; i < modified.size(); i++) {
          if (modified[i]) {
            values[i] = modified_values[index];
            index++;
          }
        }

        undo_link = undo_log.prev_version_;

        // if(exec_ctx_->GetTransaction()->GetReadTs() <= undo_log.ts_){
        if (undo_log.ts_ <= exec_ctx_->GetTransaction()->GetReadTs()) {
          // std::cout << "txn read ts = " << exec_ctx_->GetTransaction()->GetReadTs() << std::endl;
          // std::cout << "undo_log.ts = " << undo_log.ts_ << std::endl;

          found = true;  // ???

          break;
        }

      } else {
        std::cout << "false 1" << std::endl;
        return std::make_pair(Tuple{}, false);
      }
    }

    if (deleted) {
      std::cout << "false 2" << std::endl;
      return std::make_pair(Tuple{}, false);
    }

    // std::cout << "found = " << found << std::endl;

    if (!found) {
      // std::cout << "not found at all" << std::endl;
      std::cout << "false 3" << std::endl;
      return std::make_pair(Tuple{}, false);
    }

    Tuple new_tuple(values, &plan_->OutputSchema());
    return std::make_pair(new_tuple, true);
  }

  std::cout << "false 4" << std::endl;
  return std::make_pair(Tuple{}, false);
}

}  // namespace bustub
Leave a Comment