您的位置:首页 > 编程语言

无边界-生成者与消费者模式

2016-06-02 21:25 267 查看
简述

生成者消费者模式,在并发编程模式里面很常见。无边界是指消息队列,没有设置,具体大小。

下面是常见的两种模式

通用类

MyTask : 定义了任务接口

package com.ricisung.Model.chapter1;

public abstract class MyTask {

abstract void doSomething();

}


SayHelloTask: 它是MyTask 的实现类; 主要打印一句hello

package com.ricisung.Model.chapter1;

public class SayHelloTask extends MyTask {

@Override
void doSomething() {
System.out.println(Thread.currentThread().getName() +" ...." +"say hello...");
}

}


第一种消费者模式

每个线程持有一个任务队列。用消费代理分发给不同的线程处理,详细如下:

Customer

package com.ricisung.Model.chapter1;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;

public class Customer extends Thread {

private static Logger logger = Logger.getLogger(Customer.class);

private BlockingQueue<MyTask> queue = new LinkedBlockingQueue<MyTask>();

public void putTask(MyTask task) throws InterruptedException {
queue.add(task);
}

@Override
public void run() {

while (true) {
try {
MyTask task = queue.take();
task.doSomething();

} catch (Throwable e) {
logger.error("",e);
}
}
}
}


消息代理: 消费者代理,随机选取一个消费者,进行消费

CustomerProxy :

package com.ricisung.Model.chapter1;

import java.util.Random;

import org.apache.log4j.Logger;

public class CustomerProxy {
private static Logger logger = Logger.getLogger(CustomerProxy.class);

private static CustomerProxy instance = null;

private Customer[] customers = null;

private final int MAX_SIZE = 4;

private CustomerProxy() {
customers = new Customer[MAX_SIZE];
for (int i = 0; i < MAX_SIZE; i++) {
customers[i] = new Customer();
customers[i].setDaemon(true);
customers[i].start();
}
}

public static synchronized CustomerProxy instance() throws InterruptedException {
if (null == instance) {
instance = new CustomerProxy();
}
return instance;
}

public void doWork(MyTask task) {
int index = new Random().nextInt(MAX_SIZE);
try {

customers[index].putTask(task);

} catch (Throwable e) {
logger.error("",e);
}
}

}


第二种模式:只有消费代理有一个任务队列,所有的线程都从消费代理的任务队列中获取任务。

CustomerProxy1

package com.ricisung.Model.chapter1;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;

public class CustomerProxy1 {

private static Logger logger = Logger.getLogger(CustomerProxy1.class);

private static CustomerProxy1 instance = null;

private Customer1[] customers = null;

private final int MAX_SIZE = 4;

private BlockingQueue<MyTask> queue = new LinkedBlockingQueue<MyTask>();

private CustomerProxy1() {
customers = new Customer1[MAX_SIZE];
for (int i = 0; i < MAX_SIZE; i ++) {
customers[i] = new Customer1();
customers[i].setDaemon(true);
customers[i].start();
}

}

public static synchronized CustomerProxy1 instance() {
if (null == instance) {
instance = new CustomerProxy1();
}
return instance;
}

public void doWork(MyTask task) {
try {
queue.add(task);

} catch (Throwable e) {
logger.error("",e);
}
}

class Customer1 extends Thread {

@Override
public void run() {

while(true) {
try {
MyTask task = queue.take();
task.doSomething();

} catch (Throwable e) {
logger.error("",e);
}
}
}

}

}


Producer 生成者

package com.ricisung.Model.chapter1;

public class Producer {

public static void main(String args[]) throws InterruptedException {
//oneModel();
twoModel();
}

public static void oneModel() throws InterruptedException {
for (int i = 0; i < 100; i ++) {
CustomerProxy.instance().doWork(new SayHelloTask());
}

Thread.sleep(1000 * 5);
}

public static void twoModel() throws InterruptedException {
for (int i = 0; i < 100; i ++) {
CustomerProxy1.instance().doWork(new SayHelloTask());
}

Thread.sleep(1000 * 5);

}

}


测试结果:



两种模式的比较

两种默认都可以有效的完成任务。

第一种,每个线程都具有一个自己的任务队列,这种模式相比第二种模式,CPU减少了在不同线程阻塞,减少了线程排队同步问题。

第二种,因为多个线程同时共享了一个队列,所以这种模式更充分的利用了线程; 第一种模式中一个线程完成任务了,就会立即阻塞,不会帮助其他线程共同完成任务;

如果多种任务同时存在,并且不同任务之间需要按相互先后顺序,使用第一种模式就能很好的解决此类问题:将有依赖关系的任务放到同一个线程里进行处理。

7.源码下载地址

http://download.csdn.net/detail/jia281460530/9539500
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  编程