Java并发之阻塞队列(二)
2017-01-01 12:45
211 查看
LinkedBlockingQueue一个由链表组合而成的有界阻塞队列.
1.LinkedBlockingQueue的数据结构
采用的是单链表结构
2.LinkedBlockingQueue源码分析
- 2.1继承关系
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { }
2.2构造方法
public LinkedBlockingQueue() { this(Integer.MAX_VALUE);//设置容量 } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null);//初始化节点 } //将集合放到LinkedBlockingQueue中 public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // 打开存储锁 try { int n = 0; for (E e : c) { if (e == null)//出现为null的元素,抛异常 throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e));//添加节点 ++n;//计数器 } count.set(n);//设置队列中元素的个数 } finally { putLock.unlock();//释放锁 } }
2.3内部数据结构
/*队列的容量,如果没有设置则为Integer.MAX_SIZE*/ private final int capacity; /** 当前元素的个数 */ private final AtomicInteger count = new AtomicInteger(0); /**头结点*/ private transient Node<E> head; /**尾节点 */ private transient Node<E> last; /**获取锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /**获取等待状态*/ private final Condition notEmpty = takeLock.newCondition(); /** 存储锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** 存储等待状态*/ private final Condition notFull = putLock.newCondition(); //内部类节点 static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } //内部类标志器 private class Itr implements Iterator<E> { //... }
2.4put方法
public void put(E e) throws InterruptedException { if (e == null)//元素不能为空 throw new NullPointerException(); int c = -1; Node<E> node = new Node(e);//新建一个节点 //获取存储锁 final ReentrantLock putLock = this.putLock; //队列中元素的数量 final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { //如果数量等于容量大小,存储操作等待 while (count.get() == capacity) { notFull.await(); } enqueue(node);//插入元素 c = count.getAndIncrement();//增加数量 if (c + 1 < capacity) notFull.signal();//唤醒取操作 } finally { putLock.unlock();//释放锁 } if (c == 0) signalNotEmpty(); }
2.5enqueue方法
private void enqueue(Node<E> node) { last = last.next = node;//放到尾节点 }
2.6offer方法
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity)//判断大小 return false; int c = -1; Node<E> node = new Node(e);//新建节点 final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node);//插入节点 c = count.getAndIncrement();//增肌数量 if (c + 1 < capacity) notFull.signal();//唤醒取操作 } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
2.7take方法
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //队列中没有元素,取操作阻塞 while (count.get() == 0) { notEmpty.await(); } //获取元素 x = dequeue(); //数量减一 c = count.getAndDecrement(); if (c > 1) notEmpty.signal();//唤醒取操作 } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
2.8dequeue方法
private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC 移除头结点 head = first; E x = first.item; first.item = null; return x;//将头结点返回 }
2.9 clear方法
public void clear() { fullyLock();//将取锁和存锁都锁住 try { //清空队列元素 for (Node<E> p, h = head; (p = h.next) != null; h = p) { h.next = h; p.item = null; } head = last; if (count.getAndSet(0) == capacity) notFull.signal(); } finally { fullyUnlock(); } }
3.0toArray方法
public Object[] toArray() { fullyLock(); try { int size = count.get(); Object[] a = new Object[size];//新建数组 int k = 0; //将集合中的元素放到数组中 for (Node<E> p = head.next; p != null; p = p.next) a[k++] = p.item; return a; } finally { fullyUnlock(); } }
-3.1fullyLock和fullyUnlock
void fullyLock() { putLock.lock(); takeLock.lock(); } void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
相关文章推荐
- 线程高级应用-心得7-java5线程并发库中阻塞队列Condition的应用及案例分析
- 【Java并发编程】之二十一:并发新特性—阻塞队列和阻塞栈(含代码)
- CoreJava_线程并发(阻塞队列):在某个文件夹下搜索含有某关键字的文件
- Java并发框架——AQS阻塞队列管理(二)——自旋锁优化
- 聊聊高并发(十四)理解Java中的管程,条件队列,Condition以及实现一个阻塞队列
- Java多线程 阻塞队列和并发集合
- Java多线程 阻塞队列和并发集合
- Java并发编程:阻塞队列
- Java多线程 阻塞队列和并发集合
- Java并发框架——AQS阻塞队列管理(二)——自旋锁优化
- Java多线程 阻塞队列和并发集合
- zz 聊聊并发(七)——Java中的阻塞队列
- Java多线程 阻塞队列和并发集合
- Java并发(基础知识)—— 阻塞队列和生产者消费者模式
- (13)多线程与并发库之java5阻塞队列(BlockingQueue)的应用----子线程循环10次,接着主线程循环100次,接着又回到子线程循环10次,接着再回到主线程循环100次,如此循环50次
- 聊聊并发-Java中的阻塞队列
- Java阻塞队列SynchronousQueue实现并发
- 【Java】并发之阻塞队列
- Java并发框架——AQS阻塞队列管理(一)——自旋锁
- 黑马程序员——Java5中的线程并发库(二)--Semaphore实现信号灯、其他同步函数、可阻塞的队列、同步集合