java5线程框架Executor 线程并发控制
2008-12-21 00:23
363 查看
下面是一个在多线程执行并发任务下 进行控制的例子例子(测试 CompletionService)。
这里模拟这样一个场景: 一个由5个问题专家组成的团队, 同时解决同一个问题, 每个人的耗时都不同。
A策略: 每个专家得出结果后, 立刻在后台输出结果;
B策略: 只要有一个专家得出结果, 就在后台输出结果, 同时其它专家退出任务。
写了3个类来验证这种情况,只在windows下做了测试。
1.1、 CompletionSolver.java 进行并发工作的专家
1.2、 CompletionLogic.java 专家们并发工作的调度逻辑
1.3、 CompletionTest.java 带有main方法的测试类,5个专家开始同时解决一个棘手的问题
================= 1.1 CompletionSolver.java =====================
package Executor;
import java.util.concurrent.Callable;
//任务解决专家
public class CompletionSolver implements Callable<Result> {
//耗时
private int timeCost;
//结果
private Result result;
//专家名字
private String solverName;
public CompletionSolver(int timeCost, Result result, String solverName) {
this.timeCost = timeCost;
this.result = result;
this.solverName = solverName;
}
private CompletionSolver(){}
//执行任务
public Result call() throws Exception {
try {
Thread.sleep(timeCost*1000);
result.setResult(this.solverName + " worked out the result.");
} catch (Exception e) {}
return result;
}
//获得专家名字
public String getSolverName() {
return solverName;
}
}
========================= 1.1 end ==========================
================= 1.2 CompletionLogic.java =================
package Executor;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
//专家们并发工作的调度逻辑
public class CompletionLogic {
//只要同步工作的 问题解决者 solvers 有一个算出了结果,就给出算出的结果; 知道所有 问题解决者 都算出了结果
public void solveLogicA(Executor executor, List<CompletionSolver> solvers) throws InterruptedException, ExecutionException {
//创建并发线程服务
CompletionService<Result> completionService = new ExecutorCompletionService<Result>(executor);
//执行任务
for (Callable<Result> s : solvers){
completionService.submit(s);
}
//获得专家结果
int n = solvers.size();
for (int i = 0; i < n; ++i) {
Result r = completionService.take().get();
if (r != null)
use(r);
}
}
//只要同步工作的 问题解决者 solvers 有一个算出了结果, 其它 问题解决者就停止工作。最后给出算出的结果
public void solveLogicB(Executor executor, List<CompletionSolver> solvers) throws InterruptedException, ExecutionException {
//创建并发线程服务
CompletionService<Result> completionService = new ExecutorCompletionService<Result>(executor);
//对每一个并发任务的控制
int n = solvers.size();
List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s : solvers){
//专家开始工作, 同时获得对 工作 的控制
futures.add(completionService.submit(s));
}
for (int i = 0; i < n; ++i) {
try {
//获取专家的结果
Result r = completionService.take().get();
if (r != null) {
result = r;
use(result);
break;
}
} catch(ExecutionException ignore) {}
}
}finally {
//其它专家退出任务
boolean canceled = false;
for (int i = 0; i < futures.size(); i++) {
canceled = futures.get(i).cancel(true);
if(canceled)
System.out.println(currenttime() + solvers.get(i).getSolverName() + " 退出工作。");
}
}
}
//使用结果
private void use(Result result){
System.out.println(currenttime() + result.getResult());
}
//获得当前时间
private String currenttime() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
return sdf.format(new Date()) + ": ";
}
}
========================= 1.2 end ==========================
================= 1.3 CompletionTest.java =====================
package Executor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletionTest {
public static void main(String[] args) {
try {
//组建一个问题解决专家的团队
List<CompletionSolver> solvers = new ArrayList<CompletionSolver>();
solvers.add(new CompletionSolver(20,new Result(), "问题处理专家A"));
solvers.add(new CompletionSolver(15,new Result(), "问题处理专家B"));
solvers.add(new CompletionSolver(10,new Result(), "问题处理专家C"));
solvers.add(new CompletionSolver(12,new Result(), "问题处理专家D"));
solvers.add(new CompletionSolver(18,new Result(), "问题处理专家E"));
//创建有十个线程的线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//创建一个协调专家们工作的服务调度
CompletionLogic completionLogic = new CompletionLogic();
//策略A:专家采用A方案开始工作
completionLogic.solveLogicA(executorService, solvers);
//策略B:专家采用B方案开始工作
//completionLogic.solveLogicB(executorService, solvers);
} catch (InterruptedException e) {
// TODO: handle exception
} catch (ExecutionException e) {
// TODO: handle exception
}
}
}
========================= 1.3 end ==========================
打开策略A, 关闭策略B后的输出如下:
14:19:33.812: 问题处理专家C worked out the result.
14:19:34.516: 问题处理专家D worked out the result.
14:19:37.518: 问题处理专家B worked out the result.
14:19:40.520: 问题处理专家E worked out the result.
14:19:42.522: 问题处理专家A worked out the result.
打开策略B, 关闭策略A后的输出如下:
13:56:11.175: 问题处理专家C worked out the result.
13:56:11.206: 问题处理专家A 退出工作。
13:56:11.206: 问题处理专家B 退出工作。
13:56:11.206: 问题处理专家D 退出工作。
13:56:11.206: 问题处理专家E 退出工作。
这里模拟这样一个场景: 一个由5个问题专家组成的团队, 同时解决同一个问题, 每个人的耗时都不同。
A策略: 每个专家得出结果后, 立刻在后台输出结果;
B策略: 只要有一个专家得出结果, 就在后台输出结果, 同时其它专家退出任务。
写了3个类来验证这种情况,只在windows下做了测试。
1.1、 CompletionSolver.java 进行并发工作的专家
1.2、 CompletionLogic.java 专家们并发工作的调度逻辑
1.3、 CompletionTest.java 带有main方法的测试类,5个专家开始同时解决一个棘手的问题
================= 1.1 CompletionSolver.java =====================
package Executor;
import java.util.concurrent.Callable;
//任务解决专家
public class CompletionSolver implements Callable<Result> {
//耗时
private int timeCost;
//结果
private Result result;
//专家名字
private String solverName;
public CompletionSolver(int timeCost, Result result, String solverName) {
this.timeCost = timeCost;
this.result = result;
this.solverName = solverName;
}
private CompletionSolver(){}
//执行任务
public Result call() throws Exception {
try {
Thread.sleep(timeCost*1000);
result.setResult(this.solverName + " worked out the result.");
} catch (Exception e) {}
return result;
}
//获得专家名字
public String getSolverName() {
return solverName;
}
}
========================= 1.1 end ==========================
================= 1.2 CompletionLogic.java =================
package Executor;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
//专家们并发工作的调度逻辑
public class CompletionLogic {
//只要同步工作的 问题解决者 solvers 有一个算出了结果,就给出算出的结果; 知道所有 问题解决者 都算出了结果
public void solveLogicA(Executor executor, List<CompletionSolver> solvers) throws InterruptedException, ExecutionException {
//创建并发线程服务
CompletionService<Result> completionService = new ExecutorCompletionService<Result>(executor);
//执行任务
for (Callable<Result> s : solvers){
completionService.submit(s);
}
//获得专家结果
int n = solvers.size();
for (int i = 0; i < n; ++i) {
Result r = completionService.take().get();
if (r != null)
use(r);
}
}
//只要同步工作的 问题解决者 solvers 有一个算出了结果, 其它 问题解决者就停止工作。最后给出算出的结果
public void solveLogicB(Executor executor, List<CompletionSolver> solvers) throws InterruptedException, ExecutionException {
//创建并发线程服务
CompletionService<Result> completionService = new ExecutorCompletionService<Result>(executor);
//对每一个并发任务的控制
int n = solvers.size();
List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s : solvers){
//专家开始工作, 同时获得对 工作 的控制
futures.add(completionService.submit(s));
}
for (int i = 0; i < n; ++i) {
try {
//获取专家的结果
Result r = completionService.take().get();
if (r != null) {
result = r;
use(result);
break;
}
} catch(ExecutionException ignore) {}
}
}finally {
//其它专家退出任务
boolean canceled = false;
for (int i = 0; i < futures.size(); i++) {
canceled = futures.get(i).cancel(true);
if(canceled)
System.out.println(currenttime() + solvers.get(i).getSolverName() + " 退出工作。");
}
}
}
//使用结果
private void use(Result result){
System.out.println(currenttime() + result.getResult());
}
//获得当前时间
private String currenttime() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
return sdf.format(new Date()) + ": ";
}
}
========================= 1.2 end ==========================
================= 1.3 CompletionTest.java =====================
package Executor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletionTest {
public static void main(String[] args) {
try {
//组建一个问题解决专家的团队
List<CompletionSolver> solvers = new ArrayList<CompletionSolver>();
solvers.add(new CompletionSolver(20,new Result(), "问题处理专家A"));
solvers.add(new CompletionSolver(15,new Result(), "问题处理专家B"));
solvers.add(new CompletionSolver(10,new Result(), "问题处理专家C"));
solvers.add(new CompletionSolver(12,new Result(), "问题处理专家D"));
solvers.add(new CompletionSolver(18,new Result(), "问题处理专家E"));
//创建有十个线程的线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//创建一个协调专家们工作的服务调度
CompletionLogic completionLogic = new CompletionLogic();
//策略A:专家采用A方案开始工作
completionLogic.solveLogicA(executorService, solvers);
//策略B:专家采用B方案开始工作
//completionLogic.solveLogicB(executorService, solvers);
} catch (InterruptedException e) {
// TODO: handle exception
} catch (ExecutionException e) {
// TODO: handle exception
}
}
}
========================= 1.3 end ==========================
打开策略A, 关闭策略B后的输出如下:
14:19:33.812: 问题处理专家C worked out the result.
14:19:34.516: 问题处理专家D worked out the result.
14:19:37.518: 问题处理专家B worked out the result.
14:19:40.520: 问题处理专家E worked out the result.
14:19:42.522: 问题处理专家A worked out the result.
打开策略B, 关闭策略A后的输出如下:
13:56:11.175: 问题处理专家C worked out the result.
13:56:11.206: 问题处理专家A 退出工作。
13:56:11.206: 问题处理专家B 退出工作。
13:56:11.206: 问题处理专家D 退出工作。
13:56:11.206: 问题处理专家E 退出工作。
相关文章推荐
- java线程安全之Executor框架及自定义线程池(十五)
- java5线程框架Executor 排程定时功能
- java5线程框架Executor的用法举例
- Java并发编程之线程管理(Executor框架12)
- Java并发编程之线程管理(Executor框架16)
- Java线程Executor框架详解与使用
- Java 线程 Executor 框架详解与使用
- Java并发编程之线程管理(Executor框架15)
- java线程之Executor框架与线程池
- java5线程框架Executor的用法举例
- Java中的线程(九)- Executor框架
- 05 Java JUC 中的 Executor 框架 提交线程
- Java线程与并发编程实践----并发工具类与Executor框架
- java5线程框架Executor的用法举例
- java并发编程-Executor框架
- [多线程] java并发编程-Executor框架
- Java并发——Executor框架详解(Executor框架结构与框架成员)
- Java的Executor框架和线程池实现原理
- 【转】Java Executor框架在多线程应用中的使用
- java并发编程-Executor框架