disruptor demo(一) 使用原生API创建一个简单的生产者和消费者
2014-07-31 15:15
731 查看
1.
2.
3.
//POJO 交易类 public class TradeTransaction { private String id; //交易ID private double price;//交易金额 public TradeTransaction(){} public TradeTransaction(String id,double price){ super(); this.id = id; this.price = price; } public String getId() { return id; } public void setId(String id) { this.id = id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } }
2.
import java.util.UUID; import com.lmax.disruptor.EventHandler; public class TradeTransactionInDBHandler implements EventHandler<TradeTransaction>{ @Override public void onEvent(TradeTransaction event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } public void onEvent(TradeTransaction event) throws Exception{ event.setId(UUID.randomUUID().toString()); System.out.println(event.getId()); } }
3.
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.lmax.disruptor.BatchEventProcessor; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.YieldingWaitStrategy; public class Demo1 { public static void main(String[] args)throws InterruptedException,ExecutionException{ final int BUFFER_SIZE = 1024; final int THREAD_NUMBERS = 2; /* * createSingleProducer创建一个单生产者的RingBuffer * 第一个参数叫EventFactory,从名字上理解就是"事件工厂",其实它的职责就是产生数据填充RingBuffer的区块 * 第二参数是RingBuffer的大小,它必须是2的指数倍,目的是为了将求棋运算转为&运算提高效率 * 第三个参数是RingBuffer的生产都在没有可用区块(slot)的时候(可能是消费者(或者说是事件处理器)太慢了)的等待策略 */ final RingBuffer<TradeTransaction> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TradeTransaction>(){ @Override public TradeTransaction newInstance() { return new TradeTransaction(); } }, BUFFER_SIZE,new YieldingWaitStrategy()); //创建线程池 ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); //创建SequenceBarrier 序号屏障器 SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //创建消息处理器 BatchEventProcessor<TradeTransaction> transProcessor = new BatchEventProcessor<TradeTransaction>(ringBuffer,sequenceBarrier,new TradeTransactionInDBHandler()); //ringBuffer可以知晓消费者的状态 ringBuffer.addGatingSequences(transProcessor.getSequence()); executors.submit(transProcessor); Future<?> future = executors.submit(new Callable<Void>(){ @Override public Void call() throws Exception { long seq; for(int i=0;i<1000;i++){ seq = ringBuffer.next(); ringBuffer.get(seq).setPrice(Math.random()*9999); ringBuffer.publish(seq);//发布这个slot的数据使handler(consumer)可见 } return null; } }); future.get(); Thread.sleep(1000); transProcessor.halt(); executors.shutdown(); } }
相关文章推荐
- 不使用API,创建一个最简单的窗口
- [原创]java WEB学习笔记40:简单标签概述(背景,使用一个标签,标签库的API,SimpleTag接口,创建一个自定义的标签的步骤 和简单实践)
- 使用Spring来创建一个简单的工作流引擎
- 使用timer控件创建一个简单的报警程序
- 使用JFram创建一个简单的窗口
- 一个简单的C语言操作系统生产者消费者模型
- 简单构建一个xmlhttp对象池合理创建和使用xmlhttp对象
- 不使用ATL向导,创建一个简单的ATL对话框程序.
- 使用q3radiant创建一个简单的地图,然后运行
- 使用Spring来创建一个简单的工作流引擎
- 使用Spring来创建一个简单的工作流引擎
- [使用心得]maven2之m2eclipse使用手册之六使用Maven2插件创建一个简单的SSH2项目之jetty篇(二)
- 使用Spring来创建一个简单的工作流引擎
- 使用ArcGIS Online创建一个简单的mash-up
- 使用Spring来创建一个简单的工作流引擎
- 使用Spring来创建一个简单的工作流引擎
- 使用timer控件创建一个简单的报警程序
- NUnit的使用(1)——创建一个简单的单元测试
- 使用 timer 来创建一个简单的报警程序
- 使用ArcGIS Online创建一个简单的mash-up