//===----------------------------------------------------------------------===//
//
//                         BusTub
//
// disk_extendible_hash_table.cpp
//
// Identification: src/container/disk/hash/disk_extendible_hash_table.cpp
//
// Copyright (c) 2015-2023, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/exception.h"
#include "common/logger.h"
#include "common/macros.h"
#include "common/rid.h"
#include "common/util/hash_util.h"
#include "container/disk/hash/disk_extendible_hash_table.h"
#include "storage/index/hash_comparator.h"
#include "storage/page/extendible_htable_bucket_page.h"
#include "storage/page/extendible_htable_directory_page.h"
#include "storage/page/extendible_htable_header_page.h"
#include "storage/page/page_guard.h"

namespace bustub {

template <typename K, typename V, typename KC>
DiskExtendibleHashTable<K, V, KC>::DiskExtendibleHashTable(const std::string &name, BufferPoolManager *bpm,
                                                           const KC &cmp, const HashFunction<K> &hash_fn,
                                                           uint32_t header_max_depth, uint32_t directory_max_depth,
                                                           uint32_t bucket_max_size)
    : bpm_(bpm),
      cmp_(cmp),
      hash_fn_(std::move(hash_fn)),
      header_max_depth_(header_max_depth),
      directory_max_depth_(directory_max_depth),
      bucket_max_size_(bucket_max_size) {
  header_page_id_ = INVALID_PAGE_ID;
}
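
// A minimal usage sketch (comments only; it mirrors the shape of the BusTub
// test harness — the disk-manager type and the depth/size parameters below
// are assumptions, not something this file defines):
//
//   auto disk_mgr = std::make_unique<DiskManagerUnlimitedMemory>();
//   auto bpm = std::make_unique<BufferPoolManager>(50, disk_mgr.get());
//   DiskExtendibleHashTable<int, int, IntComparator> ht(
//       "ht", bpm.get(), IntComparator(), HashFunction<int>(),
//       /*header_max_depth=*/2, /*directory_max_depth=*/3, /*bucket_max_size=*/2);
//   ht.Insert(1, 100, nullptr);
//   std::vector<int> res;
//   ht.GetValue(1, &res, nullptr);  // res == {100}
//   ht.Remove(1, nullptr);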

/*****************************************************************************
 * SEARCH
 *****************************************************************************/
template <typename K, typename V, typename KC>
auto DiskExtendibleHashTable<K, V, KC>::GetValue(const K &key, std::vector<V> *result, Transaction *transaction) const
    -> bool {

  /*
  Search outline:
  1. Use the header page to find the corresponding directory page.
  2. Use the directory page to find the corresponding bucket (via the global-depth index).
  3. Probe that bucket for the key. A worked example of the bit layout follows.
  */
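
  /*
  Worked example of the two-level index (a sketch; in BusTub's pages the
  header indexes by the most-significant hash bits and the directory by the
  least-significant ones): with header_max_depth_ = 2 and global depth 3, a
  hash of the form 0b01...101 routes to directory slot 0b01 = 1 via
  HashToDirectoryIndex and to bucket slot 0b101 = 5 via HashToBucketIndex.
  */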
  if (header_page_id_ == INVALID_PAGE_ID) {
    return false;
  }

  std::cout << "Get Value" << std::endl;
  uint32_t hash = Hash(key);
  ReadPageGuard header_page = bpm_->FetchPageRead(header_page_id_);
  
  auto header = header_page.As<ExtendibleHTableHeaderPage>();

  uint32_t directory_idx = header->HashToDirectoryIndex(hash);
  
  page_id_t directory_page_id = header->GetDirectoryPageId(directory_idx);
  if (directory_page_id == INVALID_PAGE_ID) {
    return false;
  }
  std::cout << "Get Value 3" << std::endl;

  ReadPageGuard directory_page = bpm_->FetchPageRead(directory_page_id);
  auto directory = directory_page.As<ExtendibleHTableDirectoryPage>();

  uint32_t bucket_idx = directory->HashToBucketIndex(hash);
  page_id_t bucket_page_id = directory->GetBucketPageId(bucket_idx);
  if (bucket_page_id == INVALID_PAGE_ID) {
    return false;
  }
  std::cout << "Get Value 4" << std::endl;
  // Page *bucket_page = bpm_->FetchPage(bucket_page_id);
  // auto *bucket = reinterpret_cast<ExtendibleHTableBucketPage<K, V, KC> *>(bucket_page->GetData());

  ReadPageGuard bucket_page = bpm_->FetchPageRead(bucket_page_id);
  auto bucket = bucket_page.As<ExtendibleHTableBucketPage<K, V, KC>>();

  std::cout << "-----------" << std::endl;

  V value;
  if(!bucket->Lookup(key, value, this->cmp_)){
    std::cout << "Key not found" << std::endl;
    return false;
  }
  result->push_back(value); 

  return true;
}

/*****************************************************************************
 * INSERTION
 *****************************************************************************/

template <typename K, typename V, typename KC>
auto DiskExtendibleHashTable<K, V, KC>::Insert(const K &key, const V &value, Transaction *transaction) -> bool {
  
  /*
  Insert outline:
  1. Use the header page to find the corresponding directory page.
  2. Use the directory page to find the corresponding bucket.
  3.a If the bucket is not full, insert directly into it.
  3.b.1 If the bucket is full and its local depth equals the global depth,
        double the directory, then split the bucket: update the directory
        mapping and redistribute the bucket's key-value pairs.
  3.b.2 If the bucket is full and its local depth is less than the global
        depth, split the bucket without growing the directory.
  A concrete split is sketched below.
  */
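
  /*
  Worked example for case 3.b.1 (a sketch): with global depth 2 and bucket
  slot 0b01 full at local depth 2, the directory doubles to global depth 3,
  so slots 0b001 and 0b101 alias the old page. The split bit is
  1 << 2 = 0b100: keys whose hash has that bit set move to the new bucket
  (slot 0b101), the rest stay (slot 0b001), and both slots end up with
  local depth 3.
  */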

  std::cout << "Insert 1" << std::endl;
  std::cout << header_page_id_ << std::endl;

  WritePageGuard header_page;
  bool need_init = false;
  if (header_page_id_ == INVALID_PAGE_ID) {
    header_page = bpm_->NewPageGuarded(&header_page_id_).UpgradeWrite();
    need_init = true;
  } else {
    header_page = bpm_->FetchPageWrite(header_page_id_);
  }

  uint32_t hash = Hash(key);
  auto header = header_page.AsMut<ExtendibleHTableHeaderPage>();
  if (need_init) {
    header->Init(header_max_depth_);
  }

  uint32_t directory_idx = header->HashToDirectoryIndex(hash);

  page_id_t directory_page_id = header->GetDirectoryPageId(directory_idx);
  WritePageGuard directory_page;
  ExtendibleHTableDirectoryPage *directory;
  if (directory_page_id == INVALID_PAGE_ID) {
    directory_page = bpm_->NewPageGuarded(&directory_page_id).UpgradeWrite();
    directory = directory_page.AsMut<ExtendibleHTableDirectoryPage>();
    directory->Init(directory_max_depth_);
    header->SetDirectoryPageId(directory_idx, directory_page_id);
  } else {
    directory_page = bpm_->FetchPageWrite(directory_page_id);
    directory = directory_page.AsMut<ExtendibleHTableDirectoryPage>();
  }

  std::cout << "Insert 3" << std::endl;
  

  // Four cases per iteration: insert succeeds (done); duplicate key (fail);
  // split without growing the directory; grow the directory and retry.
  // The insert fails outright once the directory hits its maximum depth.
  while (true) {
    uint32_t bucket_idx = directory->HashToBucketIndex(hash);
    page_id_t bucket_page_id = directory->GetBucketPageId(bucket_idx);
    WritePageGuard bucket_page;
    ExtendibleHTableBucketPage<K, V, KC> *bucket;
    if (bucket_page_id == INVALID_PAGE_ID) {
      bucket_page = bpm_->NewPageGuarded(&bucket_page_id).UpgradeWrite();
      bucket = bucket_page.AsMut<ExtendibleHTableBucketPage<K, V, KC>>();
      bucket->Init(bucket_max_size_);
      directory->SetBucketPageId(bucket_idx, bucket_page_id);
    } else {
      bucket_page = bpm_->FetchPageWrite(bucket_page_id);
      bucket = bucket_page.AsMut<ExtendibleHTableBucketPage<K, V, KC>>();
    }

    if (bucket->Insert(key, value, cmp_)) {
      return true;
    }

    // Insert fails when the bucket is full or the key already exists;
    // reject duplicate keys outright.
    V tmp;
    if (bucket->Lookup(key, tmp, cmp_)) {
      return false;
    }

    // Bucket is full but its local depth is below the global depth:
    // split it without growing the directory.
    if (directory->GetLocalDepth(bucket_idx) != directory->GetGlobalDepth()) {
      page_id_t new_bucket_page_id;
      WritePageGuard new_bucket_page = bpm_->NewPageGuarded(&new_bucket_page_id).UpgradeWrite();
      auto new_bucket = new_bucket_page.AsMut<ExtendibleHTableBucketPage<K, V, KC>>();
      new_bucket->Init(bucket_max_size_);

      // The hash bit that becomes significant after the split: entries with
      // the bit clear stay in the old bucket, the rest move to the new one.
      uint32_t split_bit = static_cast<uint32_t>(1) << directory->GetLocalDepth(bucket_idx);
      uint32_t n = bucket->Size();
      std::vector<bool> stay;
      stay.reserve(n);
      for (uint32_t i = 0; i < n; i++) {
        stay.push_back((Hash(bucket->KeyAt(i)) & split_bit) == 0);
      }
      bucket->SplitBucket(new_bucket, stay);

      // Update every directory slot that still points at the old page, not
      // just bucket_idx and its split image: slots whose split bit is set
      // are redirected to the new page, and both halves get the new local
      // depth.
      uint32_t new_depth = directory->GetLocalDepth(bucket_idx) + 1;
      for (uint32_t i = 0; i < directory->Size(); i++) {
        if (directory->GetBucketPageId(i) == bucket_page_id) {
          if ((i & split_bit) != 0) {
            directory->SetBucketPageId(i, new_bucket_page_id);
          }
          directory->SetLocalDepth(i, new_depth);
        }
      }

      continue;  // retry the insert against the refreshed mapping
    }

    std::cout << "Insert 5" << std::endl;

    if(directory->GetGlobalDepth() == directory->GetMaxDepth()){
      throw Exception("Reach global max depth");
    }

    // full and need to double the directory
    directory->IncrGlobalDepth();

  }

  std::cout << "Insert 6" << std::endl;

  

}


template <typename K, typename V, typename KC>
auto DiskExtendibleHashTable<K, V, KC>::InsertToNewDirectory(ExtendibleHTableHeaderPage *header, uint32_t directory_idx,
                                                             uint32_t hash, const K &key, const V &value) -> bool {
  
  page_id_t directory_page_id = header->GetDirectoryPageId(directory_idx);
  WritePageGuard directory_page = bpm_->FetchPageWrite(directory_page_id);
  auto directory = directory_page.AsMut<ExtendibleHTableDirectoryPage>();

  uint32_t bucket_idx = directory->HashToBucketIndex(hash);
  return InsertToNewBucket(directory, bucket_idx, key, value);
}

template <typename K, typename V, typename KC>
auto DiskExtendibleHashTable<K, V, KC>::InsertToNewBucket(ExtendibleHTableDirectoryPage *directory, uint32_t bucket_idx,
                                                          const K &key, const V &value) -> bool {
  
  page_id_t bucket_page_id = directory->GetBucketPageId(bucket_idx);
  WritePageGuard bucket_page = bpm_->FetchPageWrite(bucket_page_id);
  auto bucket = bucket_page.AsMut<ExtendibleHTableBucketPage<K, V, KC>>();

  if (!bucket->IsFull()) {
    return bucket->Insert(key, value, cmp_);
  }

  // A full bucket requires a split; that path is handled inline in Insert,
  // so this helper simply reports failure.
  return false;
}

template <typename K, typename V, typename KC>
void DiskExtendibleHashTable<K, V, KC>::UpdateDirectoryMapping(ExtendibleHTableDirectoryPage *directory,
                                                               uint32_t new_bucket_idx, page_id_t new_bucket_page_id,
                                                               uint32_t new_local_depth, uint32_t local_depth_mask) {
  // Unused: Insert updates the directory mapping inline after a split.
  throw NotImplementedException("DiskExtendibleHashTable is not implemented");
}

/*****************************************************************************
 * REMOVE
 *****************************************************************************/
template <typename K, typename V, typename KC>
auto DiskExtendibleHashTable<K, V, KC>::Remove(const K &key, Transaction *transaction) -> bool {

  /*
  Remove outline:
  1. Use the header page to find the corresponding directory page.
  2. Use the directory page to find the corresponding bucket (via the global-depth index).
  3. If the bucket becomes empty after removal, merge it with its split
     image, recursively, while the surviving bucket stays empty; finally
     shrink the directory while possible. A worked example follows.
  */
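
  /*
  Worked example of the merge (a sketch): with global depth 3, if the
  bucket at slot 0b101 (local depth 3) becomes empty, its split image is
  slot 0b001. When both share local depth 3, every slot aliasing either
  page is repointed at the survivor and lowered to local depth 2; if
  CanShrink() then reports all local depths below 3, the directory halves
  back to global depth 2.
  */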

  std::cout << "Remove" << std::endl;

  if (header_page_id_ == INVALID_PAGE_ID) {
    return false;
  }

  uint32_t hash = Hash(key);
  // The header is only read here, so a read guard suffices.
  ReadPageGuard header_page = bpm_->FetchPageRead(header_page_id_);
  auto header = header_page.As<ExtendibleHTableHeaderPage>();
  uint32_t directory_idx = header->HashToDirectoryIndex(hash);

  page_id_t directory_page_id = header->GetDirectoryPageId(directory_idx);
  
  if (directory_page_id == INVALID_PAGE_ID) {
    return false;
  }
  WritePageGuard directory_page = bpm_->FetchPageWrite(directory_page_id);
  auto directory = directory_page.AsMut<ExtendibleHTableDirectoryPage>();

  uint32_t bucket_idx = directory->HashToBucketIndex(hash);
  page_id_t bucket_page_id = directory->GetBucketPageId(bucket_idx);
  if (bucket_page_id == INVALID_PAGE_ID) {
    return false;
  }
  WritePageGuard bucket_page = bpm_->FetchPageWrite(bucket_page_id);
  auto bucket = bucket_page.AsMut<ExtendibleHTableBucketPage<K, V, KC>>();

  if (!bucket->Remove(key, cmp_)) {
    return false;
  }

  // Merge empty buckets with their split images, recursively: a merge can
  // leave the surviving bucket empty as well, so keep merging until the
  // bucket is non-empty, its local depth hits 0, or its split image has a
  // different local depth.
  while (bucket->IsEmpty()) {
    if (directory->GetLocalDepth(bucket_idx) == 0) {
      break;
    }

    uint32_t merge_bucket_idx = directory->GetSplitImageIndex(bucket_idx);
    if (directory->GetLocalDepth(merge_bucket_idx) != directory->GetLocalDepth(bucket_idx)) {
      break;
    }

    page_id_t merge_bucket_page_id = directory->GetBucketPageId(merge_bucket_idx);
    uint32_t new_depth = directory->GetLocalDepth(bucket_idx) - 1;

    // Repoint every directory slot that references either half at the
    // surviving page and lower its local depth; updating only bucket_idx
    // and its split image is not enough once the global depth exceeds the
    // local depth.
    for (uint32_t i = 0; i < directory->Size(); i++) {
      page_id_t pid = directory->GetBucketPageId(i);
      if (pid == bucket_page_id || pid == merge_bucket_page_id) {
        directory->SetBucketPageId(i, merge_bucket_page_id);
        directory->SetLocalDepth(i, new_depth);
      }
    }

    // Release and delete the now-unreachable empty page, then continue
    // with the merged bucket.
    bucket_page.Drop();
    bpm_->DeletePage(bucket_page_id);
    bucket_page_id = merge_bucket_page_id;
    bucket_page = bpm_->FetchPageWrite(bucket_page_id);
    bucket = bucket_page.AsMut<ExtendibleHTableBucketPage<K, V, KC>>();
  }

  // Shrink the directory while every local depth is below the global depth.
  while (directory->CanShrink()) {
    directory->DecrGlobalDepth();
  }

  return true;
}

template class DiskExtendibleHashTable<int, int, IntComparator>;
template class DiskExtendibleHashTable<GenericKey<4>, RID, GenericComparator<4>>;
template class DiskExtendibleHashTable<GenericKey<8>, RID, GenericComparator<8>>;
template class DiskExtendibleHashTable<GenericKey<16>, RID, GenericComparator<16>>;
template class DiskExtendibleHashTable<GenericKey<32>, RID, GenericComparator<32>>;
template class DiskExtendibleHashTable<GenericKey<64>, RID, GenericComparator<64>>;
}  // namespace bustub