// Untitled
// unknown
// java
// 4 years ago
// 2.4 kB
// 8
// Indexable
/**
 * Schedules {@link Runnable}s to run after a delay.
 *
 * <p>Tasks are held in a {@link DelayBlockingQueue} ordered by due time. Once
 * {@link #start()} has been called, a dispatcher loop running on the executor takes
 * each task as it becomes due and submits its runnable to the same executor.
 */
public class DelayScheduleMonitor {

    /** A runnable paired with the wall-clock time (epoch milliseconds) at which it is due. */
    static class Task {
        final Runnable runnable;
        final long runTime;

        Task(Runnable runnable, long runTime) {
            this.runnable = runnable;
            this.runTime = runTime;
        }
    }

    /**
     * Blocking queue that releases tasks only once their due time has been reached.
     *
     * <p>All access to {@link #pq} is guarded by synchronizing on {@code pq} itself, which
     * also serves as the monitor for {@code wait}/{@code notifyAll}.
     */
    static class DelayBlockingQueue {
        final PriorityQueue<Task> pq =
                new PriorityQueue<>((a, b) -> Long.compare(a.runTime, b.runTime));

        /**
         * Enqueues {@code runnable} to become available {@code delayTime} milliseconds from now.
         *
         * @param runnable  the work to enqueue; must not be null
         * @param delayTime delay in milliseconds; zero or negative means "due immediately"
         * @throws NullPointerException if {@code runnable} is null
         */
        void add(Runnable runnable, long delayTime) {
            // Fail fast here: a null would otherwise only surface later in the dispatcher,
            // where the resulting exception ends the dispatch loop.
            if (runnable == null) {
                throw new NullPointerException("runnable");
            }
            long now = System.currentTimeMillis();
            long runTime = now + delayTime;
            if (delayTime > 0 && runTime < now) {
                // The addition overflowed; saturate instead of wrapping to a time in the past.
                runTime = Long.MAX_VALUE;
            }
            synchronized (pq) {
                Task newTask = new Task(runnable, runTime);
                pq.add(newTask);
                if (pq.peek() == newTask) {
                    // The head changed, so any waiter's timeout is stale. Wake every
                    // waiter so each one re-evaluates against the new head.
                    pq.notifyAll();
                }
            }
        }

        /**
         * Blocks until the earliest task is due, then removes and returns it.
         *
         * @return the task with the smallest due time, once that time has been reached
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        Task take() throws InterruptedException {
            synchronized (pq) {
                while (true) {
                    Task head = pq.peek();
                    if (head == null) {
                        System.out.println(Thread.currentThread().getName() + " waiting on empty");
                        pq.wait();
                        continue;
                    }
                    // Read the clock once per iteration. Reading it separately for the
                    // "is it due" check and for the timeout could yield a timeout of 0
                    // (which means wait forever) or a negative one (which throws).
                    long now = System.currentTimeMillis();
                    if (head.runTime <= now) {
                        return pq.poll();
                    }
                    long waitTime = head.runTime - now;
                    System.out.println(Thread.currentThread().getName() + " waiting " + waitTime + " ms");
                    pq.wait(waitTime);
                }
            }
        }
    }

    DelayBlockingQueue delayBlockingQueue = new DelayBlockingQueue();
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(30);

    /**
     * Schedules {@code runnable} to be dispatched after {@code delayTime} milliseconds.
     *
     * @param runnable  the work to run; must not be null
     * @param delayTime delay in milliseconds
     * @throws InterruptedException never thrown by the current implementation; kept in the
     *                              signature so existing callers continue to compile
     */
    void schedule(Runnable runnable, int delayTime) throws InterruptedException {
        delayBlockingQueue.add(runnable, delayTime);
    }

    /**
     * Starts the dispatcher loop on the executor.
     *
     * <p>The loop occupies one executor thread, repeatedly taking the next due task and
     * submitting its runnable to the same executor. It exits when interrupted.
     */
    void start() {
        executor.submit(() -> {
            while (true) {
                try {
                    Task curTask = delayBlockingQueue.take();
                    executor.submit(curTask.runnable);
                } catch (InterruptedException ex) {
                    // Restore the interrupt flag so the owning pool thread can observe it.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }
}