void Table::sort() {
logger.log("Table::sort");
unordered_map<string, int> colNameToIdx;
for(int i=0; i<this->columns.size(); i++) {
colNameToIdx[this->columns[i]] = i;
}
// sort phase
Cursor cursor(this->tableName, 0);
vector<vector<int>> rows;
vector<int> row;
int np = (int)ceil(this->blockCount*1.0/BLOCK_COUNT);
for(int pno=0; pno<np; pno++) {
int startBlock = pno*BLOCK_COUNT;
rows.clear();
row.clear();
if(pno == np-1) {
row = cursor.getNext();
while (row.size()) {
rows.push_back(row);
row = cursor.getNext();
}
}
else {
for(int i=0; i<(maxRowsPerBlock*BLOCK_COUNT); i++) {
row = cursor.getNext();
rows.push_back(row);
}
}
// sort rows
mergeSort(rows, 0, rows.size()-1, parsedQuery.sortColumnNames, colNameToIdx);
// writeback
int r=0,k=0;
vector<vector<int>> pageData;
pageData.clear();
while(r<rows.size()) {
if(k==this->maxRowsPerBlock) {
// write page into memory
bufferManager.writePage(this->tableName, startBlock, pageData, this->maxRowsPerBlock);
startBlock++;
k=0;
pageData.clear();
}
else {
pageData.push_back(rows[r]);
k++;
r++;
}
}
if(k!=0) {
bufferManager.writePage(this->tableName, startBlock, pageData, k);
}
}
// merge phase
int passNo = 0;
int passReq = ceil(logxb(this->blockCount, BLOCK_COUNT));
int currNumberOfPartitions = np;
int currPartitionSize = BLOCK_COUNT; // interms of number of block
while(passNo < passReq) {
if(passNo == passReq - 1) {
// last pass
// write final blocks in place table
}
else {
// intermediate pass
int n = 0;
int partitionsToWrite = ceil(currNumberOfPartitions*1.0 / (BLOCK_COUNT-1));
int startBlock = 0; // used as a pointer to keep track how many block written back.
while(n<partitionsToWrite) {
int partitionsToMerge = (BLOCK_COUNT-1);
if(n==partitionsToWrite-1) {
// last partition
int rem = currNumberOfPartitions % (BLOCK_COUNT-1);
if(rem == 0) {
partitionsToMerge = (BLOCK_COUNT-1);
}
else partitionsToMerge = (rem+1);
}
// intermediate partition
// read k-1 partition from previos pass
vector<Cursor> cursors;
vector<int> lastBlocks;
for(int r=0; r<partitionsToMerge; r++) {
string tablename = "";
if(passNo == 0) {
tablename = this->tableName;
}
else {
tablename = this->tableName + "_" + to_string(passNo-1) + "_";
}
cursors.push_back(Cursor(tablename , currPartitionSize*(n+r)));
lastBlocks.push_back(currPartitionSize*(n+r+1));
}
priority_queue<pair<vector<int>, int>, vector<pair<vector<int>, int>>, VectorComparator> minHeap;
for(int r=0; r<partitionsToMerge; r++) {
if(cursors[r].pageIndex < lastBlocks[r]) {
vector<int> row = cursors[r].getNext();
if(row.size()>0) {
minHeap.push({row, r});
}
}
}
while(!minHeap.empty()) {
vector<int> row;
int idx;
// pop first element out from minHeap
auto minEle = minHeap.top();
row = minEle.first;
idx = minEle.second;
minHeap.pop();
rows.push_back(row);
// write back the page if it is fully filled
if(rows.size() == this->maxRowsPerBlock) {
bufferManager.writePage(this->tableName + "_" + to_string(passNo) + "_", startBlock, rows, this->maxRowsPerBlock);
startBlock++;
rows.clear();
}
// put next row into minHeap
row = cursors[idx].getNext();
if(cursors[idx].pageIndex < lastBlocks[idx]) {
minHeap.push({row, idx});
}
}
// write back remaining rows
if(rows.size()) {
bufferManager.writePage(this->tableName + "_" + to_string(passNo) + "_", startBlock, rows, rows.size());
rows.clear();
}
n++;
}
currNumberOfPartitions = partitionsToWrite;
currPartitionSize = currPartitionSize*(BLOCK_SIZE - 1);
passNo++;
}
}
}