您的位置:首页 > 产品设计 > UI/UE

java高并发设计(十)--java安全集合BlockingQueue

2017-01-18 00:00 253 查看
摘要: java安全集合BlockingQueue

接着上个章节我们继续讲解java安全集合中的队列内容,这里只对常用的容器做详细的介绍,其他的有个概念,真正碰到使用场景再好好研究一下,上面提到过java安全队列中的主要实现如下:

ArrayBlockingQueue 数组有界的队列

LinkedBlockingQueue 列表结构的队列

DelayQueue 延迟队列

PriorityBlockingQueue 优先级别的队列

SynchronousQueue 同步队列,容量为1

ArrayBlockingQueue:基于数组的有界阻塞队列,内部实现将对象放入到一个数组中进行操作。并且它是有界的队列,初始化时必须指定大小,后期无法修改,执行的规则是先进先出的规则。

ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(20);
queue.put("wang");
queue.put("wang1");
queue.put("wang2");
//单次输出永远是wang
System.out.println(queue.take());

ArrayBlockingQueue的底层实现我们可以看下源码的构造:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//我主要调几个重要的方法来说明下底层实现,如果想了解的更多,请看下源代码的实现

private static final long serialVersionUID = -817911632652898426L;
//内部数据的存储对象,数组
final Object[] items;
//外部调用take时取得数组中的下标位置数据
int takeIndex;
//外部调用put时放置数据的下标位置数据
int putIndex;
//当前容器的实际数据大小
int count;
//全局锁
final ReentrantLock lock;
//判断是否为空时产生的阻塞
private final Condition notEmpty;
//判断是否已经达到边界时的阻塞
private final Condition notFull;

final int inc(int i) {
return (++i == items.length) ? 0 : i;
}

final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}
//构造函数,必须指定容量
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//实际的构造函数,该函数会初始化内部变量
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull =  lock.newCondition();
}
//核心的put操作,验证不能为空,并且使用锁机制,然后验证是否超边界值,然后是插入数据,最后释放锁
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
//insert很简单,只是放置数据,并且对影响的内部变量进行修改
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
//核心取数据的操作,主要实现是extract的操作
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
r
4000
eturn extract();
} finally {
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}
}

LinkedBlockingQueue:内部是以链式结构存储的数据对象,该对象的初始化可以指定边界值,如果没有指定默认是很大的,Integer.MAX_VALUE。我们直接看源码解释:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -6903933977591709194L;
//链表的数据结构,
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
//容器的边界
private final int capacity;
//当前变量的数量
private final AtomicInteger count = new AtomicInteger(0);
//链表中的头部数据
private transient Node<E> head;
//链表中的尾部数据
private transient Node<E> last;
//取数据锁
private final ReentrantLock takeLock = new ReentrantLock();
//取是非空的状态
private final Condition notEmpty = takeLock.newCondition();
//放置数据的锁
private final ReentrantLock putLock = new ReentrantLock();
//放置数据是不能超过边界的状态
private final Condition notFull = putLock.newCondition();
//全局操作非空的安全机制
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
//全局操作非满边界的机制
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
//构造行数,空参的是默认最大值
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
//当前容器的实际数据量大小
public int size() {
return count.get();
}
//放置数据的核心实现
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
//初始化当前链表结构的当前节点数据
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//获得放置锁
putLock.lockInterruptibly();
try {
//放置数据前提是判断是否到达边界
while (count.get() == capacity) {
notFull.await();
}
//放置数据的底层实现
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
//放置在最后的数据
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
//取数据的核心实现
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//获得取锁
takeLock.lockInterruptibly();
try {
//首先要确定容器有数据
while (count.get() == 0) {
notEmpty.await();
}
//取数据
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//取得头部数据的底层实现
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
}

DelayQueue:对元素进行持有直到一个特定的延迟到期.DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。需要注意的是该延迟的对象是Delayed接口的实现对象。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {

private transient final ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();

//该容器应用场景较少,后期有时间了再和朋友一起看看

}
public interface Delayed extends Comparable<Delayed> {
//Comparable的接口是可比较的顶层设计
long getDelay(TimeUnit unit);
}

PriorityBlockingQueue:基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

SynchronousQueue:一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。

声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
  如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
  但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理

public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -3223113410248163686L;
//内部类,主要实现Shared internal API for dual stacks and queues
abstract static class Transferer {
abstract Object transfer(Object e, boolean timed, long nanos);
}
//cpu数量
static final int NCPUS = Runtime.getRuntime().availableProcessors();
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
static final int maxUntimedSpins = maxTimedSpins * 16;
static final long spinForTimeoutThreshold = 1000L;
//Dual stack
static final class TransferStack extends Transferer {
//省略内部实现
}
/** Dual Queue */
static final class TransferQueue extends Transferer {
//省略内部实现
}
private transient volatile Transferer transferer;
//空参构造,默认是false,非公平锁机制
public SynchronousQueue() {
this(false);
}
//指定类型的锁机制,两种实现方式的截然不同的
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue() : new TransferStack();
}
//放置对象的设置,主要体现在内部类的初始化上,会根据情况自动获得相应的操作
public void put(E o) throws InterruptedException {
if (o == null) throw new NullPointerException();
if (transferer.transfer(o, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
//取数据的核心实现
public E take() throws InterruptedException {
Object e = transferer.transfer(null, false, 0);
if (e != null)
return (E)e;
Thread.interrupted();
throw new InterruptedException();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java安全队列