Untitled
unknown
plain_text
2 years ago
5.4 kB
4
Indexable
void Table::sort() { logger.log("Table::sort"); unordered_map<string, int> colNameToIdx; for(int i=0; i<this->columns.size(); i++) { colNameToIdx[this->columns[i]] = i; } // sort phase Cursor cursor(this->tableName, 0); vector<vector<int>> rows; vector<int> row; int np = (int)ceil(this->blockCount*1.0/BLOCK_COUNT); for(int pno=0; pno<np; pno++) { int startBlock = pno*BLOCK_COUNT; rows.clear(); row.clear(); if(pno == np-1) { row = cursor.getNext(); while (row.size()) { rows.push_back(row); row = cursor.getNext(); } } else { for(int i=0; i<(maxRowsPerBlock*BLOCK_COUNT); i++) { row = cursor.getNext(); rows.push_back(row); } } // sort rows mergeSort(rows, 0, rows.size()-1, parsedQuery.sortColumnNames, colNameToIdx); // writeback int r=0,k=0; vector<vector<int>> pageData; pageData.clear(); while(r<rows.size()) { if(k==this->maxRowsPerBlock) { // write page into memory bufferManager.writePage(this->tableName, startBlock, pageData, this->maxRowsPerBlock); startBlock++; k=0; pageData.clear(); } else { pageData.push_back(rows[r]); k++; r++; } } if(k!=0) { bufferManager.writePage(this->tableName, startBlock, pageData, k); } } // merge phase int passNo = 0; int passReq = ceil(logxb(this->blockCount, BLOCK_COUNT)); int currNumberOfPartitions = np; int currPartitionSize = BLOCK_COUNT; // interms of number of block while(passNo < passReq) { if(passNo == passReq - 1) { // last pass // write final blocks in place table } else { // intermediate pass int n = 0; int partitionsToWrite = ceil(currNumberOfPartitions*1.0 / (BLOCK_COUNT-1)); int startBlock = 0; // used as a pointer to keep track how many block written back. while(n<partitionsToWrite) { int partitionsToMerge = (BLOCK_COUNT-1); if(n==partitionsToWrite-1) { // last partition int rem = currNumberOfPartitions % (BLOCK_COUNT-1); if(rem == 0) { partitionsToMerge = (BLOCK_COUNT-1); } else partitionsToMerge = (rem+1); } // intermediate partition // read k-1 partition from previos pass vector<Cursor> cursors; vector<int> lastBlocks; for(int r=0; r<partitionsToMerge; r++) { string tablename = ""; if(passNo == 0) { tablename = this->tableName; } else { tablename = this->tableName + "_" + to_string(passNo-1) + "_"; } cursors.push_back(Cursor(tablename , currPartitionSize*(n+r))); lastBlocks.push_back(currPartitionSize*(n+r+1)); } priority_queue<pair<vector<int>, int>, vector<pair<vector<int>, int>>, VectorComparator> minHeap; for(int r=0; r<partitionsToMerge; r++) { if(cursors[r].pageIndex < lastBlocks[r]) { vector<int> row = cursors[r].getNext(); if(row.size()>0) { minHeap.push({row, r}); } } } while(!minHeap.empty()) { vector<int> row; int idx; // pop first element out from minHeap auto minEle = minHeap.top(); row = minEle.first; idx = minEle.second; minHeap.pop(); rows.push_back(row); // write back the page if it is fully filled if(rows.size() == this->maxRowsPerBlock) { bufferManager.writePage(this->tableName + "_" + to_string(passNo) + "_", startBlock, rows, this->maxRowsPerBlock); startBlock++; rows.clear(); } // put next row into minHeap row = cursors[idx].getNext(); if(cursors[idx].pageIndex < lastBlocks[idx]) { minHeap.push({row, idx}); } } // write back remaining rows if(rows.size()) { bufferManager.writePage(this->tableName + "_" + to_string(passNo) + "_", startBlock, rows, rows.size()); rows.clear(); } n++; } currNumberOfPartitions = partitionsToWrite; currPartitionSize = currPartitionSize*(BLOCK_SIZE - 1); passNo++; } } }
Editor is loading...