BlockingQueue及其各个实现的分析整理
2017-02-22 11:01
295 查看
ThreadPoolExecutor类中提交任务到线程池分配线程执行任务使用到一个队列,而这个队列采用的就是BlockingQueue。BlockingQueue实际上定义了一个接口,在Java.util.concurrent包中给出了这个接口的一些常用实现,这篇我们整理一下。
0. BlockingQueue简介
BlockingQueue是java.util.concurrent包中的接口,扩展了java.util中的的Queue接口。
在Java7的API中,这个接口有11个public方法。
但对于BlockingQueue来说,其本身就是一个就是一个阻塞队列,所以这些操作的方法中,最重要的两个就是put()和take()方法,这也是本篇中重点分析的地方,其它的方法可以参见JavaDoc文档。
BlockingQueue的实现有一个特点,队列元素不接受null值。
BlockingQueue这个接口在JDK中提供了很多具体实现,包括了数组、链表等实现,下面就对这些实现类简要分析下。
1. 数组实现的ArrayBlockingQueue
看下ArrayBlockingQueue的构造方法,一共有三个:
ArrayBlockingQueue(int capacity)
ArrayBlockingQueue(int capacity, boolean fair)
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
我们发现,构造方法中并没有无参的,这意味着队列的容量是没有默认的,在使用的时候需要给出容量值。
后两个构造方法还有fair这个参数。这个fair可谓是似曾相识,其实它就是ReentrantLock对象初始化要用到的那个参数。我们知道ArrayBlockingQueue既然是阻塞队列,那么一定会有阻塞和唤醒,这里的实现用到的是Condition的await()和signal() / signalAll(),而用Condition的前提就是有对应的Lock对象,在ArrayBlockingQueue实现中,take()和put()用的是统一的一个单锁。在ArrayBlockingQueue的某些并发操作方法中,是需要加锁来保证线程安全的,而这就是fair参数的作用。
对于队列“空”和“满”的情况,分别使用了两个Condition对象来维护。
另外,ArrayBlockingQueue类我们直接理解就是数组实现的阻塞队列。没错,其中的数据元素是用Object[]来保存的。对于take()和put()方法,则是分别使用了takeIndex和putIndex这两个索引值来记录存放数据的位置。
如上,是take()方法实现的源码。逻辑很简单,先加锁,然后判断是否队列已空,如条件为真,则阻塞,然后取出队列中的元素。我们看到,阻塞是通过对notEmpty这个Condition对象的await()方法调用来做到的,与此对应,extract()方法中实际上也有一个notFull.signal()的调用。
2. 单向链表实现的LinkedBlockingQueue
LinkedBlockingQueue是JDK中BlockingQueue的有一个主要的实现。按照JavaDoc上所述,LinkedBlockingQueue是一个容量可选的阻塞队列。存在LinkedBlockingQueue()无参的默认构造方法实现,使用Integer.MAX_VALUE作为默认容量。
在LinkedBlockingQueue类的实现中,很重要的一个和ArrayBlockingQueue不同的地方,是对put()和take()分别使用了两个不同的锁,都使用了ReentrantLock实现。而针对“空”和“满”的阻塞条件,也是对这两个所对象分别构建的两个Condition对象(notEmpty和notFull),构成了双锁双条件。此外,LinkedBlockingQueue也为take和put操作分别维护了索引takeIndex和putIndex。两锁或者说队列状态的协调一致其实也是通过两个条件对象的await()和signal()来达成。
此外,对于队列中元素的计数,LinkedBlockingQueue也和ArrayBlockingQueue的实现略有不同,使用了AtomicInteger类对象。
对于put()和take()以及类似的操作,双锁避免了互相影响,一定意义上看,减小了操作的锁粒度,提高了并发性。
但对于其他操作,为了保证线程安全,都是双锁同时锁定。双锁使用要避免死锁问题,这个类实现中是统一定义了fullyLock()和fullyUnlock()的方法,先锁定的后释放,避免死锁发生的可能。
除了用数组和队列不同数据结构对BlockingQueue接口的基本实现外,还有其他几种有特殊功能的实现。
3. DelayQueue
基本特征是容量无界,实现上单锁单条件。
功能特点上,实际上是对优先级队列PriorityQueue类的一个封装。放入队列的元素要满足要求<E extends Delayed>。比较器是时间,因为:
元素需要给出getDelay()方法(实际上是Delayed接口的要求)。
等待第一个元素的线程被设置为leader,后续线程无限期等待,直到leader通知他们。队列中的数据元素超时后,元素可被返回。
4. 同步队列SynchronousQueue
这个类在Executors中使用ThreadPoolExecutor类构造CachedThreadPool的时候被用到了。SynchronousQueue的特点是,读取操作take()和放入操作put()同时完成才会同事解开阻塞。即一个元素只有当其本身被take()的时候put()才会被唤醒。没有容量的概念。
构造方法中可以带fair参数,分为公平和非公平实现,具体的实现分别为队列和栈,顺序不同。具体的代码实现依赖于内部类TransferQueue和TransferStack,逻辑较为复杂,这里不做细节分析。实现中的阻塞机制直接使用LockSupport的park()方法。
5. 顺便说说Exchanger类
这个类也是java.util.concurrent包中的,但和BlockingQueue并无直接层次结构关系。这里提到它主要是因为从用法上来看,相当于一个二项的SynchronousQueue。
具体实现上比较复杂,不做详细分析,记录下几点:
注意到Slot和Node都是AtomicReference,其compareAndSet并不是设置node或者item,而是引用值,巧妙的利用了Node的引用值和item做数据交换
(高并发情况)实现上用了类似concurrentHashMap的segment方式,有插槽Slot的概念
阻塞机制用Locksupport.park()
6. TransferQueue
最后说下TransferQueue这个接口,这个类是java.util.concurrent包中在Java7中增加的,可以看到注释中的“@since 1.7”。和前面的不同,TransferQueue只是一个接口,不是一个实现。在JDK1.7中,有LinkedTransferQueue这样一个实现类。需要注意区分,这个TransferQueue和SynchronousQueue的内部实现类TransferQueue不是同一个类。
这个接口/类实际上是一个比SynchronousQueue更灵活更高级的同步队列,放入新元素可以阻塞也可以非阻塞,并且也可以设定队列的元素容量。
这篇对BlockingQueue的小结就到这里。
0. BlockingQueue简介
BlockingQueue是java.util.concurrent包中的接口,扩展了java.util中的的Queue接口。
在Java7的API中,这个接口有11个public方法。
但对于BlockingQueue来说,其本身就是一个就是一个阻塞队列,所以这些操作的方法中,最重要的两个就是put()和take()方法,这也是本篇中重点分析的地方,其它的方法可以参见JavaDoc文档。
BlockingQueue的实现有一个特点,队列元素不接受null值。
BlockingQueue这个接口在JDK中提供了很多具体实现,包括了数组、链表等实现,下面就对这些实现类简要分析下。
1. 数组实现的ArrayBlockingQueue
看下ArrayBlockingQueue的构造方法,一共有三个:
ArrayBlockingQueue(int capacity)
ArrayBlockingQueue(int capacity, boolean fair)
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
我们发现,构造方法中并没有无参的,这意味着队列的容量是没有默认的,在使用的时候需要给出容量值。
后两个构造方法还有fair这个参数。这个fair可谓是似曾相识,其实它就是ReentrantLock对象初始化要用到的那个参数。我们知道ArrayBlockingQueue既然是阻塞队列,那么一定会有阻塞和唤醒,这里的实现用到的是Condition的await()和signal() / signalAll(),而用Condition的前提就是有对应的Lock对象,在ArrayBlockingQueue实现中,take()和put()用的是统一的一个单锁。在ArrayBlockingQueue的某些并发操作方法中,是需要加锁来保证线程安全的,而这就是fair参数的作用。
对于队列“空”和“满”的情况,分别使用了两个Condition对象来维护。
另外,ArrayBlockingQueue类我们直接理解就是数组实现的阻塞队列。没错,其中的数据元素是用Object[]来保存的。对于take()和put()方法,则是分别使用了takeIndex和putIndex这两个索引值来记录存放数据的位置。
2. 单向链表实现的LinkedBlockingQueue
LinkedBlockingQueue是JDK中BlockingQueue的有一个主要的实现。按照JavaDoc上所述,LinkedBlockingQueue是一个容量可选的阻塞队列。存在LinkedBlockingQueue()无参的默认构造方法实现,使用Integer.MAX_VALUE作为默认容量。
在LinkedBlockingQueue类的实现中,很重要的一个和ArrayBlockingQueue不同的地方,是对put()和take()分别使用了两个不同的锁,都使用了ReentrantLock实现。而针对“空”和“满”的阻塞条件,也是对这两个所对象分别构建的两个Condition对象(notEmpty和notFull),构成了双锁双条件。此外,LinkedBlockingQueue也为take和put操作分别维护了索引takeIndex和putIndex。两锁或者说队列状态的协调一致其实也是通过两个条件对象的await()和signal()来达成。
对于put()和take()以及类似的操作,双锁避免了互相影响,一定意义上看,减小了操作的锁粒度,提高了并发性。
但对于其他操作,为了保证线程安全,都是双锁同时锁定。双锁使用要避免死锁问题,这个类实现中是统一定义了fullyLock()和fullyUnlock()的方法,先锁定的后释放,避免死锁发生的可能。
除了用数组和队列不同数据结构对BlockingQueue接口的基本实现外,还有其他几种有特殊功能的实现。
3. DelayQueue
基本特征是容量无界,实现上单锁单条件。
功能特点上,实际上是对优先级队列PriorityQueue类的一个封装。放入队列的元素要满足要求<E extends Delayed>。比较器是时间,因为:
等待第一个元素的线程被设置为leader,后续线程无限期等待,直到leader通知他们。队列中的数据元素超时后,元素可被返回。
4. 同步队列SynchronousQueue
这个类在Executors中使用ThreadPoolExecutor类构造CachedThreadPool的时候被用到了。SynchronousQueue的特点是,读取操作take()和放入操作put()同时完成才会同事解开阻塞。即一个元素只有当其本身被take()的时候put()才会被唤醒。没有容量的概念。
构造方法中可以带fair参数,分为公平和非公平实现,具体的实现分别为队列和栈,顺序不同。具体的代码实现依赖于内部类TransferQueue和TransferStack,逻辑较为复杂,这里不做细节分析。实现中的阻塞机制直接使用LockSupport的park()方法。
5. 顺便说说Exchanger类
这个类也是java.util.concurrent包中的,但和BlockingQueue并无直接层次结构关系。这里提到它主要是因为从用法上来看,相当于一个二项的SynchronousQueue。
具体实现上比较复杂,不做详细分析,记录下几点:
注意到Slot和Node都是AtomicReference,其compareAndSet并不是设置node或者item,而是引用值,巧妙的利用了Node的引用值和item做数据交换
(高并发情况)实现上用了类似concurrentHashMap的segment方式,有插槽Slot的概念
阻塞机制用Locksupport.park()
6. TransferQueue
最后说下TransferQueue这个接口,这个类是java.util.concurrent包中在Java7中增加的,可以看到注释中的“@since 1.7”。和前面的不同,TransferQueue只是一个接口,不是一个实现。在JDK1.7中,有LinkedTransferQueue这样一个实现类。需要注意区分,这个TransferQueue和SynchronousQueue的内部实现类TransferQueue不是同一个类。
这个接口/类实际上是一个比SynchronousQueue更灵活更高级的同步队列,放入新元素可以阻塞也可以非阻塞,并且也可以设定队列的元素容量。
这篇对BlockingQueue的小结就到这里。
相关文章推荐
- BlockingQueue及其各个实现的分析整理
- 学习笔记之BlockingQueue及其各个实现的分析整理
- concurrent包分析-阻塞队列BlockingQueue及其实现类
- 一个小语言的词法分析程序原理及其实现(1)
- QuickContact分析及其弹出窗口实现
- select 实现分析 –2 【整理】
- 有趣的鼠标位置规律及其分析与实现
- 【改进】BP算法及其C语言实现分析
- Webx小应用的实现整理与分析
- QuickContact分析及其弹出窗口实现
- select 实现分析 –1 【整理】
- epoll源码实现分析[整理]
- js实现的单机双人象棋演示及其分析
- QuickContact分析及其弹出窗口实现
- 一个小语言的词法分析程序原理及其实现(2)
- 素数判断的几种方法代码实现及其复杂度分析
- Java对象池技术的原理及其实现 --Java对象的生命周期分析
- OO系统分析员之路--用例分析系列(6)--用例实现、用例场景和领域模型[整理重发]
- HSUPA技术实现及其应用分析
- Strassen矩阵算法分析及其C++实现 递归分治法(转)