您的位置:首页 > 编程语言 > Java开发

Java并发之阻塞队列(二)

2017-01-01 12:45 211 查看
LinkedBlockingQueue
一个由链表组合而成的有界阻塞队列.

1.LinkedBlockingQueue的数据结构



采用的是单链表结构

2.LinkedBlockingQueue源码分析

- 2.1继承关系

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
}


2.2构造方法

public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);//设置容量
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);//初始化节点
}
//将集合放到LinkedBlockingQueue中
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // 打开存储锁
try {
int n = 0;
for (E e : c) {
if (e == null)//出现为null的元素,抛异常
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));//添加节点
++n;//计数器
}
count.set(n);//设置队列中元素的个数
} finally {
putLock.unlock();//释放锁
}
}


2.3内部数据结构

/*队列的容量,如果没有设置则为Integer.MAX_SIZE*/
private final int capacity;
/** 当前元素的个数 */
private final AtomicInteger count = new AtomicInteger(0);
/**头结点*/
private transient Node<E> head;
/**尾节点 */
private transient Node<E> last;
/**获取锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/**获取等待状态*/
private final Condition notEmpty = takeLock.newCondition();
/** 存储锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 存储等待状态*/
private final Condition notFull = putLock.newCondition();
//内部类节点
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
//内部类标志器
private class Itr implements Iterator<E> {
//...
}


2.4put方法

public void put(E e) throws InterruptedException {
if (e == null)//元素不能为空
throw new NullPointerException();
int c = -1;
Node<E> node = new Node(e);//新建一个节点
//获取存储锁
final ReentrantLock putLock = this.putLock;
//队列中元素的数量
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//如果数量等于容量大小,存储操作等待
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);//插入元素
c = count.getAndIncrement();//增加数量
if (c + 1 < capacity)
notFull.signal();//唤醒取操作
} finally {
putLock.unlock();//释放锁
}
if (c == 0)
signalNotEmpty();
}


2.5enqueue方法

private void enqueue(Node<E> node) {
last = last.next = node;//放到尾节点
}


2.6offer方法

public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)//判断大小
return false;
int c = -1;
Node<E> node = new Node(e);//新建节点
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);//插入节点
c = count.getAndIncrement();//增肌数量
if (c + 1 < capacity)
notFull.signal();//唤醒取操作
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}


2.7take方法

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//队列中没有元素,取操作阻塞
while (count.get() == 0) {
notEmpty.await();
}
//获取元素
x = dequeue();
//数量减一
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();//唤醒取操作
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}


2.8dequeue方法

private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC 移除头结点
head = first;
E x = first.item;
first.item = null;
return x;//将头结点返回
}


2.9 clear方法

public void clear() {
fullyLock();//将取锁和存锁都锁住
try {
//清空队列元素
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;
p.item = null;
}
head = last;
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
fullyUnlock();
}
}


3.0toArray方法

public Object[] toArray() {
fullyLock();
try {
int size = count.get();
Object[] a = new Object[size];//新建数组
int k = 0;
//将集合中的元素放到数组中
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = p.item;
return a;
} finally {
fullyUnlock();
}
}


-3.1fullyLock和fullyUnlock

void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 并发