Untitled
unknown
javascript
2 years ago
5.6 kB
24
Indexable
import { MinHeap } from '@datastructures-js/heap'; import { ValidationError } from "app/helpers/errorHandlers" import cronModel from 'app/models/cronModel' import methodTypeMetadata from 'app/helpers/types/OBSPQMetadata' const cronJobName = 'OBS_PQ_Retry' const heapSize = 16 const maxTries = 5 // one try per second, so 10 seconds trying const maxInterval = 10 const minInterval = 1 /** * Class responsible for managing OBS cooldown time between heavy tasks and execution priority */ class OBSPriorityQueue { constructor(){ this.heapMetadata = Array(heapSize).fill(null) // { methodType, fn } this.heap = new MinHeap() this.gateInfo = { date: null, secondsInterval: 2, tries: 0 } } /** * Insert the respective function metadata position into the heap, fix the heap if not valid * @param {Number} heapMetadataPosition metadata position of the inserted function * @param {Number} priority lower is more important */ insertToHeap(heapMetadataPosition, priority){ this.heap.insert(priority, heapMetadataPosition) if (!this.heap.isValid()){ this.heap.fix() } } /** * Push to the queue a function to be executed according to queue logic based on methodType * @param {String} methodType * @param {Function} fn function to be executed in the future */ pushToQueue(methodType, fn){ if (!methodTypeMetadata.hasOwnProperty(methodType)){ throw new ValidationError(`OBS Priority Queue - specified method type: " ${methodType} " not found on list`) } if (!this.gateInfo.date){ this.gateInfo.date = new Date() } const heapMetadataPosition = this.setHeapMetadata(methodType, fn) const methodMetadata = methodTypeMetadata[methodType] this.insertToHeap(heapMetadataPosition, methodMetadata.priority) this.queueChecker() } /** * Pops the next function to be executed in the queue * @returns Number, the function position in the heapMetadata */ popHeap(){ const root = this.heap.extractRoot() return root.value // returns function position to be executed } /** * Gets the respective function metadata * @param {Number} heapMetadataPosition * @returns Object */ getHeapMetadata(heapMetadataPosition){ return this.heapMetadata[heapMetadataPosition] } /** * Sets the function position in the heapMetadata Array * @param {String} methodType * @param {Function} fn function to be executed * @returns Number, function position in the heapMetadata array */ setHeapMetadata(methodType, fn){ const heapMetadataPosition = this.getAvailableHeapMetadataPosition() this.heapMetadata[heapMetadataPosition] = { methodType, fn: fn } return heapMetadataPosition } /** * Purges the metadata at the respective position * @param {Number} heapMetadataPosition */ removeHeapMetadata(heapMetadataPosition){ this.heapMetadata[heapMetadataPosition] = null } /** * Gets the index of the first free position in the array * @returns Number */ getAvailableHeapMetadataPosition(){ return this.heapMetadata.indexOf(null) } /** * Priority Queue Checker cronjob function */ async queueChecker(){ const isHeapActive = this.heap.size() > 0 // if a heap root exists and cooldown time has passed const nowDate = new Date() if (isHeapActive && this.gateInfo.date.getTime() < nowDate.getTime()){ const heapMetadataPosition = this.popHeap() const heapMetadata = this.getHeapMetadata(heapMetadataPosition) const methodMetadata = methodTypeMetadata[heapMetadata.methodType] // set new gate date let dateToBe = new Date() dateToBe.setSeconds(dateToBe.getSeconds() + methodMetadata.cooldown) this.gateInfo.date = dateToBe this.gateInfo.tries = 0 // 1/3 * cooldown inbetween [1, 10] min/max this.gateInfo.secondsInterval = Math.min(maxInterval, Math.max(minInterval, Math.floor(methodMetadata.cooldown / 3))) // purges info from heapMetadata object this.removeHeapMetadata(heapMetadataPosition) // runs the passed function return await heapMetadata.fn() } else if (isHeapActive && this.gateInfo.tries < maxTries){ this.gateInfo.tries += 1 this.startQueueChecker() } else if (!isHeapActive) { // stop the cronjob from checking cronModel.stopJob(cronJobName) } } /** * Starts the Priority Queue Checker */ startQueueChecker(){ let retryTimeSyntax = `*/${this.gateInfo.secondsInterval} * * * * *` // every {secondsInterval} second let createJob = false const job = cronModel.getByName(cronJobName) // if -1 doesnt exists if (job === -1 || !job.isRunning){ createJob = true } if (createJob){ cronModel.createJob(cronJobName, 'OBS_PQ', retryTimeSyntax, this.queueChecker, null, true, this) } } } const OBSPQ = new OBSPriorityQueue() export default OBSPQ
Editor is loading...