您的位置:首页 > 编程语言 > Java开发

阻塞队列学习小结

2016-11-29 17:22 260 查看
常见的阻塞队列结构类图







常见的几种阻塞队列有

1) ArrayBlockingQueue

2) PriorityBlockingQueue

3) LinkedBlockingQueue

4) LinkedBlockingDeque

5) SynchronousQueue

6) DelayQueue

7) LinkedTransferQueue

ArrayBlockingQueue

​ 特点:指定大小 、 FIFO 、 锁是没有分离的,即生产和消费用的是同一个锁; 直接将枚举对象插入或移除。ArrayBlockingQueue 是一个由数组支持的有界阻塞队列。遵循FIFO对元素进行排序。固定大小的循环数组,一旦创建,不能改变容量。试图向满队列放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

offer和poll非阻塞的,立即返回或等待指定时间返回 offer(E e, long timeout, TimeUnit unit);put和take会阻塞等待。

public class BlockTest {
public static void main(String[] args) {
final BlockingQueue queue = new ArrayBlockingQueue(3);
for(int i=0;i<2;i++){
new Thread(){
public void run(){
while(true){
try {
Thread.sleep((long)(Math.random()*1000));
System.out.println(Thread.currentThread().getName() + "准备放数据!");
queue.put(1);
System.out.println(Thread.currentThread().getName() +  "队列目前有" + queue.size() + "个数据");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
new Thread(){
public void run(){
while(true){
try {
//将此处的睡眠时间分别改为100和1000,观察运行结果
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + "准备取数据!");
queue.take();
System.out.println(Thread.currentThread().getName() + "已经取走数据,"+
"队列目前有" + queue.size() + "个数据");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
}


可以看到,阻塞队列里面数据量始终不超过3个,如果空或者满,存取线程会相互阻塞

使用两个阻塞队列实现存取两个线程相互等待

public class BlockTest {
public static void main(String[] args) {
ExecutorService service = Executors.newSingleThreadExecutor();
final Business3 business = new Business3();
service.execute(new Runnable() {
public void run() {
for (int i = 0; i < 50; i++) {
business.sub();
}
}
});
for (int i = 0; i < 50; i++) {
business.main();
}
}
static class Business3 {
BlockingQueue subQueue = new ArrayBlockingQueue(1);
BlockingQueue mainQueue = new ArrayBlockingQueue(1);
//这里是匿名构造方法,只要new一个对象都会调用这个匿名构造方法,它与静态块不同,静态块只会执行一次,
//在类第一次加载到JVM的时候执行
//这里主要是让main线程首先put一个,就有东西可以取,如果不加这个匿名构造方法put一个的话程序就死锁了
{
try {
mainQueue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void sub(){
try
{
mainQueue.take();
for(int i=0;i<10;i++){
System.out.println(Thread.currentThread().getName() + " : " + i);
}
subQueue.put(1);
}catch(Exception e){

}
}

public void main(){
try
{
subQueue.take();
for(int i=0;i<5;i++){
System.out.println(Thread.currentThread().getName() + " : " + i);
}
mainQueue.put(1);
}catch(Exception e){
}
}
}
}


PriorityBlockingQueue

​ 无界的阻塞队列,使用与类PriorityQueue相同的排序规则,即自然排序或构造函数Comparator。

因为无界,所以有可能会造成资源耗尽。此外不允许使用null元素,不允许插入不可比较对象。实际上内部采用的是堆排序算法。插入一个新对象,从最后一个元素开始向前比较,叶子节点与根节点比较并进行交换。

iterator() 方法中所提供的迭代器并不保证以特定的顺序遍历 PriorityBlockingQueue 的元素。

如果需要有序地遍历,则应考虑使用 Arrays.sort(pq.toArray())。

PriorityBlockingQueue<UserInfo> queue = new PriorityBlockingQueue<>();
queue.add(new UserInfo(12));
queue.add(new UserInfo(52));
queue.add(new UserInfo(22));
queue.add(new UserInfo(32));
queue.add(new UserInfo(21));
System.out.println(queue.poll().getId());
System.out.println(queue.poll().getId());
System.out.println(queue.poll().getId());
System.out.println(queue.poll().getId());


LinkedBlockingQueue

​ 与ArrayBlockingQueue类似。默认容量Integer.MAX_VALUE FIFO顺序 锁是分离的,即生产用的是putLock,消费是takeLock 需要把枚举对象转换为Node

LinkedBlockingDeque

提供对双端结点的操作

public class Client implements Runnable {
private LinkedBlockingDeque<String> requestList;
public Client(LinkedBlockingDeque requestList){
this.requestList = requestList;
}

@Override
public void run() {
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 5; j++) {
StringBuilder request = new StringBuilder();
request.append(i);
request.append(":");
request.append(j);
try {
requestList.putFirst(request.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Clint: " + request + " ");
}
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Client: End");
}
}

public class Test {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingDeque<String> list = new LinkedBlockingDeque<>(3);
Client client = new Client(list);
Thread thread = new Thread(client);
thread.start();
for(int i=0;i<5;i++){
for(int j=0;j<3;j++){
String request = list.takeLast();
System.out.println("Main:"+request);
}
TimeUnit.MILLISECONDS.sleep(1000);
}
System.out.println("Main:End");
}
}


SynchronousQueue

存取交替完成

public class MyService {
public SynchronousQueue queue = new SynchronousQueue();
static int count  = 1;
public void putMethod(){
String putString = "any"+count++;
try {
queue.put(putString);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public String takeMethod(){
try {
return queue.take().toString();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}

public class ThreadA extends Thread {
private MyService myService;

public ThreadA(MyService myService){
this.myService = myService;
}

@Override
public void run() {
for(int i=0;i<10;i++){
myService.putMethod();
}
}
}

public class Test {
public static void main(String[] args) throws InterruptedException {
MyService myService = new MyService();
ThreadA a = new ThreadA(myService);
ThreadB b = new ThreadB(myService);
a.start();
Thread.sleep(2000);
b.start();
}
}


DelayQueue

DelayQueue有如下场景

a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。

b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。

c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

也可以应用于session超时管理,网络应答通讯协议的请求超时处理。

​ DelayQueue是一个BlockingQueue,其特化的参数是Delayed。

Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。DelayQueue内部是使用PriorityQueue实现的。DelayQueue = BlockingQueue + PriorityQueue + Delayed也就是说 DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class DelayItem<T> implements Delayed{
private static final long NANO_ORIGIN = System.nanoTime();

final static long now(){
return System.nanoTime()-NANO_ORIGIN;
}

private static final AtomicLong sequencer = new AtomicLong(0);
private final long sequenceNumber;
private final long time;
private final T item;

public DelayItem(T submit, long timeout) {
this.time = now() + timeout;
this.item = submit;
this.sequenceNumber = sequencer.getAndIncrement();
}

public T getItem(){
return this.item;
}

@Override
public long getDelay(TimeUnit unit) {
long d = unit.convert(time-now(), TimeUnit.NANOSECONDS);
return d;
}

@Override
public int compareTo(Delayed o) {
if(o == this)
return 0;
if(o instanceof DelayItem){
DelayItem x = (DelayItem)o;
long diff = time-x.time;
if(diff < 0)
return -1;
else if(diff > 0)
return 1;
else if(sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}

}

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
public class Cache<K,V> {
private ConcurrentMap<K,V> cacheObject = new ConcurrentHashMap<K, V>();
private DelayQueue<DelayItem<Pair<K,V>>> q = new DelayQueue<DelayItem<Pair<K,V>>>();
private Thread daemonThread;
public Cache(){
Runnable daemonTask = new Runnable() {
@Override
public void run() {
daemonCheck();
}
};
daemonThread = new Thread(daemonTask);
daemonThread.setDaemon(true);
daemonThread.setName("Cache Daemon");
daemonThread.start();
}
private void daemonCheck() {
for(;;){
DelayItem<Pair<K,V>> delayItem = null;
try {
delayItem = q.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
if(delayItem != null){
Pair<K,V> pair = delayItem.getItem();
cacheObject.remove(pair.first,pair.second);
}
}
}
public void put(K key, V value, long time, TimeUnit unit){
V oldValue = cacheObject.put(key ,value);
if(oldValue != null){
q.remove(key);
}
long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit);
q.put(new DelayItem<Pair<K, V>>(new Pair<K, V>(key, value), nanoTime));
}
public V get(K key){
return cacheObject.get(key);
}
public static void main(String[] args) throws InterruptedException {
Cache<Integer, String> cache = new Cache<Integer, String>();
cache.put(1, "aaaa", 3, TimeUnit.SECONDS);
Thread.sleep(1000 * 2);
{
String str = cache.get(1);
System.out.println(str);
}
Thread.sleep(1000 * 2);
{
String str = cache.get(1);
System.out.println(str);
}
}
}


LinkedTransferQueue

ArrayBlockingQueue和LinkedBlockingQueue的区别

队列中锁的实现不同

ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁;

LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock

在生产或消费时操作不同

ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的;

LinkedBlockingQueue实现的队列中在生产和消费的时候,需要把枚举对象转换为Node进行插入或移除,会影响性能

队列大小初始化方式不同

ArrayBlockingQueue实现的队列中必须指定队列的大小;

LinkedBlockingQueue实现的队列中可以不指定队列的大小,但是默认是Integer.MAX_VALUE

注意:在使用LinkedBlockingQueue时,若用默认大小且当生产速度大于消费速度时候,有可能会内存溢出

在使用ArrayBlockingQueue和LinkedBlockingQueue分别对1000000个简单字符做入队操作时,LinkedBlockingQueue的消耗是ArrayBlockingQueue消耗的10倍左右

Executors的工厂方法提交线程

ExecutorService service = Executors.newCachedThreadPool();
Producer producer = new Producer("01")
Producer producers = new Producer("02")
Consumer consumer = new Consumer();
service.submit(producer);  Runnable对象
service.submit(producers2);  Runnable对象
service.submit(consumer);  Runnable对象


参考《java并发编程核心方法与框架》以及其他网上博客资源(网址忘了),如有侵权,请告知,谢谢。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 并发 多线程