您的位置:首页 > 编程语言 > Java开发

多线程设计模式——Master-Salave(主仆)模式

2016-07-13 13:34 573 查看
这些都是根据我最近看的《Java实战指南多线程编程(设计模式篇)》所得整理。

模式名称

Master-Salave(主仆)模式

模式解决的问题

有时候线程中的方法任务比较多,需要利用线程的优势,分解为若干个任务去处理,。

解决思路

将一个任务分解为若干个语义等同的子任务,并由专门的工作者线程来并行执行这些子任务,既提高计算效率,又实现了信息隐藏。

Master类负责原始任务的分解、子系统的派发和子任务处理结果的合并:service这是Master参与者对外暴露的接口,用于接收原始任务并返回其结果,splotWork将原始任务费解成若干个语义等同的子任务,callSlaves将各个子任务分解成若干语义等同的子任务,combineResults将各个子任务的处理结果进行整合,形成原始任务的处理结果。

Slave类负责子任务的处理:subService异步方法,负责执行子任务的处理逻辑。

Created with Raphaël 2.1.0ClientClientMasterMasterslaveAslaveAslaveBslaveBslaveXslaveX1Service()2splitWork()3callSlaves()4subService()56subService()78subService()910combineResults()11

例子代码

某基于WebService的电信系统需要一个系统流量统计工具,该工具的统计数据依据是该系统运行过程中穿绳的接口日志文件。接口日志文件记录了该系统接收到的请求、该系统返回给客户端的相应以及该系统调用外部系统时设计的请求和相应的相关数据。

流量统计工具的Java模块代码

public class TPSStart {
public static void main(String[] args) throws Exception{
//接口日志文件所在目录
String logBaseDir =args[0];

//忽略的操作名列表
String excludedOperationNames = "";

//指定要统计在内的操作名列表
String includedOperationNames = "*";

//指定要统计在内的目标设备名
String destinationSysName = "*";

int argc = args.length;

if(argc > 2){
excludedOperationNames = args[1];
}
if(argc > 3){
excludedOperationNames = args[2];
}
if(argc > 4){
excludedOperationNames = args[3];
}
Master processor = new Master(logBaseDir,excludedOperationNames,
includedOperationNames,destinationSysName);
BufferedReader fileNamesReader = new BufferConcurrentMap<K, V>InputStreamReader(
System.in));
ConcurrentMap<String,AtomicInteger> result =processor
.calculate(fileNamesReader);
for(String timeRange:result.keySet()){
System.out.println(timeRange + "," result.get(timeRange));
}
}

//模式角色:Master-Slave.Master
private
ddf0
static class Master{
private final String logFileBaseDir;
private final String excludedOperationNames;
private final String includedOperationNames;
private final String destinationSysNames

//每次派发给某个Slave线程的文件个数
private static final int NUMBER_OF_FILES_FOR_EACH_DISPATCH = 5;
private static final int WORKER_COUNT =Runtime.getRuntime()
.availableProcessors();

public Master(String logFileBaseDir,String excludedOperationNames,
String includedOperationNames,String destinationSysName){
this.logFileBaseDir = logFileBaseDir;
this.excludedOperationNames = excludedOperationNames;
this.includedOperationNames = includedOperationNames;
this.destinationSysNames = destinationSysName;
}

public ConcurrentMap<String,AtomicInteger> calculate(
BufferedReader fileNamesReader) throws IOException{
ConcurrentMap<String,AtomicInteger> repository = new
ConcurrentSkipListMap<String,AtomicInteger>();
//创建工作者线程
Worker[] workers =createAndStartWorkers(repository);
//指派任务给工作者线程
dispatchTask(fileNamesReader,workers);
//等待工作者线程处理结束
for(int i = 0; i < WORKER_COUNT; i++){
workers[i].terminate(true);
}
//返回处理结果
return repositoty;
}

private void dispatchTask(BufferedReader fileNamesReader,Worker[] workers)
throws IOException{
String line;
Set<String> fileNames =new HashSet<String>();

int fileCount = 0;
int workerIndex = -1;
BufferReader logFileReader;
while((line = fileNamesReader.readLine()) != null){
fileNames.add(line);
fileCount++;
if(0 == (fileCount % NUMBER_OF_FILES_FOR_EACH_DISPATCH)){
//工作者线程间的负载均衡:这里采用简单的轮询选择worker
workerIndex =(workderIndex + 1) % WORKER_COUNT;
logFileReader =makeReaderFoem(fileNames);

Debug.info("Dispatch " + NUBMER_OF_FILES_FOR_EACH_DISPATCH
+ " files to worker:" + workerIndex);
workers[workerIndex].submitWorkload(logFileReader);

fileNames =new HashSet<String>();
fileCount = 0;
}
}
if(fileCount > 0){
logFileReader = makeReaderFrom(fileNames);
workerIndex = (workerIndex + 1) % WORKER_COUNT;
workers[workerIndex].submitWorkload(logFileReader);
}
}
private BufferReader makeReaderFrom(final Set<String> logFileNames){
BufferedReader logFileReader;
InputStream in = new SequenceInputStream(new Enumeration<InputStream>(){
private Iterator<String> iterator = logFileNames.iterator();

@Override
public boolean hasMoreElements(){
return iterator.hasNext();
}

@Override
public InputStream nextElement(){
String fileName = iterator.next();
InputStream in = null;
try{
in = new FileInputStream(logFileBaseDir + fileName);
}catch(FileNotFoundException e){
throw new RuntimeException(e);
}
return in;
}
});
logFileReader = new BufferedReader(new InputStreamReader(in));
return logFileReader;
}
}

//模式角色:Master-Slave.Slave
private static class Worker extends AbstractTerminatableThread{
private static final Pattern SPLIT_PATTERN = Pattern.compile("\\|");
private final CoucurrentMap<String,AtomicInteger> repository;
private BlockingQueue<BufferedReader> workQueue;

private final String selfDevice = "ESB";
private final String excludedOperationNames;
private final String includedOperationNames;
private final String destinationSysName;

puclic Worker(ConcurrentMap<String,AtomicInteger> repository,
String excludedOperationNames,String includedOperationNames,
String destinationSysName){
this.repository = repository;
workerQueue = new ArrayBlockingQueue<BufferedReader>(100);
this.excludedOperationNames = excludedOperationNames;
this.includedOperationNames = includedOperationNames;
this.destinationSysName =destinationSysName;
}

public void submitWorkload(BufferedReader taskWorkload){
try{
workQueue.put(taskWorkload);
terminationToken.reservations.incrementAndGet();
}catch(InterruptedException e){
;
}
}

privated void doRun() throws Exception{
BufferedReader logFileReader = workQueue.take();

String interfaceLogRecord;
String[]recordParts;
String timeStamp;
AtomicInteger reqCounter;
AtomicInteger exixtingReqCounter;
int i = 0;
try{
while((interfaceLogRecord = logFileReader.readLine()) != null){
recordParts =SPLIT_PATTERM.split(interfaceLogRecord,0);

//避免CPU占用过高
if(0 == ((++i) % 100000)){
Thread.sleep(80);
i = 0;
}

//跳过无效记录(如果有的话)
if(recordParts.length < 7){
continue;
}
if(("request".equals(recordParts)[2]) && (recordParts[6].startsWith(selfDevice))){
timeStamp = recordParts[0];
timeStamp = new String(timeStamp.substring(0,19).toCharArray());
String operName = recordParts[4];
reqCounter = repository.get(timesStamp);
if(null == reqCounter){
reqCounter = new AtomicInteger(0);
existingReqCounter =repository
.putIfAbsent(timeStamp,reqCounter);
if(null != existingReqCounter){
reqCounter = existingReqCounter;
}
}
if(isSrcDeviceEEligible(recordParts[5])){
if(excludedOperationNames.contains(operName) +','){
countinue;
}
if("*".equals(includedPerationNames)){
reqCounter.incrementAndGet();
}else{
if(includedOperationNames.contains(operName + ',')){
reCounter.incrementAndGet();
}
}
}
}
}
}finally{
terminationToken.reservations.decrementAndGet();
logFileReader.close();
}
}

//判断目标设备名是否在待统计之列
private boolean ifSrcDeviceEEligible(String sourceNE){
boolean result = fales;
if("*".equals(destinationSysName)){
result = true;
}else if(destinationSysName.equals(sourceNE)){
result = true;
}
return result;
}
}
}


模式的考量

Master-Slave(主仆)模式的应用场景包括三个

1.并行计算

2.容错处理

3.计算精度

该模式可以带来一下几点好处

1.可交换性和可扩展性

2.提升计算性能

模式需要考虑以下几个问题

1.子任务处理结果的收集

2.Slave参与者实例的负载均衡与工作窃取

3.可靠性与异常处理

4.Slave线程的停止
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息