您的位置:首页 > 产品设计 > UI/UE

java blockingqueue 单独用, 结合线程池用

2016-07-10 10:32 387 查看
LinkedBlockingQueue 顾名思义就是link 链表实现,queue就是队列,入队出队,blocking不用管,

所以他就是一个队列,把他当成queue用就行。over


单独用代码:

package ThreadPoolTest;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;

/**
*
* @category 阻塞队列BlockQueue,共四种, 本文说的是单独的 blockQueue 用于生产者消费者问题,
* ArrayBlockingQueue
* LinkedBlockingQueue
* PriorityBlockingQueue
* SynchronousQueue
* @author jaloli
*
* result
*  Producer: producer:0
Producer: producer:1
Producer: producer:3
Producer: producer:2
Consumer:producer:3
Consumer:producer:2
Consumer:producer:0
Consumer:producer:1
Producer: producer:4
Consumer:producer:4
由result 的system.out可看出这些输出不具有参考性, LinkedBlockingDeque的固定容量2,在结果中无法体现,所以多线程中BlockingQueue结合线程池用比较好
*
*/

public class BlockQueueTest {

public static void main(String[] args) throws InterruptedException {
//可以看出放进去一个, bq 中有一个
BlockingQueue<String> bq = new LinkedBlockingQueue<String>(2);//这个参数是blockingqueue 固定的容量
//		bq.put(String.valueOf(1));
//		System.out.println("Producer: "+1+" bq.size(): "+bq.toString());
//		bq.put(String.valueOf(2));
//		System.out.println("Producer: "+2+" bq.size(): "+bq.toString());

for(int i=0;i<5;i++){
Producer producer = new Producer(bq,i);
Consumer consumer = new Consumer(bq);
new Thread(producer).start();//第二个参数是线程名
new Thread(consumer).start();
}
}

}

class Producer implements Runnable {

private BlockingQueue<String> bq;
private int productid;

public Producer(int productid) {
super();
this.productid = productid;
}

public Producer(BlockingQueue<String> bq,int productid) {
super();
this.bq = bq;
this.productid = productid;
}

@Override
public void run() {
try {Thread.sleep(1000);
bq.put(String.valueOf(productid));
System.out.println("Producer: "+productid+" bq.size(): "+bq.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}

class Consumer implements Runnable{
private BlockingQueue<String> bq;

public Consumer() {
super();
}

public Consumer(BlockingQueue<String> bq) {
super();
this.bq = bq;
}

@Override
public void run() {
try {
String temp = bq.take();
System.out.println("Consumer: "+temp+" bq.size(): "+bq.size());
} catch (Exception e) {
e.printStackTrace();
}
}

}

线程池的Executors.newFixedThreadPool(3) 是线程池有3个线程, 但是如果来100个任务,且单个任务执行特别慢, 那么会因为来不及执行剩下的97个任务而产生丢任务, 

所以需要有一个blockingqueue队列来保存线程池暂时来不及执行的runnable task, 这个blockingqueue 也是有大小限制的,代码如下:

结合线程池用代码:

package ThreadPoolTest;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolAndBlockQueue {

//线程池不自带blockqueue的用法
public void notContainBQ(){
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
ExecutorService service = Executors.newCachedThreadPool();
Producer producer;
Consumer consumer;
for(int i=0;i<5;i++) {
producer = new Producer(queue,i);
consumer = new Consumer(queue);
service.execute(producer);
service.execute(consumer);
}
}

//自带blockqueue的用法
public static void containBQ() {
ExecutorService service = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2));//这个bq参数是控制execute提交过来后,真正执行的线程数的
//如果一个任务过程,迟迟执行不完, 导致过来的runnable任务太多,blockqueue放不下,那么会丢runnable任务
Producer producer;
Consumer consumer;
BlockingQueue<String> bq = new LinkedBlockingQueue<String>(2);
producer = new Producer(bq,1);
//		consumer = new Consumer(bq);
for(int i=0;i<105;i++) {
service.execute(producer);
//			service.execute(consumer);
}

}

public static void main(String[] args) {
containBQ();
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: