Disruptor多个消费者不重复处理生产者发送的消息的demo
2017-06-01 10:05
555 查看
上一篇介绍的一个生产者向ringbuffer发送了10条消息,每个消费者都会把这个10个消息消费一遍,但是我们还有一种需求就是要让多个消费者不重复消费消息,下面这个简单demo实现了此功能:
创建一个消息的生产者类:
package Disruptor2;
import com.lmax.disruptor.RingBuffer;
import Disruptor.TradeTransaction;
public class Producer {
private RingBuffer<TradeTransaction> ringBuffer;
public Producer(RingBuffer<TradeTransaction> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData() {
// 可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
long sequence = ringBuffer.next();
try {
} finally {
System.out.println("生产者发送了一条消息");
ringBuffer.publish(sequence);
}
}
}
创建一个消息的消费者类:
package Disruptor2;
import com.lmax.disruptor.WorkHandler;
import Disruptor.TradeTransaction;
public class Consumer implements WorkHandler<TradeTransaction>{
@Override
public void onEvent(TradeTransaction event) throws Exception {
// TODO Auto-generated method stub
System.out.println("消费者C1消费了一条消息");
}
}
创建一个IntEventExceptionHandler类:
package Disruptor2;
import org.apache.log4j.Logger;
import com.lmax.disruptor.ExceptionHandler;
public class IntEventExceptionHandler implements ExceptionHandler {
private static final Logger logger = Logger.getLogger(IntEventExceptionHandler.class);
public void handleEventException(Throwable ex, long sequence, Object event) {
logger.error("handleEventException", ex);
}
public void handleOnStartException(Throwable ex) {
logger.error("handleOnStartException", ex);
}
public void handleOnShutdownException(Throwable ex) {
logger.error("handleOnShutdownException", ex);
}
}
编写一个测试类:
package Disruptor2;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import Disruptor.TradeTransaction;
public class Disruptor2 {
public static void main(String[] args) {
Long time = System.currentTimeMillis();
RingBuffer<TradeTransaction> ringBuffer;
Producer producer = null;
// 创建缓冲池
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
// 创建工厂
EventFactory<TradeTransaction> factory = new EventFactory<TradeTransaction>() {
@Override
public TradeTransaction newInstance() {
return new TradeTransaction();
}
};
// 创建bufferSize ,也就是RingBuffer大小,必须是2的N次方
int ringBufferSize = 1024 * 1024; //
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
// 创建ringBuffer
ringBuffer = RingBuffer.create(ProducerType.MULTI, factory, ringBufferSize, YIELDING_WAIT);
SequenceBarrier barriers = ringBuffer.newBarrier();
// 创建10个消费者来处理同一个生产者发的消息(这10个消费者不重复消费消息)
Consumer[] consumers = new Consumer[10];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new Consumer();
}
WorkerPool<TradeTransaction> workerPool = new WorkerPool<TradeTransaction>(ringBuffer, barriers,
new IntEventExceptionHandler(), consumers);
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
workerPool.start(executor);
producer = new Producer(ringBuffer);
for (int i = 0; i < 10; i++) {
producer.onData();
}
//executor.shutdown();
System.out.println("花费时间 :" + (System.currentTimeMillis() - time));
}
}
测试结果如下:
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
花费时间 :146
消费者C1消费了一条消息
由结果可以看出来,一共生成了10个消费者用于消费生产者产生的消息,每一条消息只消费了一次
创建一个消息的生产者类:
package Disruptor2;
import com.lmax.disruptor.RingBuffer;
import Disruptor.TradeTransaction;
public class Producer {
private RingBuffer<TradeTransaction> ringBuffer;
public Producer(RingBuffer<TradeTransaction> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData() {
// 可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
long sequence = ringBuffer.next();
try {
} finally {
System.out.println("生产者发送了一条消息");
ringBuffer.publish(sequence);
}
}
}
创建一个消息的消费者类:
package Disruptor2;
import com.lmax.disruptor.WorkHandler;
import Disruptor.TradeTransaction;
public class Consumer implements WorkHandler<TradeTransaction>{
@Override
public void onEvent(TradeTransaction event) throws Exception {
// TODO Auto-generated method stub
System.out.println("消费者C1消费了一条消息");
}
}
创建一个IntEventExceptionHandler类:
package Disruptor2;
import org.apache.log4j.Logger;
import com.lmax.disruptor.ExceptionHandler;
public class IntEventExceptionHandler implements ExceptionHandler {
private static final Logger logger = Logger.getLogger(IntEventExceptionHandler.class);
public void handleEventException(Throwable ex, long sequence, Object event) {
logger.error("handleEventException", ex);
}
public void handleOnStartException(Throwable ex) {
logger.error("handleOnStartException", ex);
}
public void handleOnShutdownException(Throwable ex) {
logger.error("handleOnShutdownException", ex);
}
}
编写一个测试类:
package Disruptor2;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import Disruptor.TradeTransaction;
public class Disruptor2 {
public static void main(String[] args) {
Long time = System.currentTimeMillis();
RingBuffer<TradeTransaction> ringBuffer;
Producer producer = null;
// 创建缓冲池
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
// 创建工厂
EventFactory<TradeTransaction> factory = new EventFactory<TradeTransaction>() {
@Override
public TradeTransaction newInstance() {
return new TradeTransaction();
}
};
// 创建bufferSize ,也就是RingBuffer大小,必须是2的N次方
int ringBufferSize = 1024 * 1024; //
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
// 创建ringBuffer
ringBuffer = RingBuffer.create(ProducerType.MULTI, factory, ringBufferSize, YIELDING_WAIT);
SequenceBarrier barriers = ringBuffer.newBarrier();
// 创建10个消费者来处理同一个生产者发的消息(这10个消费者不重复消费消息)
Consumer[] consumers = new Consumer[10];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new Consumer();
}
WorkerPool<TradeTransaction> workerPool = new WorkerPool<TradeTransaction>(ringBuffer, barriers,
new IntEventExceptionHandler(), consumers);
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
workerPool.start(executor);
producer = new Producer(ringBuffer);
for (int i = 0; i < 10; i++) {
producer.onData();
}
//executor.shutdown();
System.out.println("花费时间 :" + (System.currentTimeMillis() - time));
}
}
测试结果如下:
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
生产者发送了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
消费者C1消费了一条消息
花费时间 :146
消费者C1消费了一条消息
由结果可以看出来,一共生成了10个消费者用于消费生产者产生的消息,每一条消息只消费了一次
相关文章推荐
- Disruptor多个消费者不重复处理生产者发送过来的消息
- Disruptor多个消费者独立处理生产者消息的简单demo
- spring JMS、activemq中消费者收不到生产者发送的消息的原因解析
- RabbitMQ消息队列之二:消费者和生产者 Demo
- 基于Kafka的生产者消费者消息处理本地调试
- spring+activemq配置多个生产者,多个消费者并发处理消息
- 互斥锁和条件变量(2)——生产者和消费者(发送消息,循环队列执行)
- 基于Kafka的生产者消费者消息处理本地调试
- LMAX Disruptor—多生产者多消费者中,消息复制分发的高性能实现
- kafka(java客户端)消费者取不到消息,生产者消息也没发送成功
- RabbitMQ消息通信,生产者发送消息给指定的消费者的消息队列
- LMAX Disruptor—多生产者多消费者中,消息复制分发的高性能实现
- RabbitMQ 消息发送和消息获取 之 rabbitMQ消息生产者和消费者
- Poco::Thread 生产者消费者Demo
- openfire+smack 简单的 发送消息 demo及各种错误解决方案。
- MiniGUI消息发送与处理
- android应用程序消息处理机制之消息发送
- openfire+smack 简单的 发送消息 demo及各种错误解决方案。
- handle.sendEmptyMessageDelayed(message ,TIME_OUT) 发送延迟处理的消息
- 消息处理(二):投递与发送