《RocketMQ 安装和使用》
2015-07-18 10:36
399 查看
安装Maven
安装步骤:《Maven的安装、配置及使用入门》http://www.cnblogs.com/dcba1112/archive/2011/05/01/2033805.html
http://maven.apache.org/download.cgi (apache-maven-3.3.3-bin.zip)
Path环境变量,当我们在cmd中输入命令时,Windows首先会在当前目录中寻找可执行文件或脚本,如果没有找到,Windows会接着遍历环境变量Path中定义的路径。由于我们将%M2_HOME%\bin添加到了Path中,而这里%M2_HOME%实际上是引用了我们前面定义的另一个变量,其值是Maven的安装目录。因此,Windows会在执行命令时搜索目录D:\bin\apache-maven-3.0\bin,而mvn执行脚本的位置就是这里。
安装RocketMQ
源码地址:(2015-07-18 3.2.6版本)E:\RocketMQ\RocketMQ-master
E:\RocketMQ\RocketMQ_Workspace
E:\RocketMQ\RocketMQ-master\target\alibaba-rocketmq-3.2.6-alibaba-rocketmq\alibaba-rocketmq\bin
执行下边的命令或者执行install.bat(在这个bat文件中的命令如下)对maven熟悉的一眼就知道是执行clean package install assembly等操作。
mvn -Dmaven.test.skip=true clean packageinstall assembly:assembly –U
将RocketMQ-master导入到eclipse中
![](http://images0.cnblogs.com/blog2015/777545/201507/181137489852269.png)
进入刚生成的target文件夹下的bin目录,在命令行中执行:start mqnamesrv.exe,会弹出一个信息窗口,显示The name Server boot success 说明启动成功了,接着启动borker,在命令行中执行:start mqbroker.exe -n 127.0.0.1:9876 同样的弹出一个窗口,看到success表示成功了。
start mqnamesrv >E:\RocketMQ\logs\alibaba-rocketmq/mqnamesrv.log
【遇到问题】无法启动mqnamesrv。显示软件不兼容。
![](http://images0.cnblogs.com/blog2015/777545/201507/181039203294850.png)
【如何在windows下使用linux的shell脚本】
http://www.cygwin.com/。
E:\cygwin64。
【当前目录调出CMD】
在桌面上先按住Shift键,然后鼠标右键,出现选项“在此处打开命令窗口(W)”也可以打开命令行。
生产者
public class Producer{ public static void main(String[] args) throws MQClientException,InterruptedException{ //一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例 final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesevAddr("127.0.0.1:9876"); producer.setInstanceName("Producer"); producer.start(); //一个Producer对象可以发送多个topic,多个tag的消息 for(int i=0; i<10; i++){ try{ { Message msg = new Message("TopicTest1","TagA","OrderID001",("Hello MetaQ").getBytes()); SendResult sendResult = pro .send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest2", "TagB","OrderID0034",("Hello MetaQB").getBytes()); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest3",// topic "TagC",// tag "OrderID061",// key ("Hello MetaQC").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } }catch(Exception e){ e.printStackTrace(); } TimeUnit.MILLSECONDS.sleep(1000); } /** * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己 * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法 */ //producer.shutdown(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { producer.shutdown(); } })); System.exit(0); } }
消费者
public class Consumer { /** * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br> * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br> */ public static void main(String[] args) throws InterruptedException, MQClientException{ /** * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br> * 注意:ConsumerGroupName需要由应用来保证唯一 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "ConsumerGroupName"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setInstanceName("Consumber"); /** * 订阅指定topic下tags分别等于TagA或TagC或TagD */ consumer.subscribe("TopicTest1","TagA || TagC || TagD"); /** * 订阅指定topic下所有消息<br> * 注意:一个consumer对象可以订阅多个topic */ consumer.subscribe("TopicTest2","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt>msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() +" Receive New Messages: " + msgs.size()); MessageExt msg = msgs.get(0); if(msg.getTopic().equals("TopicTest1")) { //执行TopicTest1的消费逻辑 if(msg.getTags() != null && msg.getTags().equals("TagA")) { //执行TagA的消费 System.out.println(new String(msg.getBody())); }else if (msg.getTags() != null &&msg.getTags().equals("TagC")) { //执行TagC的消费 System.out.println(new String(msg.getBody())); }else if (msg.getTags() != null &&msg.getTags().equals("TagD")) { //执行TagD的消费 System.out.println(new String(msg.getBody())); } }else if (msg.getTopic().equals("TopicTest2")) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> */ consumer.start(); System.out.println("ConsumerStarted."); } }
参考文章
l 《RocketMQ在windows上安装和开发使用》/article/1607476.htmll 《RocketMQ在Windows平台下环境搭建》/article/6351907.html
l 《RocketMQ随笔分类》http://www.cnblogs.com/marcotan/category/655319.html
相关文章推荐
- SSL/TLS协议运行机制的概述
- 设计一个算法,采用BFS方式输出图G中从顶点u到v的最短路径(不带权的无向连通图G采用邻接表存储)
- LeetCode 118 Pascal's Triangle
- [LeetCode][Java] Minimum Window Substring
- 【实训项目】银行储蓄系统
- Secure Boot
- 地道
- SSL/TLS协议运行机制的概述
- linux运维人员需要知道的重要/常用目录介绍
- ajax提交form表单的两种方法
- 1048. Find Coins (25)
- [UEFI启动教程][第三章]BIOS锁定纯UEFI启动的解锁办法
- 3Sum
- wordpress添加后台菜单及权限设置
- java初学笔记20150718.十进制数转换为二进
- 1048. Find Coins (25)
- 【Oracle 集群】ORACLE DATABASE 11G RAC 知识图文详细教程之RAC 特殊问题和实战经验(五)
- GO语言练习:channel 缓冲机制
- 黑马程序员——Java基础---多线程
- java-循环-break用法