Master-Worker
2016-06-16 15:55
323 查看
这种模型是最常用的并行模式之一,在Nginx源码中有涉及到有想看的可以去这个大神的博客了解一下http://blog.csdn.net/marcky/article/details/6014733,这位大神写的有些简洁。从思想的角度来说,它主要由两类进程进行协作:分别是Master进程和Worker进程。Master进程负责接受和分配任务,Worker进程负责处理子任务,当Worker将子任务处理完成后,将结果返回给Master进程,由Master进程做归纳和汇总,得到最终结果,具体流程可以看此图
这种模式能够将一个大任务分解成若干个小任务去执行,适合一些耗时比较久的任务,能够提高系统的吞吐量。
一个相对完整的模型应该具备以下功能
在借鉴了Java性能优化书上的列子,上面实现了一个简单的Master-Worker模式
[java] view
plain copy
print?
package com.thread;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
public class Master_Worker {
public static void main(String args[])
{
long start = System.currentTimeMillis();
Master_Worker master_Worker = new Master_Worker(new PlusWorker(), 11);
for (int i = 0; i < 100; i++) {
master_Worker.submit(i);
}
master_Worker.execute();
int re = 0;
Map<String, Object> result_Map = master_Worker.getResultMap();
while (result_Map.size()>0||!master_Worker.isComplete()) {
Set<String> keysSet = result_Map.keySet();
String keyString = null;
for (String string : keysSet) {
keyString = string;
break;
}
Integer i = null;
if (keyString !=null) {
i = (Integer) result_Map.get(keyString);
}
if (i!=null) {
re+=i;
}
if (keyString!=null) {
result_Map.remove(keyString);
}
}
long end = System.currentTimeMillis();
System.out.println("结果:"+re+"-执行之间"+(end-start));
int sum = 0;
start = System.currentTimeMillis();
for (int i = 1; i <= 100; i++) {
sum+=i*i*i;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
end = System.currentTimeMillis();
System.out.println("结果:"+sum+"-执行之间"+(end-start));
}
// 任务队列
protected Queue<Object> workerQueue = new ConcurrentLinkedDeque<>();
// Worker进程队列
protected Map<String, Thread> threadMap = new HashMap<>();
// 子任务处理结果集
protected Map<String, Object> resultMap = new ConcurrentHashMap<>();
// 是否所有的子任务都结束了
public boolean isComplete() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
if (entry.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
}
// Master的构造,需要一个Worker进程逻辑,和需要的Worker进程数量
public Master_Worker(Worker woker, int countWorker) {
woker.setWorkQueue(workerQueue);
woker.setResultMap(resultMap);
for (int i = 0; i < countWorker; i++) {
threadMap.put(Integer.toString(i),
new Thread(woker, Integer.toString(i)));
}
}
//返回子任务结果集
public Map<String, Object> getResultMap()
{
return resultMap;
}
//提交任务
public void submit(Object job) {
workerQueue.add(job);
}
public void execute()
{
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
if (entry.getValue().getState() != Thread.State.TERMINATED) {
entry.getValue().start();
}
}
}
}
class Worker implements Runnable {
// 任务队列,用于取得子任务
protected Queue<Object> workQueue;
// 子任务处理结果集
protected Map<String, Object> resultMap;
public void setWorkQueue(Queue<Object> workQueue) {
this.workQueue = workQueue;
}
public void setResultMap(Map<String, Object> resultMap) {
this.resultMap = resultMap;
}
// 子任务处理逻辑,在子类中实现具体逻辑
public Object handle(Object input) {
/* 这里可以写自己想要做的事情 */
return input;
}
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
// 获取子任务
Object inputObject = workQueue.poll();
if (inputObject == null) {
break;
}
// 处理子任务
Object reObject = handle(inputObject);
resultMap.put(Integer.toString(inputObject.hashCode()), reObject);
}
}
}
/*
* 扩展自己的类
* */
class PlusWorker extends Worker{
@Override
public Object handle(Object input) {
// TODO Auto-generated method stub
//在这里可以自己实现自己的业务逻辑等,在这里我让线程睡眠了100毫秒,模拟任务执行
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Integer i = (Integer) input;
return i*i*i;
}
}
这里的大多数都是借鉴java性能优化一书,加上自己的改编和简介。
-----------------------------------------------------------------------------------------------------------------------------------------------------
2013-11-01 10:23 533人阅读 评论(0) 收藏 举报
分类:
java(55)
版权声明:本文为博主原创文章,未经博主允许不得转载。
[java] view
plain copy
package com.example.design.master_worker;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
public class Master {
/**
* 要做的事
*/
Queue<Work> works = new LinkedBlockingQueue<Work>();
/**
* 分配几个人来做相当于工人
*/
List<Thread> Workers = new ArrayList<Thread>();
/**
* 结果 key对应的事 value事的结果
*/
ArrayList<Object> results = new ArrayList<Object>();
public Master(int runCount, Worker worker) {
worker.setValue(works, results);
for (int i = 0; i <= runCount; i++) {
Workers.add(new Thread(worker));
}
}
public void submit(Work w) {
works.add(w);
}
public void execute() {
for (Thread run : Workers) {
run.start();
}
}
/**
* 判断线程是否全部终止
*
* @return true 有未完成
*/
public boolean checkIsComplete() {
for (Thread run : Workers) {
if ((run.getState() != State.TERMINATED)) {
return true;
}
}
return false;
}
}
[java] view
plain copy
[java] view
plain copy
package com.example.design.master_worker;
public abstract class Work {
Object obj;
public Work(Object obj) {
super();
this.obj = obj;
}
public abstract Object handler();
public Object getParm() {
return obj;
};
}
[java] view
plain copy
package com.example.design.master_worker;
import java.util.ArrayList;
import java.util.Map;
import java.util.Queue;
public class Worker implements Runnable {
Queue<Work> works;
ArrayList<Object> results;
public void setValue(Queue<Work> works, ArrayList<Object> results) {
this.works = works;
this.results = results;
}
@Override
public void run() {
while (true) {
Work work = works.poll();
if (work == null) {
break;
}
Object result = work.handler();
results.add(result);
}
}
}
[java] view
plain copy
package com.example.design.master_worker;
public class Work1 extends Work {
public Work1(Object obj) {
super(obj);
}
@Override
public Object handler() {
int i = (Integer) obj;
int t = i * i * i;
return t;
}
}
[java] view
plain copy
private static void masterTest(int t) {
Worker worker = new Worker();
Master master = new Master(5, worker);
for (int i = 0; i <= t; i++) {
master.submit(new Work1(i));
}
master.execute();
int result = 0;
ArrayList<Object> results = master.results;
while (results.size() > 0 || master.checkIsComplete()) {
if (results.size() > 0) {
result += (Integer) results.remove(0);
}
}
System.out.println(result);
}
这种模式能够将一个大任务分解成若干个小任务去执行,适合一些耗时比较久的任务,能够提高系统的吞吐量。
一个相对完整的模型应该具备以下功能
在借鉴了Java性能优化书上的列子,上面实现了一个简单的Master-Worker模式
[java] view
plain copy
print?
package com.thread;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
public class Master_Worker {
public static void main(String args[])
{
long start = System.currentTimeMillis();
Master_Worker master_Worker = new Master_Worker(new PlusWorker(), 11);
for (int i = 0; i < 100; i++) {
master_Worker.submit(i);
}
master_Worker.execute();
int re = 0;
Map<String, Object> result_Map = master_Worker.getResultMap();
while (result_Map.size()>0||!master_Worker.isComplete()) {
Set<String> keysSet = result_Map.keySet();
String keyString = null;
for (String string : keysSet) {
keyString = string;
break;
}
Integer i = null;
if (keyString !=null) {
i = (Integer) result_Map.get(keyString);
}
if (i!=null) {
re+=i;
}
if (keyString!=null) {
result_Map.remove(keyString);
}
}
long end = System.currentTimeMillis();
System.out.println("结果:"+re+"-执行之间"+(end-start));
int sum = 0;
start = System.currentTimeMillis();
for (int i = 1; i <= 100; i++) {
sum+=i*i*i;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
end = System.currentTimeMillis();
System.out.println("结果:"+sum+"-执行之间"+(end-start));
}
// 任务队列
protected Queue<Object> workerQueue = new ConcurrentLinkedDeque<>();
// Worker进程队列
protected Map<String, Thread> threadMap = new HashMap<>();
// 子任务处理结果集
protected Map<String, Object> resultMap = new ConcurrentHashMap<>();
// 是否所有的子任务都结束了
public boolean isComplete() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
if (entry.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
}
// Master的构造,需要一个Worker进程逻辑,和需要的Worker进程数量
public Master_Worker(Worker woker, int countWorker) {
woker.setWorkQueue(workerQueue);
woker.setResultMap(resultMap);
for (int i = 0; i < countWorker; i++) {
threadMap.put(Integer.toString(i),
new Thread(woker, Integer.toString(i)));
}
}
//返回子任务结果集
public Map<String, Object> getResultMap()
{
return resultMap;
}
//提交任务
public void submit(Object job) {
workerQueue.add(job);
}
public void execute()
{
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
if (entry.getValue().getState() != Thread.State.TERMINATED) {
entry.getValue().start();
}
}
}
}
class Worker implements Runnable {
// 任务队列,用于取得子任务
protected Queue<Object> workQueue;
// 子任务处理结果集
protected Map<String, Object> resultMap;
public void setWorkQueue(Queue<Object> workQueue) {
this.workQueue = workQueue;
}
public void setResultMap(Map<String, Object> resultMap) {
this.resultMap = resultMap;
}
// 子任务处理逻辑,在子类中实现具体逻辑
public Object handle(Object input) {
/* 这里可以写自己想要做的事情 */
return input;
}
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
// 获取子任务
Object inputObject = workQueue.poll();
if (inputObject == null) {
break;
}
// 处理子任务
Object reObject = handle(inputObject);
resultMap.put(Integer.toString(inputObject.hashCode()), reObject);
}
}
}
/*
* 扩展自己的类
* */
class PlusWorker extends Worker{
@Override
public Object handle(Object input) {
// TODO Auto-generated method stub
//在这里可以自己实现自己的业务逻辑等,在这里我让线程睡眠了100毫秒,模拟任务执行
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Integer i = (Integer) input;
return i*i*i;
}
}
这里的大多数都是借鉴java性能优化一书,加上自己的改编和简介。
-----------------------------------------------------------------------------------------------------------------------------------------------------
java 多线程 master worker模式
2013-11-01 10:23 533人阅读 评论(0) 收藏 举报分类:
java(55)
版权声明:本文为博主原创文章,未经博主允许不得转载。
[java] view
plain copy
package com.example.design.master_worker;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
public class Master {
/**
* 要做的事
*/
Queue<Work> works = new LinkedBlockingQueue<Work>();
/**
* 分配几个人来做相当于工人
*/
List<Thread> Workers = new ArrayList<Thread>();
/**
* 结果 key对应的事 value事的结果
*/
ArrayList<Object> results = new ArrayList<Object>();
public Master(int runCount, Worker worker) {
worker.setValue(works, results);
for (int i = 0; i <= runCount; i++) {
Workers.add(new Thread(worker));
}
}
public void submit(Work w) {
works.add(w);
}
public void execute() {
for (Thread run : Workers) {
run.start();
}
}
/**
* 判断线程是否全部终止
*
* @return true 有未完成
*/
public boolean checkIsComplete() {
for (Thread run : Workers) {
if ((run.getState() != State.TERMINATED)) {
return true;
}
}
return false;
}
}
[java] view
plain copy
[java] view
plain copy
package com.example.design.master_worker;
public abstract class Work {
Object obj;
public Work(Object obj) {
super();
this.obj = obj;
}
public abstract Object handler();
public Object getParm() {
return obj;
};
}
[java] view
plain copy
package com.example.design.master_worker;
import java.util.ArrayList;
import java.util.Map;
import java.util.Queue;
public class Worker implements Runnable {
Queue<Work> works;
ArrayList<Object> results;
public void setValue(Queue<Work> works, ArrayList<Object> results) {
this.works = works;
this.results = results;
}
@Override
public void run() {
while (true) {
Work work = works.poll();
if (work == null) {
break;
}
Object result = work.handler();
results.add(result);
}
}
}
[java] view
plain copy
package com.example.design.master_worker;
public class Work1 extends Work {
public Work1(Object obj) {
super(obj);
}
@Override
public Object handler() {
int i = (Integer) obj;
int t = i * i * i;
return t;
}
}
[java] view
plain copy
private static void masterTest(int t) {
Worker worker = new Worker();
Master master = new Master(5, worker);
for (int i = 0; i <= t; i++) {
master.submit(new Work1(i));
}
master.execute();
int result = 0;
ArrayList<Object> results = master.results;
while (results.size() > 0 || master.checkIsComplete()) {
if (results.size() > 0) {
result += (Integer) results.remove(0);
}
}
System.out.println(result);
}
相关文章推荐
- 转: android编译过程(流程图)
- 《JS实现复制内容到剪贴板功能,可兼容所有PC浏览器,不兼容手机端》
- 图像抠图算法学习 - Shared Sampling for Real-Time Alpha Matting
- android studio 导入 annotations 注解框架
- boost::condition_variable 设计生产者消费者队列
- 工资发放签名表 项目混乱
- 【规范】流程图的标准画法
- win7/8/8.1/2008等系统打开Chm文件补丁
- jquery通过class多级选择
- SQL 2008 更改表结构后提示启用了\"阻止保存要求重新创建表的更改\"选项
- T3提示登陆失败
- 高级语言的编译:链接及装载过程介绍
- U8存货核算常用存储过程
- 借助ComboBox实现DataGridView列下拉选择效果
- 从java内存分配角度分析android内存泄漏问题
- 将DataTable中执行Select(\"条件\")后的结果显示在DataGridView
- Caffe学习笔记(4) -- 可视化训练结果
- yum tips
- T1商品无法增加下级分类
- VSFlex8n.ocx控件过期