您的位置:首页 > 其它

Disruptor多个消费者不重复处理生产者发送过来的消息

2017-09-02 11:10 483 查看
1、定义事件
事件(Event)就是通过 Disruptor 进行交换的数据类型。

package com.ljq.disruptor;

import java.io.Serializable;

/**
* 定义事件数据,本质是个普通JavaBean
*
* @author jqlin
*/
@SuppressWarnings("serial")
public class LongEvent implements Serializable {
private long value;

public LongEvent() {
super();
}

public LongEvent(long value) {
super();
this.value = value;
}

public long getValue() {
return value;
}

public void setValue(long value) {
this.value = value;
}

@Override
public String toString() {
return "LongEvent [value=" + value + "]";
}

}


2、LongEvent事件生产者

package com.ljq.disruptor;

import com.lmax.disruptor.RingBuffer;

/**
* LongEvent事件生产者,生产LongEvent事件
*
* @author jqlin
*/
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;

public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}

public void produceData(long value) {
long sequence = ringBuffer.next(); // 获得下一个Event槽的下标
try {
// 给Event填充数据
LongEvent event = ringBuffer.get(sequence);
event.setValue(value);

} finally {
// 发布Event,激活观察者去消费, 将sequence传递给该消费者
// 注意,最后的 ringBuffer.publish() 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
ringBuffer.publish(sequence);
}
}
}


3、LongEvent事件消息者

package com.ljq.disruptor;

import com.lmax.disruptor.WorkHandler;

/**
* LongEvent事件消息者,消息LongEvent事件
*
* @author Administrator
*
*/
public class LongEventConsumer  implements WorkHandler<LongEvent> {

@Override
public void onEvent(LongEvent event) throws Exception {
System.out.println("consumer:" + Thread.currentThread().getName() + " Event: value=" + event.getValue() );
}

}


4、ProducerConsumerMain
消费者-生产者启动类,其依靠构造Disruptor对象,调用start()方法完成启动线程。

package com.ljq.disruptor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;

/**
*  Disruptor多个消费者不重复处理生产者发送过来的消息
*
* @author Administrator
*
*/
public class ProducerConsumerMain {
public static void main(String[] args) throws InterruptedException {
Long time = System.currentTimeMillis();

// 指定 ring buffer字节大小,必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能
int bufferSize = 1024 * 1024;;
//固定线程数
int nThreads = 10;

ExecutorService executor = Executors.newFixedThreadPool(nThreads);

EventFactory<LongEvent> factory = new EventFactory<LongEvent>() {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
};

// 创建ringBuffer
RingBuffer<LongEvent> ringBuffer = RingBuffer.create(ProducerType.MULTI, factory, bufferSize,  new YieldingWaitStrategy());
SequenceBarrier barriers = ringBuffer.newBarrier();
// 创建10个消费者来处理同一个生产者发送过来的消息(这10个消费者不重复消费消息)
LongEventConsumer[] consumers = new LongEventConsumer[50];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new LongEventConsumer();
}
WorkerPool<LongEvent> workerPool = new WorkerPool<LongEvent>(ringBuffer, barriers,
new EventExceptionHandler(), consumers);
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
workerPool.start(executor);

LongEventProducer producer = new LongEventProducer(ringBuffer);
for (int i = 0; i < 20000; i++) {
producer.produceData(i);
}

Thread.sleep(1000); //等上1秒,等消费都处理完成
workerPool.halt(); //通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)
executor.shutdown();
System.out.println("总共耗时(单位毫秒) :" + (System.currentTimeMillis() - time));
}
}


5、EventExceptionHandler

package com.ljq.disruptor;

import com.lmax.disruptor.ExceptionHandler;

public class EventExceptionHandler implements ExceptionHandler {

@Override
public void handleEventException(Throwable ex, long sequence, Object event) {
System.out.println("handleEventException:" + ex);
}

@Override
public void handleOnShutdownException(Throwable ex) {
System.out.println("handleEventException:" + ex);
}

@Override
public void handleOnStartException(Throwable ex) {
System.out.println("handleOnStartException:" + ex);
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