阻塞队列学习小结
2016-11-29 17:22
260 查看
常见的阻塞队列结构类图
常见的几种阻塞队列有
1) ArrayBlockingQueue
2) PriorityBlockingQueue
3) LinkedBlockingQueue
4) LinkedBlockingDeque
5) SynchronousQueue
6) DelayQueue
7) LinkedTransferQueue
ArrayBlockingQueue
特点:指定大小 、 FIFO 、 锁是没有分离的,即生产和消费用的是同一个锁; 直接将枚举对象插入或移除。ArrayBlockingQueue 是一个由数组支持的有界阻塞队列。遵循FIFO对元素进行排序。固定大小的循环数组,一旦创建,不能改变容量。试图向满队列放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
offer和poll非阻塞的,立即返回或等待指定时间返回 offer(E e, long timeout, TimeUnit unit);put和take会阻塞等待。
可以看到,阻塞队列里面数据量始终不超过3个,如果空或者满,存取线程会相互阻塞
使用两个阻塞队列实现存取两个线程相互等待
PriorityBlockingQueue
无界的阻塞队列,使用与类PriorityQueue相同的排序规则,即自然排序或构造函数Comparator。
因为无界,所以有可能会造成资源耗尽。此外不允许使用null元素,不允许插入不可比较对象。实际上内部采用的是堆排序算法。插入一个新对象,从最后一个元素开始向前比较,叶子节点与根节点比较并进行交换。
iterator() 方法中所提供的迭代器并不保证以特定的顺序遍历 PriorityBlockingQueue 的元素。
如果需要有序地遍历,则应考虑使用 Arrays.sort(pq.toArray())。
LinkedBlockingQueue
与ArrayBlockingQueue类似。默认容量Integer.MAX_VALUE FIFO顺序 锁是分离的,即生产用的是putLock,消费是takeLock 需要把枚举对象转换为Node
LinkedBlockingDeque
提供对双端结点的操作
SynchronousQueue
存取交替完成
DelayQueue
DelayQueue有如下场景
a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
也可以应用于session超时管理,网络应答通讯协议的请求超时处理。
DelayQueue是一个BlockingQueue,其特化的参数是Delayed。
Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。DelayQueue内部是使用PriorityQueue实现的。DelayQueue = BlockingQueue + PriorityQueue + Delayed也就是说 DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。
LinkedTransferQueue
ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁;
LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock
在生产或消费时操作不同
ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的;
LinkedBlockingQueue实现的队列中在生产和消费的时候,需要把枚举对象转换为Node进行插入或移除,会影响性能
队列大小初始化方式不同
ArrayBlockingQueue实现的队列中必须指定队列的大小;
LinkedBlockingQueue实现的队列中可以不指定队列的大小,但是默认是Integer.MAX_VALUE
注意:在使用LinkedBlockingQueue时,若用默认大小且当生产速度大于消费速度时候,有可能会内存溢出
在使用ArrayBlockingQueue和LinkedBlockingQueue分别对1000000个简单字符做入队操作时,LinkedBlockingQueue的消耗是ArrayBlockingQueue消耗的10倍左右
参考《java并发编程核心方法与框架》以及其他网上博客资源(网址忘了),如有侵权,请告知,谢谢。
常见的几种阻塞队列有
1) ArrayBlockingQueue
2) PriorityBlockingQueue
3) LinkedBlockingQueue
4) LinkedBlockingDeque
5) SynchronousQueue
6) DelayQueue
7) LinkedTransferQueue
ArrayBlockingQueue
特点:指定大小 、 FIFO 、 锁是没有分离的,即生产和消费用的是同一个锁; 直接将枚举对象插入或移除。ArrayBlockingQueue 是一个由数组支持的有界阻塞队列。遵循FIFO对元素进行排序。固定大小的循环数组,一旦创建,不能改变容量。试图向满队列放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
offer和poll非阻塞的,立即返回或等待指定时间返回 offer(E e, long timeout, TimeUnit unit);put和take会阻塞等待。
public class BlockTest { public static void main(String[] args) { final BlockingQueue queue = new ArrayBlockingQueue(3); for(int i=0;i<2;i++){ new Thread(){ public void run(){ while(true){ try { Thread.sleep((long)(Math.random()*1000)); System.out.println(Thread.currentThread().getName() + "准备放数据!"); queue.put(1); System.out.println(Thread.currentThread().getName() + "队列目前有" + queue.size() + "个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } new Thread(){ public void run(){ while(true){ try { //将此处的睡眠时间分别改为100和1000,观察运行结果 Thread.sleep(100); System.out.println(Thread.currentThread().getName() + "准备取数据!"); queue.take(); System.out.println(Thread.currentThread().getName() + "已经取走数据,"+ "队列目前有" + queue.size() + "个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } }
可以看到,阻塞队列里面数据量始终不超过3个,如果空或者满,存取线程会相互阻塞
使用两个阻塞队列实现存取两个线程相互等待
public class BlockTest { public static void main(String[] args) { ExecutorService service = Executors.newSingleThreadExecutor(); final Business3 business = new Business3(); service.execute(new Runnable() { public void run() { for (int i = 0; i < 50; i++) { business.sub(); } } }); for (int i = 0; i < 50; i++) { business.main(); } } static class Business3 { BlockingQueue subQueue = new ArrayBlockingQueue(1); BlockingQueue mainQueue = new ArrayBlockingQueue(1); //这里是匿名构造方法,只要new一个对象都会调用这个匿名构造方法,它与静态块不同,静态块只会执行一次, //在类第一次加载到JVM的时候执行 //这里主要是让main线程首先put一个,就有东西可以取,如果不加这个匿名构造方法put一个的话程序就死锁了 { try { mainQueue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } } public void sub(){ try { mainQueue.take(); for(int i=0;i<10;i++){ System.out.println(Thread.currentThread().getName() + " : " + i); } subQueue.put(1); }catch(Exception e){ } } public void main(){ try { subQueue.take(); for(int i=0;i<5;i++){ System.out.println(Thread.currentThread().getName() + " : " + i); } mainQueue.put(1); }catch(Exception e){ } } } }
PriorityBlockingQueue
无界的阻塞队列,使用与类PriorityQueue相同的排序规则,即自然排序或构造函数Comparator。
因为无界,所以有可能会造成资源耗尽。此外不允许使用null元素,不允许插入不可比较对象。实际上内部采用的是堆排序算法。插入一个新对象,从最后一个元素开始向前比较,叶子节点与根节点比较并进行交换。
iterator() 方法中所提供的迭代器并不保证以特定的顺序遍历 PriorityBlockingQueue 的元素。
如果需要有序地遍历,则应考虑使用 Arrays.sort(pq.toArray())。
PriorityBlockingQueue<UserInfo> queue = new PriorityBlockingQueue<>(); queue.add(new UserInfo(12)); queue.add(new UserInfo(52)); queue.add(new UserInfo(22)); queue.add(new UserInfo(32)); queue.add(new UserInfo(21)); System.out.println(queue.poll().getId()); System.out.println(queue.poll().getId()); System.out.println(queue.poll().getId()); System.out.println(queue.poll().getId());
LinkedBlockingQueue
与ArrayBlockingQueue类似。默认容量Integer.MAX_VALUE FIFO顺序 锁是分离的,即生产用的是putLock,消费是takeLock 需要把枚举对象转换为Node
LinkedBlockingDeque
提供对双端结点的操作
public class Client implements Runnable { private LinkedBlockingDeque<String> requestList; public Client(LinkedBlockingDeque requestList){ this.requestList = requestList; } @Override public void run() { for (int i = 0; i < 3; i++) { for (int j = 0; j < 5; j++) { StringBuilder request = new StringBuilder(); request.append(i); request.append(":"); request.append(j); try { requestList.putFirst(request.toString()); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Clint: " + request + " "); } try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Client: End"); } } public class Test { public static void main(String[] args) throws InterruptedException { LinkedBlockingDeque<String> list = new LinkedBlockingDeque<>(3); Client client = new Client(list); Thread thread = new Thread(client); thread.start(); for(int i=0;i<5;i++){ for(int j=0;j<3;j++){ String request = list.takeLast(); System.out.println("Main:"+request); } TimeUnit.MILLISECONDS.sleep(1000); } System.out.println("Main:End"); } }
SynchronousQueue
存取交替完成
public class MyService { public SynchronousQueue queue = new SynchronousQueue(); static int count = 1; public void putMethod(){ String putString = "any"+count++; try { queue.put(putString); } catch (InterruptedException e) { e.printStackTrace(); } } public String takeMethod(){ try { return queue.take().toString(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } } public class ThreadA extends Thread { private MyService myService; public ThreadA(MyService myService){ this.myService = myService; } @Override public void run() { for(int i=0;i<10;i++){ myService.putMethod(); } } } public class Test { public static void main(String[] args) throws InterruptedException { MyService myService = new MyService(); ThreadA a = new ThreadA(myService); ThreadB b = new ThreadB(myService); a.start(); Thread.sleep(2000); b.start(); } }
DelayQueue
DelayQueue有如下场景
a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
也可以应用于session超时管理,网络应答通讯协议的请求超时处理。
DelayQueue是一个BlockingQueue,其特化的参数是Delayed。
Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。DelayQueue内部是使用PriorityQueue实现的。DelayQueue = BlockingQueue + PriorityQueue + Delayed也就是说 DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。
import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class DelayItem<T> implements Delayed{ private static final long NANO_ORIGIN = System.nanoTime(); final static long now(){ return System.nanoTime()-NANO_ORIGIN; } private static final AtomicLong sequencer = new AtomicLong(0); private final long sequenceNumber; private final long time; private final T item; public DelayItem(T submit, long timeout) { this.time = now() + timeout; this.item = submit; this.sequenceNumber = sequencer.getAndIncrement(); } public T getItem(){ return this.item; } @Override public long getDelay(TimeUnit unit) { long d = unit.convert(time-now(), TimeUnit.NANOSECONDS); return d; } @Override public int compareTo(Delayed o) { if(o == this) return 0; if(o instanceof DelayItem){ DelayItem x = (DelayItem)o; long diff = time-x.time; if(diff < 0) return -1; else if(diff > 0) return 1; else if(sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); } } import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; public class Cache<K,V> { private ConcurrentMap<K,V> cacheObject = new ConcurrentHashMap<K, V>(); private DelayQueue<DelayItem<Pair<K,V>>> q = new DelayQueue<DelayItem<Pair<K,V>>>(); private Thread daemonThread; public Cache(){ Runnable daemonTask = new Runnable() { @Override public void run() { daemonCheck(); } }; daemonThread = new Thread(daemonTask); daemonThread.setDaemon(true); daemonThread.setName("Cache Daemon"); daemonThread.start(); } private void daemonCheck() { for(;;){ DelayItem<Pair<K,V>> delayItem = null; try { delayItem = q.take(); } catch (InterruptedException e) { e.printStackTrace(); } if(delayItem != null){ Pair<K,V> pair = delayItem.getItem(); cacheObject.remove(pair.first,pair.second); } } } public void put(K key, V value, long time, TimeUnit unit){ V oldValue = cacheObject.put(key ,value); if(oldValue != null){ q.remove(key); } long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit); q.put(new DelayItem<Pair<K, V>>(new Pair<K, V>(key, value), nanoTime)); } public V get(K key){ return cacheObject.get(key); } public static void main(String[] args) throws InterruptedException { Cache<Integer, String> cache = new Cache<Integer, String>(); cache.put(1, "aaaa", 3, TimeUnit.SECONDS); Thread.sleep(1000 * 2); { String str = cache.get(1); System.out.println(str); } Thread.sleep(1000 * 2); { String str = cache.get(1); System.out.println(str); } } }
LinkedTransferQueue
ArrayBlockingQueue和LinkedBlockingQueue的区别
队列中锁的实现不同ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁;
LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock
在生产或消费时操作不同
ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的;
LinkedBlockingQueue实现的队列中在生产和消费的时候,需要把枚举对象转换为Node进行插入或移除,会影响性能
队列大小初始化方式不同
ArrayBlockingQueue实现的队列中必须指定队列的大小;
LinkedBlockingQueue实现的队列中可以不指定队列的大小,但是默认是Integer.MAX_VALUE
注意:在使用LinkedBlockingQueue时,若用默认大小且当生产速度大于消费速度时候,有可能会内存溢出
在使用ArrayBlockingQueue和LinkedBlockingQueue分别对1000000个简单字符做入队操作时,LinkedBlockingQueue的消耗是ArrayBlockingQueue消耗的10倍左右
Executors的工厂方法提交线程
ExecutorService service = Executors.newCachedThreadPool(); Producer producer = new Producer("01") Producer producers = new Producer("02") Consumer consumer = new Consumer(); service.submit(producer); Runnable对象 service.submit(producers2); Runnable对象 service.submit(consumer); Runnable对象
参考《java并发编程核心方法与框架》以及其他网上博客资源(网址忘了),如有侵权,请告知,谢谢。
相关文章推荐
- 进程间学习小结(消息队列)
- 关于Java多线程和并发运行的学习(五)——阻塞队列
- [Java并发包学习九]Java中的阻塞队列
- 从零学习JAVA多线程(四):阻塞队列和生产者消费者模式
- 非阻塞队列ConcurrentLinkedQueue之容器初步学习
- 学习java多线程的笔记3-使用BlockingQueue阻塞队列来模拟两个线程之间的通信
- 0038 Java学习笔记-多线程-传统线程间通信、Condition、阻塞队列、《疯狂Java讲义 第三版》进程间通信示例代码存在的一个问题
- RabbitMQ学习小结(二)----工作队列
- java 阻塞队列学习
- 非阻塞队列 普通队列 阻塞队列 学习笔记
- [Java并发包学习九]Java中的阻塞队列
- Redis阻塞队列原理学习
- [Java并发包学习九]Java中的阻塞队列
- [Java并发包学习九]Java中的阻塞队列
- Java核心知识点学习----多线程中的阻塞队列,ArrayBlockingQueue介绍
- 网卡多队列学习小结
- [Java并发包学习九]Java中的阻塞队列
- 多线程学习(九)-可阻塞的队列
- 阻塞队列BlockingQueue 学习
- Java线程学习笔记之BlockingQueue阻塞队列