无边界-生成者与消费者模式
2016-06-02 21:25
267 查看
简述
生成者消费者模式,在并发编程模式里面很常见。无边界是指消息队列,没有设置,具体大小。
下面是常见的两种模式
通用类
MyTask : 定义了任务接口
SayHelloTask: 它是MyTask 的实现类; 主要打印一句hello
第一种消费者模式
每个线程持有一个任务队列。用消费代理分发给不同的线程处理,详细如下:
Customer
消息代理: 消费者代理,随机选取一个消费者,进行消费
CustomerProxy :
第二种模式:只有消费代理有一个任务队列,所有的线程都从消费代理的任务队列中获取任务。
CustomerProxy1
Producer 生成者
测试结果:
两种模式的比较
两种默认都可以有效的完成任务。
第一种,每个线程都具有一个自己的任务队列,这种模式相比第二种模式,CPU减少了在不同线程阻塞,减少了线程排队同步问题。
第二种,因为多个线程同时共享了一个队列,所以这种模式更充分的利用了线程; 第一种模式中一个线程完成任务了,就会立即阻塞,不会帮助其他线程共同完成任务;
如果多种任务同时存在,并且不同任务之间需要按相互先后顺序,使用第一种模式就能很好的解决此类问题:将有依赖关系的任务放到同一个线程里进行处理。
7.源码下载地址
http://download.csdn.net/detail/jia281460530/9539500
生成者消费者模式,在并发编程模式里面很常见。无边界是指消息队列,没有设置,具体大小。
下面是常见的两种模式
通用类
MyTask : 定义了任务接口
package com.ricisung.Model.chapter1; public abstract class MyTask { abstract void doSomething(); }
SayHelloTask: 它是MyTask 的实现类; 主要打印一句hello
package com.ricisung.Model.chapter1; public class SayHelloTask extends MyTask { @Override void doSomething() { System.out.println(Thread.currentThread().getName() +" ...." +"say hello..."); } }
第一种消费者模式
每个线程持有一个任务队列。用消费代理分发给不同的线程处理,详细如下:
Customer
package com.ricisung.Model.chapter1; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.log4j.Logger; public class Customer extends Thread { private static Logger logger = Logger.getLogger(Customer.class); private BlockingQueue<MyTask> queue = new LinkedBlockingQueue<MyTask>(); public void putTask(MyTask task) throws InterruptedException { queue.add(task); } @Override public void run() { while (true) { try { MyTask task = queue.take(); task.doSomething(); } catch (Throwable e) { logger.error("",e); } } } }
消息代理: 消费者代理,随机选取一个消费者,进行消费
CustomerProxy :
package com.ricisung.Model.chapter1; import java.util.Random; import org.apache.log4j.Logger; public class CustomerProxy { private static Logger logger = Logger.getLogger(CustomerProxy.class); private static CustomerProxy instance = null; private Customer[] customers = null; private final int MAX_SIZE = 4; private CustomerProxy() { customers = new Customer[MAX_SIZE]; for (int i = 0; i < MAX_SIZE; i++) { customers[i] = new Customer(); customers[i].setDaemon(true); customers[i].start(); } } public static synchronized CustomerProxy instance() throws InterruptedException { if (null == instance) { instance = new CustomerProxy(); } return instance; } public void doWork(MyTask task) { int index = new Random().nextInt(MAX_SIZE); try { customers[index].putTask(task); } catch (Throwable e) { logger.error("",e); } } }
第二种模式:只有消费代理有一个任务队列,所有的线程都从消费代理的任务队列中获取任务。
CustomerProxy1
package com.ricisung.Model.chapter1; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.log4j.Logger; public class CustomerProxy1 { private static Logger logger = Logger.getLogger(CustomerProxy1.class); private static CustomerProxy1 instance = null; private Customer1[] customers = null; private final int MAX_SIZE = 4; private BlockingQueue<MyTask> queue = new LinkedBlockingQueue<MyTask>(); private CustomerProxy1() { customers = new Customer1[MAX_SIZE]; for (int i = 0; i < MAX_SIZE; i ++) { customers[i] = new Customer1(); customers[i].setDaemon(true); customers[i].start(); } } public static synchronized CustomerProxy1 instance() { if (null == instance) { instance = new CustomerProxy1(); } return instance; } public void doWork(MyTask task) { try { queue.add(task); } catch (Throwable e) { logger.error("",e); } } class Customer1 extends Thread { @Override public void run() { while(true) { try { MyTask task = queue.take(); task.doSomething(); } catch (Throwable e) { logger.error("",e); } } } } }
Producer 生成者
package com.ricisung.Model.chapter1; public class Producer { public static void main(String args[]) throws InterruptedException { //oneModel(); twoModel(); } public static void oneModel() throws InterruptedException { for (int i = 0; i < 100; i ++) { CustomerProxy.instance().doWork(new SayHelloTask()); } Thread.sleep(1000 * 5); } public static void twoModel() throws InterruptedException { for (int i = 0; i < 100; i ++) { CustomerProxy1.instance().doWork(new SayHelloTask()); } Thread.sleep(1000 * 5); } }
测试结果:
两种模式的比较
两种默认都可以有效的完成任务。
第一种,每个线程都具有一个自己的任务队列,这种模式相比第二种模式,CPU减少了在不同线程阻塞,减少了线程排队同步问题。
第二种,因为多个线程同时共享了一个队列,所以这种模式更充分的利用了线程; 第一种模式中一个线程完成任务了,就会立即阻塞,不会帮助其他线程共同完成任务;
如果多种任务同时存在,并且不同任务之间需要按相互先后顺序,使用第一种模式就能很好的解决此类问题:将有依赖关系的任务放到同一个线程里进行处理。
7.源码下载地址
http://download.csdn.net/detail/jia281460530/9539500