java高并发设计(十)--java安全集合BlockingQueue
2017-01-18 00:00
253 查看
摘要: java安全集合BlockingQueue
接着上个章节我们继续讲解java安全集合中的队列内容,这里只对常用的容器做详细的介绍,其他的有个概念,真正碰到使用场景再好好研究一下,上面提到过java安全队列中的主要实现如下:
ArrayBlockingQueue 数组有界的队列
LinkedBlockingQueue 列表结构的队列
DelayQueue 延迟队列
PriorityBlockingQueue 优先级别的队列
SynchronousQueue 同步队列,容量为1
ArrayBlockingQueue:基于数组的有界阻塞队列,内部实现将对象放入到一个数组中进行操作。并且它是有界的队列,初始化时必须指定大小,后期无法修改,执行的规则是先进先出的规则。
ArrayBlockingQueue的底层实现我们可以看下源码的构造:
LinkedBlockingQueue:内部是以链式结构存储的数据对象,该对象的初始化可以指定边界值,如果没有指定默认是很大的,Integer.MAX_VALUE。我们直接看源码解释:
DelayQueue:对元素进行持有直到一个特定的延迟到期.DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。需要注意的是该延迟的对象是Delayed接口的实现对象。
PriorityBlockingQueue:基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
SynchronousQueue:一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。
声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理
接着上个章节我们继续讲解java安全集合中的队列内容,这里只对常用的容器做详细的介绍,其他的有个概念,真正碰到使用场景再好好研究一下,上面提到过java安全队列中的主要实现如下:
ArrayBlockingQueue 数组有界的队列
LinkedBlockingQueue 列表结构的队列
DelayQueue 延迟队列
PriorityBlockingQueue 优先级别的队列
SynchronousQueue 同步队列,容量为1
ArrayBlockingQueue:基于数组的有界阻塞队列,内部实现将对象放入到一个数组中进行操作。并且它是有界的队列,初始化时必须指定大小,后期无法修改,执行的规则是先进先出的规则。
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(20); queue.put("wang"); queue.put("wang1"); queue.put("wang2"); //单次输出永远是wang System.out.println(queue.take());
ArrayBlockingQueue的底层实现我们可以看下源码的构造:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //我主要调几个重要的方法来说明下底层实现,如果想了解的更多,请看下源代码的实现 private static final long serialVersionUID = -817911632652898426L; //内部数据的存储对象,数组 final Object[] items; //外部调用take时取得数组中的下标位置数据 int takeIndex; //外部调用put时放置数据的下标位置数据 int putIndex; //当前容器的实际数据大小 int count; //全局锁 final ReentrantLock lock; //判断是否为空时产生的阻塞 private final Condition notEmpty; //判断是否已经达到边界时的阻塞 private final Condition notFull; final int inc(int i) { return (++i == items.length) ? 0 : i; } final int dec(int i) { return ((i == 0) ? items.length : i) - 1; } //构造函数,必须指定容量 public ArrayBlockingQueue(int capacity) { this(capacity, false); } //实际的构造函数,该函数会初始化内部变量 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } //核心的put操作,验证不能为空,并且使用锁机制,然后验证是否超边界值,然后是插入数据,最后释放锁 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } //insert很简单,只是放置数据,并且对影响的内部变量进行修改 private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); } //核心取数据的操作,主要实现是extract的操作 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); r 4000 eturn extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; } }
LinkedBlockingQueue:内部是以链式结构存储的数据对象,该对象的初始化可以指定边界值,如果没有指定默认是很大的,Integer.MAX_VALUE。我们直接看源码解释:
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; //链表的数据结构, static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } //容器的边界 private final int capacity; //当前变量的数量 private final AtomicInteger count = new AtomicInteger(0); //链表中的头部数据 private transient Node<E> head; //链表中的尾部数据 private transient Node<E> last; //取数据锁 private final ReentrantLock takeLock = new ReentrantLock(); //取是非空的状态 private final Condition notEmpty = takeLock.newCondition(); //放置数据的锁 private final ReentrantLock putLock = new ReentrantLock(); //放置数据是不能超过边界的状态 private final Condition notFull = putLock.newCondition(); //全局操作非空的安全机制 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } //全局操作非满边界的机制 private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } //构造行数,空参的是默认最大值 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } //当前容器的实际数据量大小 public int size() { return count.get(); } //放置数据的核心实现 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; //初始化当前链表结构的当前节点数据 Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //获得放置锁 putLock.lockInterruptibly(); try { //放置数据前提是判断是否到达边界 while (count.get() == capacity) { notFull.await(); } //放置数据的底层实现 enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } //放置在最后的数据 private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } //取数据的核心实现 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //获得取锁 takeLock.lockInterruptibly(); try { //首先要确定容器有数据 while (count.get() == 0) { notEmpty.await(); } //取数据 x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //取得头部数据的底层实现 private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } }
DelayQueue:对元素进行持有直到一个特定的延迟到期.DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。需要注意的是该延迟的对象是Delayed接口的实现对象。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private transient final ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); //该容器应用场景较少,后期有时间了再和朋友一起看看 } public interface Delayed extends Comparable<Delayed> { //Comparable的接口是可比较的顶层设计 long getDelay(TimeUnit unit); }
PriorityBlockingQueue:基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
SynchronousQueue:一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。
声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -3223113410248163686L; //内部类,主要实现Shared internal API for dual stacks and queues abstract static class Transferer { abstract Object transfer(Object e, boolean timed, long nanos); } //cpu数量 static final int NCPUS = Runtime.getRuntime().availableProcessors(); static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; static final int maxUntimedSpins = maxTimedSpins * 16; static final long spinForTimeoutThreshold = 1000L; //Dual stack static final class TransferStack extends Transferer { //省略内部实现 } /** Dual Queue */ static final class TransferQueue extends Transferer { //省略内部实现 } private transient volatile Transferer transferer; //空参构造,默认是false,非公平锁机制 public SynchronousQueue() { this(false); } //指定类型的锁机制,两种实现方式的截然不同的 public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack(); } //放置对象的设置,主要体现在内部类的初始化上,会根据情况自动获得相应的操作 public void put(E o) throws InterruptedException { if (o == null) throw new NullPointerException(); if (transferer.transfer(o, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } //取数据的核心实现 public E take() throws InterruptedException { Object e = transferer.transfer(null, false, 0); if (e != null) return (E)e; Thread.interrupted(); throw new InterruptedException(); } }
相关文章推荐
- java高并发设计(三)--安全关键字
- Java——单例设计模式中懒汉式并发访问的安全问题
- Java并发库(十九):同步集合类的使用BlockingQueue、
- java-并发集合-阻塞队列 LinkedBlockingQueue 演示
- 详细分析Java并发集合LinkedBlockingQueue的用法
- Java并发集合——ArrayBlockingQueue ,LinkedBlockingQueue,ConcurrentHashMap
- (14)多线程与并发库之java5同步集合类的应用【包含jdk1.5新特性 ConcurrentHashMap】
- Java实时多任务调度过程中的安全监控设计
- jdk1.4 构建 java多线程,并发设计框架 (四)
- jdk1.4 构建 java多线程,并发设计框架 (五)
- 收集的关于设计模式的java例子集合
- Java实时多任务调度过程中的安全监控设计
- Java实时多任务调度过程中的安全监控设计
- 电子商务的安全分析、设计及JAVA实现
- Java多线程 阻塞队列和并发集合
- Java多线程阻塞队列和并发集合
- Java集合 - BlockingQueue
- jdk1.4 构建 java多线程,并发设计框架 (三)
- java 并发集合
- jdk1.4 构建 java多线程,并发设计框架 使用列子(三)