import { MinHeap } from '@datastructures-js/heap';
import { ValidationError } from "app/helpers/errorHandlers"
import cronModel from 'app/models/cronModel'
import methodTypeMetadata from 'app/helpers/types/OBSPQMetadata'
const cronJobName = 'OBS_PQ_Retry'
const heapSize = 16
const maxTries = 5 // one try per second, so 10 seconds trying
const maxInterval = 10
const minInterval = 1
/**
* Class responsible for managing OBS cooldown time between heavy tasks and execution priority
*/
class OBSPriorityQueue {
constructor(){
this.heapMetadata = Array(heapSize).fill(null) // { methodType, fn }
this.heap = new MinHeap()
this.gateInfo = {
date: null,
secondsInterval: 2,
tries: 0
}
}
/**
* Insert the respective function metadata position into the heap, fix the heap if not valid
* @param {Number} heapMetadataPosition metadata position of the inserted function
* @param {Number} priority lower is more important
*/
insertToHeap(heapMetadataPosition, priority){
this.heap.insert(priority, heapMetadataPosition)
if (!this.heap.isValid()){
this.heap.fix()
}
}
/**
* Push to the queue a function to be executed according to queue logic based on methodType
* @param {String} methodType
* @param {Function} fn function to be executed in the future
*/
pushToQueue(methodType, fn){
if (!methodTypeMetadata.hasOwnProperty(methodType)){
throw new ValidationError(`OBS Priority Queue - specified method type: " ${methodType} " not found on list`)
}
if (!this.gateInfo.date){
this.gateInfo.date = new Date()
}
const heapMetadataPosition = this.setHeapMetadata(methodType, fn)
const methodMetadata = methodTypeMetadata[methodType]
this.insertToHeap(heapMetadataPosition, methodMetadata.priority)
this.queueChecker()
}
/**
* Pops the next function to be executed in the queue
* @returns Number, the function position in the heapMetadata
*/
popHeap(){
const root = this.heap.extractRoot()
return root.value // returns function position to be executed
}
/**
* Gets the respective function metadata
* @param {Number} heapMetadataPosition
* @returns Object
*/
getHeapMetadata(heapMetadataPosition){
return this.heapMetadata[heapMetadataPosition]
}
/**
* Sets the function position in the heapMetadata Array
* @param {String} methodType
* @param {Function} fn function to be executed
* @returns Number, function position in the heapMetadata array
*/
setHeapMetadata(methodType, fn){
const heapMetadataPosition = this.getAvailableHeapMetadataPosition()
this.heapMetadata[heapMetadataPosition] = {
methodType,
fn: fn
}
return heapMetadataPosition
}
/**
* Purges the metadata at the respective position
* @param {Number} heapMetadataPosition
*/
removeHeapMetadata(heapMetadataPosition){
this.heapMetadata[heapMetadataPosition] = null
}
/**
* Gets the index of the first free position in the array
* @returns Number
*/
getAvailableHeapMetadataPosition(){
return this.heapMetadata.indexOf(null)
}
/**
* Priority Queue Checker cronjob function
*/
async queueChecker(){
const isHeapActive = this.heap.size() > 0
// if a heap root exists and cooldown time has passed
const nowDate = new Date()
if (isHeapActive && this.gateInfo.date.getTime() < nowDate.getTime()){
const heapMetadataPosition = this.popHeap()
const heapMetadata = this.getHeapMetadata(heapMetadataPosition)
const methodMetadata = methodTypeMetadata[heapMetadata.methodType]
// set new gate date
let dateToBe = new Date()
dateToBe.setSeconds(dateToBe.getSeconds() + methodMetadata.cooldown)
this.gateInfo.date = dateToBe
this.gateInfo.tries = 0
// 1/3 * cooldown inbetween [1, 10] min/max
this.gateInfo.secondsInterval = Math.min(maxInterval, Math.max(minInterval, Math.floor(methodMetadata.cooldown / 3)))
// purges info from heapMetadata object
this.removeHeapMetadata(heapMetadataPosition)
// runs the passed function
return await heapMetadata.fn()
} else if (isHeapActive && this.gateInfo.tries < maxTries){
this.gateInfo.tries += 1
this.startQueueChecker()
} else if (!isHeapActive) {
// stop the cronjob from checking
cronModel.stopJob(cronJobName)
}
}
/**
* Starts the Priority Queue Checker
*/
startQueueChecker(){
let retryTimeSyntax = `*/${this.gateInfo.secondsInterval} * * * * *` // every {secondsInterval} second
let createJob = false
const job = cronModel.getByName(cronJobName)
// if -1 doesnt exists
if (job === -1 || !job.isRunning){
createJob = true
}
if (createJob){
cronModel.createJob(cronJobName, 'OBS_PQ', retryTimeSyntax, this.queueChecker, null, true, this)
}
}
}
const OBSPQ = new OBSPriorityQueue()
export default OBSPQ