Untitled

 avatar
unknown
plain_text
a year ago
5.4 kB
2
Indexable
/**
 * Sorts the rows of this table with a two-phase external merge sort.
 *
 * Sort phase: the table is read through a Cursor in partitions of BLOCK_COUNT
 * blocks. Each partition is ordered in memory by mergeSort (which receives
 * parsedQuery.sortColumnNames and a column-name -> index map) and written
 * back over the same blocks of the table.
 *
 * Merge phase: every intermediate pass merges groups of up to
 * (BLOCK_COUNT - 1) consecutive runs through a priority_queue that uses
 * VectorComparator, and writes the merged rows to pages named
 * "<tableName>_<passNo>_".
 *
 * The final pass (writing the fully merged result back into the table) is
 * still not implemented; the loop now terminates and logs when runs remain
 * unmerged instead of spinning forever.
 */
void Table::sort() {
    logger.log("Table::sort");

    // Column name -> column position, handed to mergeSort with the sort columns.
    unordered_map<string, int> colNameToIdx;
    for (size_t i = 0; i < this->columns.size(); i++) {
        colNameToIdx[this->columns[i]] = static_cast<int>(i);
    }

    // ---------------------------------------------------------------- sort phase
    Cursor cursor(this->tableName, 0);
    vector<vector<int>> rows;
    vector<int> row;

    // Number of initial runs: one per BLOCK_COUNT blocks, rounded up.
    int np = static_cast<int>(ceil(this->blockCount*1.0/BLOCK_COUNT));

    for (int pno = 0; pno < np; pno++) {
        int startBlock = pno*BLOCK_COUNT;

        rows.clear();
        row.clear();
        if (pno == np-1) {
            // Last partition: take whatever is left in the table.
            row = cursor.getNext();
            while (!row.empty()) {
                rows.push_back(row);
                row = cursor.getNext();
            }
        }
        else {
            // Full partition: BLOCK_COUNT blocks' worth of rows.
            for (int i = 0; i < (maxRowsPerBlock*BLOCK_COUNT); i++) {
                row = cursor.getNext();
                if (row.empty()) {
                    // Cursor ran dry early; never store an empty row.
                    break;
                }
                rows.push_back(row);
            }
        }

        // rows.size()-1 would wrap around on an empty partition, so guard it.
        if (!rows.empty()) {
            mergeSort(rows, 0, rows.size()-1, parsedQuery.sortColumnNames, colNameToIdx);
        }

        // Write the sorted run back, one page at a time, over the blocks it came from.
        vector<vector<int>> pageData;
        for (size_t r = 0; r < rows.size(); r++) {
            pageData.push_back(rows[r]);
            if (pageData.size() == static_cast<size_t>(this->maxRowsPerBlock)) {
                bufferManager.writePage(this->tableName, startBlock, pageData, this->maxRowsPerBlock);
                startBlock++;
                pageData.clear();
            }
        }
        if (!pageData.empty()) {
            // Trailing, partially filled page.
            bufferManager.writePage(this->tableName, startBlock, pageData, static_cast<int>(pageData.size()));
        }
    }

    // --------------------------------------------------------------- merge phase

    // The buffer still holds the last sorted partition; the merge output must
    // start from an empty page buffer.
    rows.clear();

    int passNo = 0;
    // NOTE(review): presumably logxb(a, b) is log of a in base b. The merge
    // fan-in below is BLOCK_COUNT-1 and the number of runs is np, so the pass
    // count may need to be derived from those instead - confirm.
    int passReq = ceil(logxb(this->blockCount, BLOCK_COUNT));
    int currNumberOfPartitions = np;
    int currPartitionSize = BLOCK_COUNT; // in terms of number of blocks
    const int fanIn = BLOCK_COUNT - 1;   // runs merged together per group

    while (passNo < passReq) {
        if (passNo == passReq - 1) {
            // last pass
            // TODO: merge the remaining runs and write the final blocks back
            // into the table in place.
            if (currNumberOfPartitions > 1) {
                logger.log("Table::sort: final merge pass not implemented");
            }
        }
        else {
            // intermediate pass
            // Runs are read from the table itself on the first pass and from
            // the previous pass's output afterwards.
            string sourceName = this->tableName;
            if (passNo != 0) {
                sourceName = this->tableName + "_" + to_string(passNo-1) + "_";
            }
            const string targetName = this->tableName + "_" + to_string(passNo) + "_";

            int partitionsToWrite = ceil(currNumberOfPartitions*1.0 / fanIn);
            int startBlock = 0; // next output block to write in this pass

            for (int n = 0; n < partitionsToWrite; n++) {
                // Every group merges fanIn runs, except possibly the last one,
                // which takes only the runs that are left over.
                int partitionsToMerge = fanIn;
                if (n == partitionsToWrite-1) {
                    int rem = currNumberOfPartitions % fanIn;
                    if (rem != 0) {
                        partitionsToMerge = rem;
                    }
                }

                // One cursor per input run, plus the block index where that run ends.
                vector<Cursor> cursors;
                vector<int> lastBlocks;
                for (int r = 0; r < partitionsToMerge; r++) {
                    // Group n owns runs [n*fanIn, n*fanIn + partitionsToMerge).
                    int partitionIdx = n*fanIn + r;
                    cursors.push_back(Cursor(sourceName, currPartitionSize*partitionIdx));
                    lastBlocks.push_back(currPartitionSize*(partitionIdx+1));
                }

                priority_queue<pair<vector<int>, int>, vector<pair<vector<int>, int>>, VectorComparator> minHeap;

                // Seed the heap with the first row of every run.
                for (int r = 0; r < partitionsToMerge; r++) {
                    if (cursors[r].pageIndex < lastBlocks[r]) {
                        vector<int> first = cursors[r].getNext();
                        if (!first.empty()) {
                            minHeap.push({first, r});
                        }
                    }
                }

                while (!minHeap.empty()) {
                    // Take the heap's top row and remember which run it came from.
                    auto minEle = minHeap.top();
                    minHeap.pop();
                    int idx = minEle.second;

                    rows.push_back(minEle.first);

                    // write back the page if it is fully filled
                    if (rows.size() == this->maxRowsPerBlock) {
                        bufferManager.writePage(targetName, startBlock, rows, this->maxRowsPerBlock);
                        startBlock++;
                        rows.clear();
                    }

                    // Refill from the same run, but only with a real row that
                    // still belongs to that run's block range.
                    vector<int> next = cursors[idx].getNext();
                    if (!next.empty() && cursors[idx].pageIndex < lastBlocks[idx]) {
                        minHeap.push({next, idx});
                    }
                }

                // write back remaining rows
                if (rows.size()) {
                    bufferManager.writePage(targetName, startBlock, rows, rows.size());
                    // Advance so a later group cannot overwrite this page.
                    startBlock++;
                    rows.clear();
                }
            }

            currNumberOfPartitions = partitionsToWrite;
            // Each merged run now spans fanIn of the previous runs.
            currPartitionSize = currPartitionSize*fanIn;
        }

        // Advance on every pass (including the last one) so the loop terminates.
        passNo++;
    }

}