您的位置:首页 > 编程语言 > Java开发

java5线程框架Executor 线程并发控制

2008-12-21 00:23 363 查看
下面是一个在多线程执行并发任务下 进行控制的例子例子(测试 CompletionService)。
这里模拟这样一个场景: 一个由5个问题专家组成的团队, 同时解决同一个问题, 每个人的耗时都不同。
A策略: 每个专家得出结果后, 立刻在后台输出结果;
B策略: 只要有一个专家得出结果, 就在后台输出结果, 同时其它专家退出任务。

写了3个类来验证这种情况,只在windows下做了测试。
1.1、 CompletionSolver.java 进行并发工作的专家
1.2、 CompletionLogic.java 专家们并发工作的调度逻辑
1.3、 CompletionTest.java 带有main方法的测试类,5个专家开始同时解决一个棘手的问题

================= 1.1 CompletionSolver.java =====================
package Executor;
import java.util.concurrent.Callable;

//任务解决专家
public class CompletionSolver implements Callable<Result> {

//耗时
private int timeCost;

//结果
private Result result;

//专家名字
private String solverName;

public CompletionSolver(int timeCost, Result result, String solverName) {
this.timeCost = timeCost;
this.result = result;
this.solverName = solverName;
}

private CompletionSolver(){}

//执行任务
public Result call() throws Exception {
try {
Thread.sleep(timeCost*1000);
result.setResult(this.solverName + " worked out the result.");
} catch (Exception e) {}
return result;
}

//获得专家名字
public String getSolverName() {
return solverName;
}
}
========================= 1.1 end ==========================

================= 1.2 CompletionLogic.java =================
package Executor;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;

//专家们并发工作的调度逻辑
public class CompletionLogic {

//只要同步工作的 问题解决者 solvers 有一个算出了结果,就给出算出的结果; 知道所有 问题解决者 都算出了结果
public void solveLogicA(Executor executor, List<CompletionSolver> solvers) throws InterruptedException, ExecutionException {
//创建并发线程服务
CompletionService<Result> completionService = new ExecutorCompletionService<Result>(executor);

//执行任务
for (Callable<Result> s : solvers){
completionService.submit(s);
}

//获得专家结果
int n = solvers.size();
for (int i = 0; i < n; ++i) {
Result r = completionService.take().get();
if (r != null)
use(r);
}
}

//只要同步工作的 问题解决者 solvers 有一个算出了结果, 其它 问题解决者就停止工作。最后给出算出的结果
public void solveLogicB(Executor executor, List<CompletionSolver> solvers) throws InterruptedException, ExecutionException {
//创建并发线程服务
CompletionService<Result> completionService = new ExecutorCompletionService<Result>(executor);

//对每一个并发任务的控制
int n = solvers.size();
List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
Result result = null;

try {
for (Callable<Result> s : solvers){
//专家开始工作, 同时获得对 工作 的控制
futures.add(completionService.submit(s));
}

for (int i = 0; i < n; ++i) {
try {
//获取专家的结果
Result r = completionService.take().get();
if (r != null) {
result = r;
use(result);
break;
}
} catch(ExecutionException ignore) {}
}
}finally {
//其它专家退出任务
boolean canceled = false;
for (int i = 0; i < futures.size(); i++) {
canceled = futures.get(i).cancel(true);
if(canceled)
System.out.println(currenttime() + solvers.get(i).getSolverName() + " 退出工作。");
}
}
}

//使用结果
private void use(Result result){
System.out.println(currenttime() + result.getResult());
}

//获得当前时间
private String currenttime() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
return sdf.format(new Date()) + ": ";
}
}
========================= 1.2 end ==========================

================= 1.3 CompletionTest.java =====================
package Executor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionTest {

public static void main(String[] args) {

try {
//组建一个问题解决专家的团队
List<CompletionSolver> solvers = new ArrayList<CompletionSolver>();
solvers.add(new CompletionSolver(20,new Result(), "问题处理专家A"));
solvers.add(new CompletionSolver(15,new Result(), "问题处理专家B"));
solvers.add(new CompletionSolver(10,new Result(), "问题处理专家C"));
solvers.add(new CompletionSolver(12,new Result(), "问题处理专家D"));
solvers.add(new CompletionSolver(18,new Result(), "问题处理专家E"));

//创建有十个线程的线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);

//创建一个协调专家们工作的服务调度
CompletionLogic completionLogic = new CompletionLogic();

//策略A:专家采用A方案开始工作
completionLogic.solveLogicA(executorService, solvers);

//策略B:专家采用B方案开始工作
//completionLogic.solveLogicB(executorService, solvers);

} catch (InterruptedException e) {
// TODO: handle exception
} catch (ExecutionException e) {
// TODO: handle exception
}
}
}
========================= 1.3 end ==========================

打开策略A, 关闭策略B后的输出如下:
14:19:33.812: 问题处理专家C worked out the result.
14:19:34.516: 问题处理专家D worked out the result.
14:19:37.518: 问题处理专家B worked out the result.
14:19:40.520: 问题处理专家E worked out the result.
14:19:42.522: 问题处理专家A worked out the result.

打开策略B, 关闭策略A后的输出如下:
13:56:11.175: 问题处理专家C worked out the result.
13:56:11.206: 问题处理专家A 退出工作。
13:56:11.206: 问题处理专家B 退出工作。
13:56:11.206: 问题处理专家D 退出工作。
13:56:11.206: 问题处理专家E 退出工作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: