您的位置:首页 > 其它

Master-Worker

2016-06-16 15:55 323 查看
这种模型是最常用的并行模式之一,在Nginx源码中有涉及到有想看的可以去这个大神的博客了解一下http://blog.csdn.net/marcky/article/details/6014733,这位大神写的有些简洁。从思想的角度来说,它主要由两类进程进行协作:分别是Master进程和Worker进程。Master进程负责接受和分配任务,Worker进程负责处理子任务,当Worker将子任务处理完成后,将结果返回给Master进程,由Master进程做归纳和汇总,得到最终结果,具体流程可以看此图



这种模式能够将一个大任务分解成若干个小任务去执行,适合一些耗时比较久的任务,能够提高系统的吞吐量。

一个相对完整的模型应该具备以下功能



在借鉴了Java性能优化书上的列子,上面实现了一个简单的Master-Worker模式

[java] view
plain copy

 print?

package com.thread;  

  

import java.util.HashMap;  

import java.util.Iterator;  

import java.util.Map;  

import java.util.Queue;  

import java.util.Set;  

import java.util.concurrent.ConcurrentHashMap;  

import java.util.concurrent.ConcurrentLinkedDeque;  

  

public class Master_Worker {  

    public static void main(String args[])  

     {  

          long start = System.currentTimeMillis();  

         Master_Worker master_Worker = new Master_Worker(new PlusWorker(), 11);  

         for (int i = 0; i < 100; i++) {  

            master_Worker.submit(i);  

        }  

         master_Worker.execute();  

         int re = 0;  

         Map<String, Object> result_Map = master_Worker.getResultMap();  

         while (result_Map.size()>0||!master_Worker.isComplete()) {  

             Set<String> keysSet = result_Map.keySet();  

             String keyString = null;  

             for (String string : keysSet) {  

                keyString = string;  

                break;  

            }  

             Integer i = null;  

             if (keyString !=null) {  

                i = (Integer) result_Map.get(keyString);  

            }  

             if (i!=null) {  

                re+=i;  

            }  

             if (keyString!=null) {  

                result_Map.remove(keyString);  

            }  

        }  

         long end = System.currentTimeMillis();  

         System.out.println("结果:"+re+"-执行之间"+(end-start));  

         int sum = 0;  

         start = System.currentTimeMillis();  

         for (int i = 1; i <= 100; i++) {  

            sum+=i*i*i;  

            try {  

                Thread.sleep(100);  

            } catch (InterruptedException e) {  

                // TODO Auto-generated catch block  

                e.printStackTrace();  

            }  

        }  

         end = System.currentTimeMillis();  

         System.out.println("结果:"+sum+"-执行之间"+(end-start));  

     }  

    // 任务队列  

    protected Queue<Object> workerQueue = new ConcurrentLinkedDeque<>();  

    // Worker进程队列  

    protected Map<String, Thread> threadMap = new HashMap<>();  

    // 子任务处理结果集  

    protected Map<String, Object> resultMap = new ConcurrentHashMap<>();  

  

    // 是否所有的子任务都结束了  

    public boolean isComplete() {  

        for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {  

            if (entry.getValue().getState() != Thread.State.TERMINATED) {  

                return false;  

            }  

        }  

        return true;  

    }  

  

    // Master的构造,需要一个Worker进程逻辑,和需要的Worker进程数量  

    public Master_Worker(Worker woker, int countWorker) {  

        woker.setWorkQueue(workerQueue);  

        woker.setResultMap(resultMap);  

        for (int i = 0; i < countWorker; i++) {  

            threadMap.put(Integer.toString(i),  

                    new Thread(woker, Integer.toString(i)));  

        }  

  

    }  

  

    //返回子任务结果集  

    public Map<String, Object> getResultMap()  

    {  

        return resultMap;  

    }  

    //提交任务  

    public void submit(Object job) {  

        workerQueue.add(job);  

    }  

    public void execute()  

    {  

        for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {  

            if (entry.getValue().getState() != Thread.State.TERMINATED) {  

                entry.getValue().start();  

            }  

        }  

    }  

}  

  

class Worker implements Runnable {  

  

    // 任务队列,用于取得子任务  

    protected Queue<Object> workQueue;  

    // 子任务处理结果集  

    protected Map<String, Object> resultMap;  

  

    public void setWorkQueue(Queue<Object> workQueue) {  

        this.workQueue = workQueue;  

    }  

  

    public void setResultMap(Map<String, Object> resultMap) {  

        this.resultMap = resultMap;  

    }  

  

    // 子任务处理逻辑,在子类中实现具体逻辑  

