Untitled

 avatar
unknown
java
3 years ago
2.4 kB
5
Indexable
/**
 * Schedules {@link Runnable}s to be executed after a per-task delay.
 *
 * <p>Tasks are held in a {@link DelayBlockingQueue} ordered by due time. After {@link #start()}
 * is called, one worker of the internal pool acts as a dispatcher: it blocks until the earliest
 * task is due and then hands that task to the same pool for execution.
 */
public class DelayScheduleMonitor {

    /** A runnable paired with the wall-clock time (epoch milliseconds) at which it becomes due. */
    static class Task {
        // Immutable: runTime is the heap ordering key and must not change once queued.
        final Runnable runnable;
        final long runTime;

        Task(Runnable runnable, long runTime) {
            this.runnable = runnable;
            this.runTime = runTime;
        }
    }

    /**
     * A blocking queue that releases each task only once its due time has passed.
     *
     * <p>All access to the underlying heap is guarded by the heap's own monitor, which is also
     * used for wait/notify signalling between {@link #add} and {@link #take}.
     */
    static class DelayBlockingQueue {
        // Min-heap on due time; final because it doubles as the lock object.
        final PriorityQueue<Task> pq =
                new PriorityQueue<>((a, b) -> Long.compare(a.runTime, b.runTime));

        /**
         * Enqueues a task that becomes due {@code delayTime} milliseconds from now.
         *
         * @param runnable  the work to run; must not be null
         * @param delayTime delay in milliseconds; zero or negative values are due immediately
         * @throws NullPointerException if {@code runnable} is null
         */
        void add(Runnable runnable, long delayTime) {
            if (runnable == null) {
                // Fail at the call site instead of killing the dispatcher loop later.
                throw new NullPointerException("runnable");
            }
            long now = System.currentTimeMillis();
            long runTime = now + delayTime;
            if (delayTime > 0 && runTime < now) {
                // The addition overflowed; saturate so a huge delay is not treated as "in the past".
                runTime = Long.MAX_VALUE;
            }
            synchronized (pq) {
                Task newTask = new Task(runnable, runTime);
                pq.add(newTask);
                // Only a new earliest task changes how long a waiting consumer should sleep.
                if (pq.peek() == newTask) {
                    pq.notify();
                }
            }
        }

        /**
         * Blocks until the earliest task is due, then removes and returns it.
         *
         * @return the task with the smallest due time, once that time has been reached
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        Task take() throws InterruptedException {
            synchronized (pq) {
                while (true) {
                    if (pq.isEmpty()) {
                        System.out.println(Thread.currentThread().getName() + " waiting on empty");
                        pq.wait();
                        continue;
                    }
                    // Read the clock once: deciding "is it due" and computing the wait from two
                    // separate readings could produce wait(0), which blocks indefinitely, or a
                    // negative timeout, which throws IllegalArgumentException.
                    long waitTime = pq.peek().runTime - System.currentTimeMillis();
                    if (waitTime <= 0) {
                        return pq.poll();
                    }
                    System.out.println(Thread.currentThread().getName() + " waiting " + waitTime + " ms");
                    // Sleep until the head is due, or until add() signals an earlier head.
                    pq.wait(waitTime);
                }
            }
        }
    }

    final DelayBlockingQueue delayBlockingQueue = new DelayBlockingQueue();
    final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(30);

    /**
     * Registers {@code runnable} to be executed roughly {@code delayTime} milliseconds from now.
     * Tasks are only dispatched after {@link #start()} has been called.
     *
     * @param runnable  the work to run; must not be null
     * @param delayTime delay in milliseconds
     * @throws InterruptedException never thrown by the current implementation; retained in the
     *                              signature for compatibility with existing callers
     */
    void schedule(Runnable runnable, int delayTime) throws InterruptedException {
        delayBlockingQueue.add(runnable, delayTime);
    }

    /**
     * Starts the dispatcher loop on one of the pool's threads. The loop runs until its thread is
     * interrupted, for example by {@link #shutdown()}.
     */
    void start() {
        executor.submit(() -> {
            while (true) {
                try {
                    Task curTask = delayBlockingQueue.take();
                    executor.submit(curTask.runnable);
                } catch (InterruptedException ex) {
                    // Restore the flag so the owner of this thread can observe the interruption.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }

    /**
     * Stops the dispatcher and the worker pool by interrupting all pool threads. Tasks still
     * waiting in the delay queue are not executed.
     */
    void shutdown() {
        executor.shutdownNow();
    }
}
Editor is loading...