多线程设计模式——Master-Salave(主仆)模式
2016-07-13 13:34
573 查看
这些都是根据我最近看的《Java实战指南多线程编程(设计模式篇)》所得整理。
Master类负责原始任务的分解、子系统的派发和子任务处理结果的合并:service这是Master参与者对外暴露的接口,用于接收原始任务并返回其结果,splotWork将原始任务费解成若干个语义等同的子任务,callSlaves将各个子任务分解成若干语义等同的子任务,combineResults将各个子任务的处理结果进行整合,形成原始任务的处理结果。
Slave类负责子任务的处理:subService异步方法,负责执行子任务的处理逻辑。
Created with Raphaël 2.1.0ClientClientMasterMasterslaveAslaveAslaveBslaveBslaveXslaveX1Service()2splitWork()3callSlaves()4subService()56subService()78subService()910combineResults()11
流量统计工具的Java模块代码
1.并行计算
2.容错处理
3.计算精度
该模式可以带来一下几点好处
1.可交换性和可扩展性
2.提升计算性能
模式需要考虑以下几个问题
1.子任务处理结果的收集
2.Slave参与者实例的负载均衡与工作窃取
3.可靠性与异常处理
4.Slave线程的停止
模式名称
Master-Salave(主仆)模式模式解决的问题
有时候线程中的方法任务比较多,需要利用线程的优势,分解为若干个任务去处理,。解决思路
将一个任务分解为若干个语义等同的子任务,并由专门的工作者线程来并行执行这些子任务,既提高计算效率,又实现了信息隐藏。Master类负责原始任务的分解、子系统的派发和子任务处理结果的合并:service这是Master参与者对外暴露的接口,用于接收原始任务并返回其结果,splotWork将原始任务费解成若干个语义等同的子任务,callSlaves将各个子任务分解成若干语义等同的子任务,combineResults将各个子任务的处理结果进行整合,形成原始任务的处理结果。
Slave类负责子任务的处理:subService异步方法,负责执行子任务的处理逻辑。
Created with Raphaël 2.1.0ClientClientMasterMasterslaveAslaveAslaveBslaveBslaveXslaveX1Service()2splitWork()3callSlaves()4subService()56subService()78subService()910combineResults()11
例子代码
某基于WebService的电信系统需要一个系统流量统计工具,该工具的统计数据依据是该系统运行过程中穿绳的接口日志文件。接口日志文件记录了该系统接收到的请求、该系统返回给客户端的相应以及该系统调用外部系统时设计的请求和相应的相关数据。流量统计工具的Java模块代码
public class TPSStart { public static void main(String[] args) throws Exception{ //接口日志文件所在目录 String logBaseDir =args[0]; //忽略的操作名列表 String excludedOperationNames = ""; //指定要统计在内的操作名列表 String includedOperationNames = "*"; //指定要统计在内的目标设备名 String destinationSysName = "*"; int argc = args.length; if(argc > 2){ excludedOperationNames = args[1]; } if(argc > 3){ excludedOperationNames = args[2]; } if(argc > 4){ excludedOperationNames = args[3]; } Master processor = new Master(logBaseDir,excludedOperationNames, includedOperationNames,destinationSysName); BufferedReader fileNamesReader = new BufferConcurrentMap<K, V>InputStreamReader( System.in)); ConcurrentMap<String,AtomicInteger> result =processor .calculate(fileNamesReader); for(String timeRange:result.keySet()){ System.out.println(timeRange + "," result.get(timeRange)); } } //模式角色:Master-Slave.Master private ddf0 static class Master{ private final String logFileBaseDir; private final String excludedOperationNames; private final String includedOperationNames; private final String destinationSysNames //每次派发给某个Slave线程的文件个数 private static final int NUMBER_OF_FILES_FOR_EACH_DISPATCH = 5; private static final int WORKER_COUNT =Runtime.getRuntime() .availableProcessors(); public Master(String logFileBaseDir,String excludedOperationNames, String includedOperationNames,String destinationSysName){ this.logFileBaseDir = logFileBaseDir; this.excludedOperationNames = excludedOperationNames; this.includedOperationNames = includedOperationNames; this.destinationSysNames = destinationSysName; } public ConcurrentMap<String,AtomicInteger> calculate( BufferedReader fileNamesReader) throws IOException{ ConcurrentMap<String,AtomicInteger> repository = new ConcurrentSkipListMap<String,AtomicInteger>(); //创建工作者线程 Worker[] workers =createAndStartWorkers(repository); //指派任务给工作者线程 dispatchTask(fileNamesReader,workers); //等待工作者线程处理结束 for(int i = 0; i < WORKER_COUNT; i++){ workers[i].terminate(true); } //返回处理结果 return repositoty; } private void dispatchTask(BufferedReader fileNamesReader,Worker[] workers) throws IOException{ String line; Set<String> fileNames =new HashSet<String>(); int fileCount = 0; int workerIndex = -1; BufferReader logFileReader; while((line = fileNamesReader.readLine()) != null){ fileNames.add(line); fileCount++; if(0 == (fileCount % NUMBER_OF_FILES_FOR_EACH_DISPATCH)){ //工作者线程间的负载均衡:这里采用简单的轮询选择worker workerIndex =(workderIndex + 1) % WORKER_COUNT; logFileReader =makeReaderFoem(fileNames); Debug.info("Dispatch " + NUBMER_OF_FILES_FOR_EACH_DISPATCH + " files to worker:" + workerIndex); workers[workerIndex].submitWorkload(logFileReader); fileNames =new HashSet<String>(); fileCount = 0; } } if(fileCount > 0){ logFileReader = makeReaderFrom(fileNames); workerIndex = (workerIndex + 1) % WORKER_COUNT; workers[workerIndex].submitWorkload(logFileReader); } } private BufferReader makeReaderFrom(final Set<String> logFileNames){ BufferedReader logFileReader; InputStream in = new SequenceInputStream(new Enumeration<InputStream>(){ private Iterator<String> iterator = logFileNames.iterator(); @Override public boolean hasMoreElements(){ return iterator.hasNext(); } @Override public InputStream nextElement(){ String fileName = iterator.next(); InputStream in = null; try{ in = new FileInputStream(logFileBaseDir + fileName); }catch(FileNotFoundException e){ throw new RuntimeException(e); } return in; } }); logFileReader = new BufferedReader(new InputStreamReader(in)); return logFileReader; } } //模式角色:Master-Slave.Slave private static class Worker extends AbstractTerminatableThread{ private static final Pattern SPLIT_PATTERN = Pattern.compile("\\|"); private final CoucurrentMap<String,AtomicInteger> repository; private BlockingQueue<BufferedReader> workQueue; private final String selfDevice = "ESB"; private final String excludedOperationNames; private final String includedOperationNames; private final String destinationSysName; puclic Worker(ConcurrentMap<String,AtomicInteger> repository, String excludedOperationNames,String includedOperationNames, String destinationSysName){ this.repository = repository; workerQueue = new ArrayBlockingQueue<BufferedReader>(100); this.excludedOperationNames = excludedOperationNames; this.includedOperationNames = includedOperationNames; this.destinationSysName =destinationSysName; } public void submitWorkload(BufferedReader taskWorkload){ try{ workQueue.put(taskWorkload); terminationToken.reservations.incrementAndGet(); }catch(InterruptedException e){ ; } } privated void doRun() throws Exception{ BufferedReader logFileReader = workQueue.take(); String interfaceLogRecord; String[]recordParts; String timeStamp; AtomicInteger reqCounter; AtomicInteger exixtingReqCounter; int i = 0; try{ while((interfaceLogRecord = logFileReader.readLine()) != null){ recordParts =SPLIT_PATTERM.split(interfaceLogRecord,0); //避免CPU占用过高 if(0 == ((++i) % 100000)){ Thread.sleep(80); i = 0; } //跳过无效记录(如果有的话) if(recordParts.length < 7){ continue; } if(("request".equals(recordParts)[2]) && (recordParts[6].startsWith(selfDevice))){ timeStamp = recordParts[0]; timeStamp = new String(timeStamp.substring(0,19).toCharArray()); String operName = recordParts[4]; reqCounter = repository.get(timesStamp); if(null == reqCounter){ reqCounter = new AtomicInteger(0); existingReqCounter =repository .putIfAbsent(timeStamp,reqCounter); if(null != existingReqCounter){ reqCounter = existingReqCounter; } } if(isSrcDeviceEEligible(recordParts[5])){ if(excludedOperationNames.contains(operName) +','){ countinue; } if("*".equals(includedPerationNames)){ reqCounter.incrementAndGet(); }else{ if(includedOperationNames.contains(operName + ',')){ reCounter.incrementAndGet(); } } } } } }finally{ terminationToken.reservations.decrementAndGet(); logFileReader.close(); } } //判断目标设备名是否在待统计之列 private boolean ifSrcDeviceEEligible(String sourceNE){ boolean result = fales; if("*".equals(destinationSysName)){ result = true; }else if(destinationSysName.equals(sourceNE)){ result = true; } return result; } } }
模式的考量
Master-Slave(主仆)模式的应用场景包括三个1.并行计算
2.容错处理
3.计算精度
该模式可以带来一下几点好处
1.可交换性和可扩展性
2.提升计算性能
模式需要考虑以下几个问题
1.子任务处理结果的收集
2.Slave参与者实例的负载均衡与工作窃取
3.可靠性与异常处理
4.Slave线程的停止
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- Python3写爬虫(四)多线程实现数据爬取
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序
- 堆排序