    public Object handle(Object input) {  

        /* 这里可以写自己想要做的事情 */  

        return input;  

    }  

  

    @Override  

    public void run() {  

        // TODO Auto-generated method stub  

        while (true) {  

            // 获取子任务  

            Object inputObject = workQueue.poll();  

            if (inputObject == null) {  

                break;  

            }  

            // 处理子任务  

            Object reObject = handle(inputObject);  

            resultMap.put(Integer.toString(inputObject.hashCode()), reObject);  

        }  

    }  

}  

  

/* 

 * 扩展自己的类 

 * */  

 class PlusWorker extends Worker{  

     @Override  

    public Object handle(Object input) {  

        // TODO Auto-generated method stub  

         //在这里可以自己实现自己的业务逻辑等,在这里我让线程睡眠了100毫秒,模拟任务执行  

           

         try {  

            Thread.sleep(100);  

        } catch (InterruptedException e) {  

            // TODO Auto-generated catch block  

            e.printStackTrace();  

        }  

        Integer i = (Integer) input;  

        return i*i*i;  

    }  

 }  

   

   

这里的大多数都是借鉴java性能优化一书,加上自己的改编和简介。

-----------------------------------------------------------------------------------------------------------------------------------------------------

 


java 多线程 master worker模式

2013-11-01 10:23 533人阅读 评论(0) 收藏 举报


 分类:
 

java(55) 


版权声明:本文为博主原创文章,未经博主允许不得转载。

[java] view
plain copy

package com.example.design.master_worker;  

  

import java.lang.Thread.State;  

import java.util.ArrayList;  

import java.util.HashMap;  

import java.util.LinkedHashMap;  

import java.util.List;  

import java.util.Map;  

import java.util.Queue;  

import java.util.concurrent.LinkedBlockingQueue;  

  

public class Master {  

    /** 

     * 要做的事 

     */  

    Queue<Work> works = new LinkedBlockingQueue<Work>();  

  

    /** 

     * 分配几个人来做相当于工人 

     */  

    List<Thread> Workers = new ArrayList<Thread>();  

    /** 

     * 结果 key对应的事 value事的结果 

     */  

    ArrayList<Object> results = new ArrayList<Object>();  

  

    public Master(int runCount, Worker worker) {  

        worker.setValue(works, results);  

        for (int i = 0; i <= runCount; i++) {  

            Workers.add(new Thread(worker));  

        }  

    }  

  

    public void submit(Work w) {  

        works.add(w);  

    }  

  

    public void execute() {  

        for (Thread run : Workers) {  

            run.start();  

        }  

    }  

  

    /** 

     * 判断线程是否全部终止 

     *  

     * @return true 有未完成 

     */  

    public boolean checkIsComplete() {  

        for (Thread run : Workers) {  

            if ((run.getState() != State.TERMINATED)) {  

                return true;  

            }  

        }  

        return false;  

    }  

}  

[java] view
plain copy

  

[java] view
plain copy

package com.example.design.master_worker;  

  

public abstract class Work {  

    Object obj;  

  

    public Work(Object obj) {  

        super();  

        this.obj = obj;  

    }  

  

    public abstract Object handler();  

  

    public Object getParm() {  

        return obj;  

    };  

}  

[java] view
plain copy

package com.example.design.master_worker;  

  

  

import java.util.ArrayList;  

import java.util.Map;  

import java.util.Queue;  

  

  

public class Worker implements Runnable {  

    Queue<Work> works;  

    ArrayList<Object> results;  

  

  

    public void setValue(Queue<Work> works, ArrayList<Object> results) {  

        this.works = works;  

        this.results = results;  

    }  

  

  

    @Override  

    public void run() {  

        while (true) {  

            Work work = works.poll();  

            if (work == null) {  

                break;  

            }  

            Object result = work.handler();  

            results.add(result);  

        }  

    }  

}  

[java] view
plain copy

package com.example.design.master_worker;  

  

public class Work1 extends Work {  

  

    public Work1(Object obj) {  

        super(obj);  

    }  

  

    @Override  

    public Object handler() {  

        int i = (Integer) obj;  

        int t = i * i * i;  

        return t;  

    }  

}  

[java] view
plain copy

private static void masterTest(int t) {  

        Worker worker = new Worker();  

        Master master = new Master(5, worker);  

        for (int i = 0; i <= t; i++) {  

            master.submit(new Work1(i));  

        }  

        master.execute();  

        int result = 0;  

  

        ArrayList<Object> results = master.results;  

        while (results.size() > 0 || master.checkIsComplete()) {  

            if (results.size() > 0) {  

                result += (Integer) results.remove(0);  

            }  

        }  

        System.out.println(result);  

    }  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: