Untitled

 avatar
unknown
javascript
2 years ago
5.6 kB
23
Indexable
import { MinHeap } from '@datastructures-js/heap';

import { ValidationError } from "app/helpers/errorHandlers"
import cronModel from 'app/models/cronModel'

import methodTypeMetadata from 'app/helpers/types/OBSPQMetadata'

const cronJobName = 'OBS_PQ_Retry'
const heapSize = 16
const maxTries = 5 // one try per second, so 10 seconds trying
const maxInterval = 10
const minInterval = 1

/**
 * Class responsible for managing OBS cooldown time between heavy tasks and execution priority
 */
 class OBSPriorityQueue {
    constructor(){
        this.heapMetadata = Array(heapSize).fill(null) // { methodType, fn }
        this.heap = new MinHeap()
        this.gateInfo = {
            date: null,
            secondsInterval: 2,
            tries: 0
        }
    }

    /**
     * Insert the respective function metadata position into the heap, fix the heap if not valid
     * @param {Number} heapMetadataPosition metadata position of the inserted function
     * @param {Number} priority lower is more important
     */
    insertToHeap(heapMetadataPosition, priority){
        this.heap.insert(priority, heapMetadataPosition)
        if (!this.heap.isValid()){
            this.heap.fix()
        }
    }

    /**
     * Push to the queue a function to be executed according to queue logic based on methodType
     * @param {String} methodType
     * @param {Function} fn function to be executed in the future
     */
    pushToQueue(methodType, fn){
        if (!methodTypeMetadata.hasOwnProperty(methodType)){
            throw new ValidationError(`OBS Priority Queue - specified method type: " ${methodType} " not found on list`)
        }

        if (!this.gateInfo.date){
            this.gateInfo.date = new Date()
        }

        const heapMetadataPosition = this.setHeapMetadata(methodType, fn)
        const methodMetadata = methodTypeMetadata[methodType]
        this.insertToHeap(heapMetadataPosition, methodMetadata.priority)
        this.queueChecker()
    }

    /**
     * Pops the next function to be executed in the queue
     * @returns Number, the function position in the heapMetadata
     */
    popHeap(){
        const root = this.heap.extractRoot()
        return root.value // returns function position to be executed
    }

    /**
     * Gets the respective function metadata 
     * @param {Number} heapMetadataPosition 
     * @returns Object
     */
    getHeapMetadata(heapMetadataPosition){
        return this.heapMetadata[heapMetadataPosition]
    }

    /**
     * Sets the function position in the heapMetadata Array
     * @param {String} methodType 
     * @param {Function} fn function to be executed
     * @returns Number, function position in the heapMetadata array
     */
    setHeapMetadata(methodType, fn){
        const heapMetadataPosition = this.getAvailableHeapMetadataPosition()
        this.heapMetadata[heapMetadataPosition] = {
            methodType,
            fn: fn
        }
        return heapMetadataPosition
    }

    /**
     * Purges the metadata at the respective position
     * @param {Number} heapMetadataPosition 
     */
    removeHeapMetadata(heapMetadataPosition){
        this.heapMetadata[heapMetadataPosition] = null
    }

    /**
     * Gets the index of the first free position in the array
     * @returns Number
     */
    getAvailableHeapMetadataPosition(){
        return this.heapMetadata.indexOf(null)
    }

    /**
     * Priority Queue Checker cronjob function
     */    
    async queueChecker(){
        const isHeapActive = this.heap.size() > 0

        // if a heap root exists and cooldown time has passed
        const nowDate = new Date()
        if (isHeapActive && this.gateInfo.date.getTime() < nowDate.getTime()){
            const heapMetadataPosition = this.popHeap()
            const heapMetadata = this.getHeapMetadata(heapMetadataPosition)
            const methodMetadata = methodTypeMetadata[heapMetadata.methodType]      
            
            // set new gate date
            let dateToBe = new Date()
            dateToBe.setSeconds(dateToBe.getSeconds() + methodMetadata.cooldown)
            this.gateInfo.date = dateToBe
            this.gateInfo.tries = 0
            // 1/3 * cooldown inbetween [1, 10] min/max
            this.gateInfo.secondsInterval = Math.min(maxInterval, Math.max(minInterval, Math.floor(methodMetadata.cooldown / 3))) 
            // purges info from heapMetadata object
            this.removeHeapMetadata(heapMetadataPosition)

            // runs the passed function
            return await heapMetadata.fn()
        } else if (isHeapActive && this.gateInfo.tries < maxTries){
            this.gateInfo.tries += 1
            this.startQueueChecker()
        } else if (!isHeapActive) {
            // stop the cronjob from checking
            cronModel.stopJob(cronJobName)
        }
    }

    /**
     * Starts the Priority Queue Checker
     */
    startQueueChecker(){
        let retryTimeSyntax = `*/${this.gateInfo.secondsInterval} * * * * *` // every {secondsInterval} second
        let createJob = false
        const job = cronModel.getByName(cronJobName)
        // if -1 doesnt exists
        if (job === -1 || !job.isRunning){
            createJob = true
        }

        if (createJob){
            cronModel.createJob(cronJobName, 'OBS_PQ', retryTimeSyntax, this.queueChecker, null, true, this)
        }
    }


}

const OBSPQ = new OBSPriorityQueue()
export default OBSPQ