开发者

Java中的DelayQueue源码解析

开发者 https://www.devze.com 2023-12-14 10:42 出处:网络 作者: demon7552003
目录介绍数据结构属性方法实现offer,poll,peekput,takeleader作用介绍 一个实现PriorityblockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中
目录
  • 介绍
  • 数据结构
  • 属性
  • 方法实现
    • offer,poll,peek
    • put,take
  • leader作用

    介绍

    一个实现PriorityblockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。

    DelayQueue可以运用在以下应用场景:

    1.缓存系统的设计:可以用DelayQueue保存缓js存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

    2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。

    数据结构

    public interface Delayed extends Comparable<Delayed> {
     
        /**
         * 返回与此对象相关的剩余延迟时间,以给定的时间单位表示
         */
        long getDelay(TimeUnit unit);
    }

     getDelay方法一般用内部存储的事件,减去当前事件,即为剩余延迟事件

    属性

      private final transient ReentrantLock lock = new ReentrantLock();
        private final PriorityQueue<E> q = new PriorityQueue<E>();
     
        /**
        *用于优化内部阻塞通知的线程rpAbDSAI
         */
        private Thread leader = null;
        private final Condition available = lock.newCondition();

    以支持优先级的PriorityQueue无界队列作为一个容器,因为元素都必须实现Delayed接口,可以根据元素的过期时间来对元素进行排列,因此,先过期的元素会在队首,每次从队列里取出来都是最先要过期的元素。

    leader是一个Thread元素,它在offer和take中都有使用,它代表当前获取到锁的消费者线程,

    DelayQueue实现Leader-Folloer pattern

    1、当存在多个take线程时,同时只生效一个,即,lehttp://www.devze.comader线程

    2、当leader存在时,其它的take线程均为follower,其等待是通过condition实现的

    3、当leader不存在时,当前线程即成为leader,在delay之后,将leader角色释放还原

    4、最后如果队列还有内容,且leader空缺,则调用一次condition的signal,唤醒挂起的take线程,其中之一将成为新的leader

    5、最后在finally中释放锁

    方法实现

    offer,poll,peek

        public boolean offer(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                q.offer(e);
                //如果插入元素是第一个元素
                if (q.peek() == e) {
                    //leader设置为null
                    leader = null;
                    //唤醒
                    available.signal();
                }
                return true;
            } finally {
                lock.unlock();
            }
        }
        public boolean offer(E e, long timeout, TimeUnit unit) {
            return offer(e);
        }
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                E first = q.peek();
                //如果未到期,则返回null,否则删除
                if (first == null || first.getDelay(NANOSECONDS) > 0)
                    return null;
                else
                    return q.poll();
            } finally {
                lock.unlock();
            }
        }
       public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek();
                    if (first == null) {
                        if (nanos <= 0)
                            return null;
                        else
                            nanos = available.awaitNanos(nanos);
                    } else {
                        long delay = first.getDelay(NANOSECONDS);
                        //到期,则poll
                        if (delay <= 0)
                            return q.poll();
                        if (nanos <= 0)
                            return null;
                        first = null; // don't retain ref while waiting
                        if (nanos < delay || leader != null)//nanos<delay,表示超时剩余时间小于到期时间,
                            nanos = available.awaitNanos(nanos);
                        else {
                            Thread thisThread = Thread.currentThread();
                            //设置当前线程为leader
                            leader = thisThread;
                            try {
                                //等待条件
                                long timeLeft = available.awaitNanos(delay);
                                //剩余超时时间
                                nanos -= delay - timeLeft;
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();
            }
        }
        public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return q.peek();
            } finally {
                lock.unlock();
            }
        }

    put,take

    /**
         * Retrieves and removes the head of this queue, waiting if necessary
         * until an element with an expired delay is available on this queue.
         *
         * @return the head of this queue
         * @throws InterruptedException {@inheritDoc}
         */
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            // 获取可中断锁。
            lock.lockInterruptibly();
            try {
                for (;;) {
                    // 从优先级队列中获取队列头元素
                    E first = q.peek();
                    if (first == null)
                        // 无元素,当前线程加入等待队列,并阻塞
                        available.await();
                    else {
                        // 通过getDelay 编程客栈方法获取延迟时间
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            // 延迟时间到期,获取并删除头部元素。
                            return q.poll();
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                // 线程节点进入等待队列 x 纳秒。
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                // leader == null且还存在元素的话,唤醒一个消费线程。
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();
            }
        }
       public void put(E e) {
            offer(e);
        }

    take()方法逻辑:

    1.获取锁

    2.取出优先级队列q的首元素

    3.如果元素q的队首/队列为空,阻塞

    4.如果元素q的队首(first)不为空,获得这个元素的delay时间值,如果first的延迟delay时间值为0的话,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素,跳出方法

    5.如果first的延迟delay时间值不为0的话,释放元素first的引用,避免内存泄露

    6.循环以上操作,直到return

    leader作用

    如果leader不为null,说明已经有消费者线程拿到js锁,直接阻塞当前线程,如果leader为null,把当前线程赋值给leader,并等待剩余的到期时间,最后释放leader,这里我们想象着我们有个多个消费者线程用take方法去取,如果没有leader!=null的判断,这些线程都会无限循环,直到返回第一个元素,很显然很浪费资源。所以leader的作用是设置一个标记,来避免消费者的无脑竞争。

    到此这篇关于Java中的DelayQueue源码解析的文章就介绍到这了,更多相关DelayQueue源码解析内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消