(实战)springboot整合ribbitMQ
话不多说。一切以实际用处为出发点。可能没有那么详细-0-。
基础配置文件:
[code] rabbitmq: addresses: ip:端口 username: 账号 password: 密码 # 支持发布确认 publisher-confirms: true # 支持发布返回 publisher-returns: true listener: simple: # 监听的最小线程数 concurrency: 4 # 监听的最大纯种数 max-concurrency: 8 retry: enabled: true # ack应答改模式:auto-自动,manual-手动,none-无应答 acknowledge-mode: auto
首先是配置pom文件。
[code]<!--引入amqp引用包-Rabbitmq的集合包--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency> <!--引入的版本号默认为最新的版本--> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
下面是配置类。
[code]@Configuration public class RabbitConfig { //交换机定义,这里我使用的是direct类型。大家可以根据自己的业务需求来指定对应的。下面会讲几种交换机的类型 //对应的3个参数1.交换机名称 2.持久性保持标识 3.是否自动删除标识 @Bean public DirectExchange directExchange() { return new DirectExchange(“name”, false, false); } //创建一个队列 @Bean(name = "queue") public Queue queue() { return QueueBuilder.durable(“name”).build(); } //绑定队列到交换机上--with对应的是direct指定的具体key。 @Bean public Binding binding(@Qualifier("queue") Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("key"); }
下面是实际发送消息的地方了:
[code]@Slf4j @Component public class RabbitProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; /** * 初始化确认发送回调及发送返回回调 */ @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** * 实现消息发送到RabbitMQ交换器后接收ack回调 * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ // 发送成功 log.info("trainLink message send success ---" + DateUtil.format(new Date())); } else { // 发送失败 log.error("trainLink message send failed because ---" + cause); } } /** * 实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调 * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error(message.getMessageProperties().getCorrelationIdString() + " send failed:error code " + replyCode + "mains:" + replyText); } /** * 发送消息,供外部调用 * ****** 重要 ******说明:发送时的方法选择 * ****** 重要 ******convertAndSend属于不要求返回确认的 * ****** 重要 ******convertSendAndReceive要求返回确认 * ****** 重要 ******大家根据不同的业务场景进行选择, * 不返回确认可以理解为全异步; * 返回确认可以理解为异步处理,同步返回,存在一个生产者等待消费者的问题 * 选择的原则一般为一致性要求较强的,要确认返回; * 一致性不强的,使用不返回确认,加大处理效率,免去等待时间 */ public void sendSMSMessage(String msg){ // fanout类型的交换器不需要routingkey,我这里用的是direct所以指定了对应的routingkey this.rabbitTemplate.convertAndSend(发送的交换机名称, 对应的routingkey, msg); } }
接收消息的消费者:
[code]@Slf4j @Component public class RabbitMQConsumer { /** * 消费者处理接收消息方法 * <p> * ****重要说明***** * 如果生产者是以convertSendAndReceive方法发送,则一定要手动给予返回,处理完后加入下面这一行: * ack-true处理:channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); * 参数说明-------消息id,fasle代表不批量处理(批量是指将消息id小于当前id的都处理掉) * ack-false处理:channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); * 参数说明-------消息id,fasle代表不批量处理(批量是指将消息id小于当前id的都处理掉),第二个false表示不重新入队(重新入队用true) * 拒绝消息:channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); 消息不会重新入队 * 参数说明-------消息id,fasle表示不重新入队(重新入队用true) * 如果不手动返回,则该消息会一直认为没有被消费掉,会一直占用rabbitmq内存空间,时间一久,必然造成内存溢出,切记!!! * * @param msg * @param message * @param channel * @throws Exception */ @RabbitListener(queues = 发送过来对应的队列名称) public void handler(String msg, Message message, Channel channel) throws Exception { try { System.out.println(msg); } catch (Exception e) { log.error(e.toString(), e); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } } }
科谱:
几种类型的交换机模式:
1、fanout模式
可以理解他是一个广播模式
不需要routing key它的消息发送时通过Exchange binding进行路由的~~在这个模式下routing key失去作用
这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定
如果接收到消息的Exchange没有与任何Queue绑定,则消息会被抛弃
2、Direct 模式
任何发送到Direct Exchange的消息都会被转发到routing_key中指定的Queue
一般情况可以使用rabbitMQ自带的Exchange:”” (该Exchange的名字为空字符串), 也可以自定义Exchange
可以将不同的routing_key与不同的queue进行绑定,不同的queue与不同exchange进行绑定
消息传递时需要一个“routing_key”
如果消息中不存在routing_key中绑定的队列名,则该消息会被抛弃
3、topic类型
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。
topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里 4000 的匹配规则有些不同,
它约定:
routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
binding key与routing key一样也是句点号“. ”分隔的字符串
binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
几种类型的效率对比:fanout>direct>>topic 根据自身业务极对效率的考量进行选择
除上述3种外,还有headers类型,目前我还没用到过,不大了解
阅读更多- SpringBoot整合RabbitMQ之典型应用场景实战一
- RabbitMQ 实战(二)Spring Boot 整合 RabbitMQ
- SpringBoot+Maven项目实战(6):整合Log4j和Aop,实现简单的日志记录
- SpringBoot构建微服务实战 之 整合Mybatis(一)
- 【备忘】2017年最新 项目实战 Spring Boot视频教程 微服务整合Mybatis
- Spring Boot 揭秘与实战(二) 数据存储篇 - MyBatis整合
- SpringBoot实战之9 整合freemarker模版引擎
- [Spring Boot实战系列] - No.2 Spring boot 整合Spring Security用户管理和用户权限管理
- SpringBoot+Maven项目实战(3):整合Freemark模板
- Spring Boot 揭秘与实战(二) 数据存储篇 - JPA整合
- SpringBoot实战之12 整合restful工具swagger2
- ElasticSearch整合springboot实战
- Spring Boot2.0整合ES5实现文章内容搜索实战
- SpringBoot整合RabbitMQ之典型应用场景实战二
- springboot+rabbitMq整合开发实战一
- spring boot实战(第十二篇)整合RabbitMQ
- RabbitMQ 实战(二)Spring Boot 整合 RabbitMQ
- swagger 与 springboot 的整合实战
- Spring Boot整合RabbitMQ开发实战详解
- CK1956-2017年最新项目实战Spring Boot视频微服务整合Mybatis