您的位置:首页 > 产品设计 > UI/UE

ArrayBlockingQueue源码分析

2018-03-25 18:01 507 查看
   原文来自:https://blog.csdn.net/xin_jmail/article/details/26157971

ArrayBlockingQueue是一个由数组支持的有界阻塞队列,也是一个循环队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素,队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列检索操作则是从队列头部开始获得元素。
     这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致放入操作受阻塞;试图从空队列中检索元素将导致类似阻塞。

    ArrayBlockingQueue创建的时候需要指定容量capacity(可以存储的最大的元素个数,因为它不会自动扩容)。其中一个构造方法为:

[java] view
plain copy

public ArrayBlockingQueue(int capacity, boolean fair) {  

       if (capacity <= 0)  

           throw new IllegalArgumentException();  

       this.items = (E[]) new Object[capacity];  

       lock = new ReentrantLock(fair);  

       notEmpty = lock.newCondition();  

       notFull =  lock.newCondition();  

   }  

    ArrayBlockingQueue类中定义的变量有:

[java] view
plain copy

/** The queued items  */  

private final E[] items;  

/** items index for next take, poll or remove */  

private int takeIndex;  

/** items index for next put, offer, or add. */  

private int putIndex;  

/** Number of items in the queue */  

private int count;
 

/** Main lock guarding all access */  

private final ReentrantLock lock;  

/** Condition for waiting takes */  

private final Condition notEmpty;  

/** Condition for waiting puts */  

private final Condition notFull;  

使用数组items来存储元素,由于是循环队列,使用takeIndex和putIndex来标记put和take的位置。可以看到,该类中只定义了一个锁ReentrantLock,定义两个Condition对象:notEmputy和notFull,分别用来对take和put操作进行所控制。注:本文主要讲解put()和take()操作,其他方法类似。

put(E e)方法的源码如下。进行put操作之前,必须获得锁并进行加锁操作,以保证线程安全性。加锁后,若发现队列已满,则调用notFull.await()方法,如当前线程陷入等待。直到其他线程take走某个元素后,会调用notFull.signal()方法来激活该线程。激活之后,继续下面的插入操作。

[java] view
plain copy

/** 

     * Inserts the specified element at the tail of this queue, waiting 

     * for space to become available if the queue is full. 

     * 

     */  

    public void put(E e) throws InterruptedException {  

        //不能存放 null  元素  

        if (e == null) throw new NullPointerException();  

        final E[] items = this.items;   //数组队列  

        final ReentrantLock lock = this.lock;  

        //加锁  

        lock.lockInterruptibly();  

        try {  

            try {  

                //当队列满时,调用notFull.await()方法,使该线程阻塞。  

                //直到take掉某个元素后,调用notFull.signal()方法激活该线程。  

                while (count == items.length)  

                    Full.await();  

            } catch (InterruptedException ie) {  

                Full.signal(); // propagate to non-interrupted thread  

                throw ie;  

            }  

            //把元素 e 插入到队尾  

            insert(e);  

        } finally {  

            //解锁  

            lock.unlock();  

        }  

    }  

insert(E e) 方法如下:

[java] view
plain copy

  /** 

   * Inserts element at current put position, advances, and signals. 

   * Call only when holding lock. 

   */  

  private void insert(E x) {  

      items[putIndex] = x;    

//下标加1或者等于0  

      putIndex = inc(putIndex);  

      ++count;  //计数加1  

//若有take()线程陷入阻塞,则该操作激活take()线程,继续进行取元素操作。  

//若没有take()线程陷入阻塞,则该操作无意义。  

      Empty.signal();  

  }  

  

**  

   * Circularly increment i.  

   */  

  final int inc(int i) {  

//此处可以看到使用了循环队列  

      return (++i == items.length)? 0 : i;  

  }  

take()方法代码如下。take操作和put操作相反,故不作详细介绍。

[java] view
plain copy

public E take() throws InterruptedException {  

        final ReentrantLock lock = this.lock;  

        lock.lockInterruptibly();  //加锁  

        try {  

            try {  

                //当队列空时,调用notEmpty.await()方法,使该线程阻塞。  

                //直到take掉某个元素后,调用notEmpty.signal()方法激活该线程。  

                while (count == 0)  

                    Empty.await();  

            } catch (InterruptedException ie) {  

                Empty.signal(); // propagate to non-interrupted thread  

                throw ie;  

            }  

            //取出队头元素  

            E x = extract();  

            return x;  

        } finally {  

            lock.unlock();  //解锁  

        }  

    }  

extract() 方法如下:

[java] view
plain copy

/** 

     * Extracts element at current take position, advances, and signals. 

     * Call only when holding lock. 

     */  

    private E extract() {  

        final E[] items = this.items;  

        E x = items[takeIndex];  

        items[takeIndex] = null;  

        takeIndex = inc(takeIndex);  

        --count;  

        Full.signal();  

        return x;  

    }  

小结:进行put和take操作,共用同一个锁对象。也即是说,put和take无法并行执行!两个Condition来进行通信。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: