Untitled
unknown
java
3 years ago
2.4 kB
5
Indexable
public class DelayScheduleMonitor { static class Task { Runnable runnable; long runTime; Task(Runnable runnable, long runTime) { this.runnable = runnable; this.runTime = runTime; } } static class DelayBlockingQueue { PriorityQueue < Task > pq = new PriorityQueue < > ((a, b) - > Long.compare(a.runTime, b.runTime)); void add(Runnable runnable, long delayTime) { long runTime = System.currentTimeMillis() + delayTime; synchronized(pq) { Task newTask = new Task(runnable, runTime); pq.add(newTask); if (pq.peek() == newTask) { pq.notify(); } } } Task take() throws InterruptedException { synchronized(pq) { while (true) { if (pq.isEmpty()) { System.out.println(Thread.currentThread().getName() + " waiting on empty"); pq.wait(); } else { long headTime = pq.peek().runTime; if (headTime <= System.currentTimeMillis()) { return pq.poll(); } else { long waitTime = headTime - System.currentTimeMillis(); System.out.println(Thread.currentThread().getName() + " waiting " + waitTime + " ms"); // await headTime - System.currentTimeMillis() time pq.wait(waitTime); } } } } } } DelayBlockingQueue delayBlockingQueue = new DelayBlockingQueue(); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(30); void schedule(Runnable runnable, int delayTime) throws InterruptedException { delayBlockingQueue.add(runnable, delayTime); } void start() { executor.submit(() - > { while (true) { try { Task curTask = delayBlockingQueue.take(); executor.submit(curTask.runnable); } catch (InterruptedException ex) { break; } } }); } }
Editor is loading...