您的位置:首页 > 其它

Master_Worker模式简单模拟

2017-07-06 21:17 411 查看
Master_Worker模式是多线程环境下常用的模式,这里对这个模式做一个简单的模拟。这里只是简单的对Task中的price进行了一个简单的求和

Master.java

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {
//1、应该有一个承装任务的集合
private ConcurrentLinkedQueue<Task>  workQueue = new ConcurrentLinkedQueue<Task>();

//2.使用普通的HashMap去承装所有的worker对象
private HashMap<String ,Thread> workers = new HashMap<String ,Thread>();

//3.使用一个容器承装每一个worker并行执行任务的结果集
private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String,Object>();

//4.构造方法
public Master(Worker worker ,int workerCount){

//每一个worker对象都需要有master中相关数据的引用
//workerQueue用于任务的领取
worker.setWorkerQueue(this.workQueue);
//requestMap用于任务的提交
worker.setResultMap(this.resultMap);

for(int i=0;i<workerCount;i++){
//key表示每一个worker的名字,value表示线程执行对象
workers.put("worker"+Integer.toString(i), new Thread(worker));
}
}

//5.任务提交方法
public void submit(Task task){
this.workQueue.add(task);
}

//6.需要有一个执行的方法(启动应用程序,让所有的worker工作)
public void execute(){
for(Map.Entry<String, Thread> entry:workers.entrySet()){
entry.getValue().start();
}
}

/**
* 判断线程是否执行完毕
* @return
*/
public boolean isComplete() {
for(Map.Entry<String, Thread> entry:workers.entrySet()){
if(entry.getValue().getState()!=Thread.State.TERMINATED){
return false;
}
}
return true;
}

/**
* 返回最终结果数据
* @return
*/
public long getResult() {
long result = 0L;
for(Map.Entry<String, Object> entry:resultMap.entrySet()){
//逻辑的汇总
result += (Integer)entry.getValue();
}
return result;
}
}


Worker.java

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Worker implements Runnable{

private ConcurrentHashMap<String, Object> resultMap;
private ConcurrentLinkedQueue<Task> workQueue;

public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap=resultMap;
}

public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
this.workQueue = workQueue;
}

@Override
public void run() {
while(true){
//领取任务
Task input = this.workQueue.poll();
if(input == null){//没有任务时
break;
}

//处理任务
Object output = handler(input);

//任务提交
this.resultMap.put(Integer.toString(input.getId()), output);
}
}

/**
* 处理任务的方法
* @param input
* @return
*/
private Object handler(Task input) {
Object output = null;
try {
//模拟处理任务的耗时
Thread.sleep(500);
output=input.getPrice();
} catch (Exception e) {
e.printStackTrace();
}
return output;
}

}


Task.java

public class Task {
private int id;
private String name;
private int price;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}

}


测试类

import java.util.Random;

public class Main {
public static void main(String[] args) {
Master master = new Master(new Worker(),10);

Random random = new Random();

for (int i = 1; i <= 100; i++) {
Task task = new Task();
task.setId(i);
task.setName("任务"+i);
task.setPrice(random.nextInt(1000));
master.submit(task);
}

master.execute();

long startTime = System.currentTimeMillis();

while(true){
if(master.isComplete()){
long endTime = System.currentTimeMillis()-startTime;
long result=master.getResult();

System.out.println(result);

System.out.println("执行耗时:"+endTime);
break;
}
}
}
}


运行结果



内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: