您的位置:首页 > 编程语言 > Java开发

【Java多线程】ExecutorService与ExecutorCompletionService

2016-06-14 16:23 876 查看
ExecutorService与ExecutorCompletionService都是java.util.concurrent包的并发处理类,总的来说,ExecutorCompletionService是ExecutorService的功能增强版,ExecutorCompletionService以BlockingQueue<Future<V>>来存放已经完成的任务。
也就是说,优先完成的任务会优先存放在BlockingQueue<Future<V>>队列中,所以我们能及时的拿到最优先的处理结果。

让我们先看看ExecutorService的测试代码,共4个任务,我们刻意让第1个任务的执行时间最长,依次递减,代码如下:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestExecutorService
{
public static void main(String[] args) throws InterruptedException, ExecutionException
{
ExecutorService es = Executors.newFixedThreadPool(4);

//第一个任务
Callable<Integer> task1 = new Callable<Integer>() {
public Integer call() throws Exception
{
//耗时最长, 4秒
Thread.sleep(4000);
return 1;
}
};

//第二个任务
Callable<Integer> task2 = new Callable<Integer>() {
public Integer call() throws Exception
{
Thread.sleep(3000);
return 2;
}
};

//第三个任务
Callable<Integer> task3 = new Callable<Integer>() {
public Integer call() throws Exception
{
Thread.sleep(2000);
return 3;
}
};

//第四个任务
Callable<Integer> task4 = new Callable<Integer>() {
public Integer call() throws Exception
{
Thread.sleep(1000);
return 4;
}
};

Future<Integer> result1 = es.submit(task1);
Future<Integer> result2 = es.submit(task2);
Future<Integer> result3 = es.submit(task3);
Future<Integer> result4 = es.submit(task4);

System.out.println("第1个任务等待中...");
System.out.println("第1个任务完成:【" + result1.get() + "】");

System.out.println("第2个任务等待中...");
System.out.println("第2个任务完成:【" + result2.get() + "】");

System.out.println("第3个任务等待中...");
System.out.println("第3个任务完成:【" + result3.get() + "】");

System.out.println("第4个任务等待中...");
System.out.println("第4个任务完成:【" + result4.get() + "】");
}
}
输出结果:
第1个任务等待中...
第1个任务完成:【1】
第2个任务等待中...
第2个任务完成:【2】
第3个任务等待中...
第3个任务完成:【3】
第4个任务等待中...
第4个任务完成:【4】
PS:Future.get()方法会造成阻塞,直到任务执行完毕为止。
运行代码可见,result1.get()阻塞了4秒后完成任务,输出结果,而紧随的result2.get(),result3.get(),result4.get()没有阻塞,立马输出结果。那是因为在result1.get()执行完毕时,其余3个任务早已执行完毕等待抓取结果了。所以,使用上述方法并不能得知哪个任务是最先返回结果的。

接下来,让我们看看ExecutorCompletionService的代码:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestExecutorComplectionService
{
public static void main(String[] args) throws InterruptedException, ExecutionException
{
ExecutorService es = Executors.newFixedThreadPool(4);
ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<Integer>(es);

//第一个任务
Callable<Integer> task1 = new Callable<Integer>() {
public Integer call() throws Exception
{
//耗时最长, 4秒
Thread.sleep(4000);
return 1;
}
};

//第二个任务
Callable<Integer> task2 = new Callable<Integer>() {
public Integer call() throws Exception
{
Thread.sleep(3000);
return 2;
}
};

//第三个任务
Callable<Integer> task3 = new Callable<Integer>() {
public Integer call() throws Exception
{
Thread.sleep(2000);
return 3;
}
};

//第四个任务
Callable<Integer> task4 = new Callable<Integer>() {
public Integer call() throws Exception
{
Thread.sleep(1000);
return 4;
}
};

ecs.submit(task1);
ecs.submit(task2);
ecs.submit(task3);
ecs.submit(task4);

for(int i = 0; i < 4; i++)
{
Future<Integer> result = ecs.take();
System.out.println("输出结果:【" + result.get() + "】");
}
}
}
输出结果:
输出结果:【4】
输出结果:【3】
输出结果:【2】
输出结果:【1】
可见,第4个任务task4执行的时间是最短的,首先输出结果。

下面让我们剖析一下ExecutorComplectionService的源码:

成员变量如下:



executor:ExecutorService类,任务并行执行器

completionQueue:就是保存执行结果的阻塞队列BlockingQueue




submit方法:底层依旧使用ExecutorService来并发执行任务,只不过是多了个功能【把执行完毕的任务放到complectionQueue队列中】




task方法:从complectionQueue队列中获取一个元素,如果没有元素,则阻塞,直到队列中有元素位置,这验证了我们之前的说法。

总结:总的来说,ExecutorComplectionService其实就是 ExecutorService 和 BlockingQueue的结合。

本文出自 “DeaconLi” 博客,请务必保留此出处http://lizhuquan0769.blog.51cto.com/2591147/1789158
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: