spring boot实战(第十一篇)初识RabbitMQ
2015-11-01 15:31
579 查看
前言
最近几篇文章将围绕消息中间件RabbitMQ展开,对于RabbitMQ基本概念这里不阐述,主要讲解RabbitMQ的基本用法、Java客户端API介绍、Spring Boot与RabbitMQ整合、Spring Boot与RabbitMQ整合源码分析。
RabbitMQ安装
在使用消息中间件RabbitMQ之前就是安装RabbitMQ。安装erlang:yum install erlang
下载RabbitMQ安装包: https://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-generic-unix-3.5.6.tar.gz 解压安装包、配置环境变量RABBITMQ_HOME
参考网址:https://www.rabbitmq.com/install-generic-unix.html
windows: https://www.rabbitmq.com/install-windows.html
RabbitMQ配置
1.安装完成后需要对RabbitMQ进行配置,在etc/rabbitmq目录下创建两个文件:rabbitmq-env.conf 环境信息配置
RABBITMQ_NODE_IP_ADDRESS=127.0.0.1 RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=node01
rabbitmq.config 核心配置文件
[{rabbit, [{loopback_users, []}]}].该配置表示是的默认用户guest用户可以远程访问mq(广域网不能访问,内网可以访问)
2.启动RabbitMQ 执行命令 rabbitmq-server
RabbitMQ 3.5.4. Copyright (C) 2007-2015 Pivotal Software, Inc. ## ## Licensed under the MPL. See http://www.rabbitmq.com/ ## ## ########## Logs: /Users/liaokailin/software/rabbitmq_server-3.5.4/sbin/../var/log/rabbitmq/node01.log ###### ## /Users/liaokailin/software/rabbitmq_server-3.5.4/sbin/../var/log/rabbitmq/node01-sasl.log ########## Starting broker... completed with 0 plugins.
3. RabbitMQ提供WEB-UI管理控制台,使用 rabbitmq-plugins enable rabbitmq_management命令启用,重启后可以看到
Starting broker... completed with 6 plugins.
表明WEB-UI控制台启动成功,访问:http://localhost:15672/
登陆进入:
通过该控制台可以方便管理RabbitMQ。
创建Test用户
RabbitMQ默认使用guest用户,下面讲述如何创建一个test用户,最快捷的做法使用web管理控制台这里使用命令创建:
rabbitmqctl add_user test test
rabbitmqctl set_user_tags test administrator
tag分为四种"management", "policymaker", "monitoring" "administrator" 详见 http://www.rabbitmq.com/management.html
RabbitMQ 其他
在实际使用RabbitMQ中还需要涉及到 RabbitMQ的集群、高可用(采用镜像队列实现)以后有机会再详细阐述,有兴趣可参考https://www.rabbitmq.com/documentation.htmlRabbitMQ Java Client
RabbitMQ 客户端支持语言种类繁多,官方都一一举例:https://www.rabbitmq.com/getstarted.html这里主要自己开发一个小的demo
消息消费者
操作步骤:创建连接工厂ConnectionFactory
获取连接Connection
通过连接获取通信通道Channel
声明交换机Exchange:交换机类型分为四类:
FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange :通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
声明队列Queue
将队列和交换机绑定
创建消费者
执行消息的消费
package org.lkl.mq.rabbitmq.test; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownSignalException; /** * 客户端01 * * @author liaokailin * @version $Id: Receive01.java, v 0.1 2015年11月01日 下午3:47:58 liaokailin Exp $ */ public class Receive01 { public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory facotry = new ConnectionFactory(); facotry.setUsername("test"); facotry.setPassword("test"); facotry.setVirtualHost("test"); facotry.setHost("localhost"); Connection conn = facotry.newConnection(); //获取一个链接 //通过Channel进行通信 Channel channel = conn.createChannel(); int prefetchCount = 1; channel.basicQos(prefetchCount); //保证公平分发 boolean durable = true; //声明交换机 channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", durable); //按照routingKey过滤 //声明队列 String queueName = channel.queueDeclare("queue-01", true, true, false, null).getQueue(); //将队列和交换机绑定 String routingKey = "lkl-0"; //队列可以多次绑定,绑定不同的交换机或者路由key channel.queueBind(queueName, Send.EXCHANGE_NAME, routingKey); //创建消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //将消费者和队列关联 channel.basicConsume(queueName, false, consumer); // 设置为false表面手动确认消息消费 //获取消息 System.out.println(" Wait message ...."); while (true) { Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); String key = delivery.getEnvelope().getRoutingKey(); System.out.println(" Received '" + key + "':'" + msg + "'"); System.out.println(" Handle message"); TimeUnit.SECONDS.sleep(3); //mock handle message channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //确定该消息已成功消费 } } }
消息生产者
操作步骤:创建连接工厂ConnectionFactory
获取连接Connection
通过连接获取通信通道Channel
发送消息
package org.lkl.mq.rabbitmq.test; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; /** * 消息publish * * @author liaokailin * @version $Id: Send.java, v 0.1 2015年10月22日 下午3:48:09 liaokailin Exp $ */ public class Send { public final static String EXCHANGE_NAME = "test-exchange"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { /** * 配置amqp broker 连接信息 */ ConnectionFactory facotry = new ConnectionFactory(); facotry.setUsername("test"); facotry.setPassword("test"); facotry.setVirtualHost("test"); facotry.setHost("localhost"); Connection conn = facotry.newConnection(); //获取一个链接 //通过Channel进行通信 Channel channel = conn.createChannel(); // channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", true); //如果消费者已创建,这里可不声明 channel.confirmSelect(); //Enables publisher acknowledgements on this channel channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("[handleNack] :" + deliveryTag + "," + multiple); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("[handleAck] :" + deliveryTag + "," + multiple); } }); String message = "lkl-"; //消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN //发送多条信息,每条消息对应routekey都不一致 for (int i = 0; i < 10; i++) { channel.basicPublish(EXCHANGE_NAME, message + (i % 2), MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes()); System.out.println("[send] msg " + (message + i) + " of routingKey is " + (message + (i % 2))); } } }
在设置消息被消费的回调前需显示调用
channel.confirmSelect()否则回调函数无法调用
先执行消费者,消费者会轮询是否有消息的到来,在web控制也可以观察哦~~,再启动生产者发送消息。
小结
本篇的主要目的为简单介绍下RabbitMQ的使用,下一遍讲解Spring Boot与RabbitMQ的整合。相关文章推荐
- spring 注解方式配置Bean
- Spring AOP
- Spring Controller介绍
- 解决SSH框架中Struts不能接受Android的不同Key值上传图片问题
- Java ”22/Sep/2015:00:18:59“转换格式
- 第一次启动Eclipse,显示没有找到javaw.exe
- Eclipse添加和查看书签
- Java学习之Iterator(迭代器)的一般用法 (转)
- spring第一天
- java基础测试题(1)
- Java中final、finally、finalize的区别
- 设置 JDK环境变量(Windows)
- 折纸问题(Java)
- Java线程池
- JAVA数组之课后作业
- Java Jdk Jre initial install
- java学习笔记.05——CyclicBarrier和CountDownLatch
- 几种常见排序算法的Java代码实现
- JAVA学习笔记之继承
- 在Eclipse中反编译Class文件