您的位置:首页 > 编程语言 > Java开发

spring boot 集成RabbitMQ

2018-03-15 14:14 781 查看
首先介绍下MQ1:MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法2:MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。3:在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。4:MQ则是遵循了AMQP协议的具体实现和产品。5:AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制.Erlang中的实现有 RabbitMQ.RabbitMQ的工作流程如下所示


前面是生产者,中间是RabbitMQ,后面是消费者。MQ还分为交换机(exchange)和队列(queues)。两者的关系是队列绑定到交换机上。具体流程是:生产者生产消息发送到交换机,交换机负责把消息转发到与自己绑定的队列上(交换机不负责存储),消费者从自己监听的队列中取出消息并且消费。这里还存在两个机制,一个是Confirm机制,一种是ACK机制。前一种是生产者发送给MQ时,是否发送成功的确认。后者是消息被消费者拿到是否手动确认MQ删除此消息。两者都可以不设置,Confirm不设置,表示无论生产者是否成功发送消息到MQ,都不做处理,这里需要自己代码实现。这样的缺点就是,在向某个交换机发送消息时,由于某些原因没成功,又没设置Confirm,造成消息丢失。ACK不设置,表示noack即不确认,只要消费者拿到消息,MQ就会删除队列中的消息,无论消费者是否成功消费。这样的缺点是,当消费者拿到消息后没有消费成功,此消息已经在MQ中删除了,造成消息丢失。这两种机制都在后面的代码中有所体现。下面介绍下交换机的类型,有四种类型,分别为Direct,Topic,headers,Fanout.1:Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.eg:这是一个完整key的匹配。如果一个队列绑定到交换机上要求路由键位"dog",则只有被标记为"dog"的消息才被转发到该队列上,不会转发"dog.1"和"dog.1.2",只会转发"dog"。

2:Topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.eg:队列需要绑定在一个模式上,符号“#”匹配一个或多个词,符号“*”只能匹配一个词。因为"dog.#"能匹配到"dog.1.2",但是“dog.*”只能匹配到“dog.1”3:headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.4:Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.spring boot 集成 RabbitMQ基于maven,创建两个工程,一个生产者和一个消费者




两者的pom文件是一样的,看下生产者的
    <parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
<scope>true</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- 添加springboot对amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-jasper</artifactId>
<scope>provided</scope>
</dependency>

</dependencies>

这里注意一点是添加了springboot的对amqp的支持配置文件application.properties如下,同样生产者和消费者配置是一样的。
#MQ链接相关---开始
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/taotao
#MQ链接相关---结束

#MQ ack相关---开始
#客户端确认模式,客户端发送消息到mq,mq会异步返回是否收到消息。
#默认没有确认,无论发送是否成功,客户端都不会知晓
spring.rabbitmq.publisher-confirms=true
#服务端确认模式,消费者消费成功向mq发送删除已经消费消息的信息。
#默认消费者接收到消息,mq就会删除
spring.rabbitmq.listener.acknowledge-mode=MANUAL
#MQ ack相关---结束


还有一处是一样的RabbitMQConfig类,代码如下:
package cn.rabbitmq.example;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

// 创建Topic类型交换机
@Bean
public TopicExchange defaultExchange() {
// 第一个参数交换机名称
// 第二个参数是否交换机久化,true为持久化,创建后重启依然存在
// 第三个参数是在交换机不在使用的情况下,自动删除
return new TopicExchange("TopicExchange", true, false, null);
}

// 创建队列
@Bean
public Queue queue1() {
// 第一个参数队列名称
// 第二个参数是否列队持久化,true为持久化,创建后重启依然存在
return new Queue("queue3", true);
}

@Bean
public Queue queue2() {
return new Queue("queue4", true);
}

// 交换机与队列绑定,并且设置了路由匹配
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(defaultExchange()).with("item.*");
}

// 交换机与队列绑定,并且设置了路由匹配
@Bean
public Binding binding2() {
return BindingBuilder.bind(queue2()).to(defaultExchange()).with("item.#");
}

}

具体的解释在代码中,这里定义了一个交换机和两个队列,并且两个队列都绑定到了交换机上,只是路由通配符不同。接下来就是发送和消费了。但是其中有些问题是无法避免的。1:生产者发送消息到MQ然后返回,MQ接收到了后异步去向生产者确认,但是在MQ确认接收到时,网路出现问题无法向生产者确认这时应该怎么办?生产者不知道MQ是否收到,这时必须重新发送。2:上一个问题可能导致MQ中的消息是重复的,消费者消费到重复的数据,消费者应该去重。

解决问题1:在本地写个缓存,把每次要发送的数据缓存到本地,生产者接收到MQ发回的确认信息后,删除本地缓存数据。开启一个线程去处理缓存中的数据,一定时间内,遍历缓存数据重新发送。
bb71
解决问题2:生产者除了发送消息外,再为每个消息生成个id,消费者根据id是否相同决定是否消费此消息。用到一个map,每次拿到id先判断map中是否存在此id,存在表明已经消费过,直接ack让MQ删除消息,map不存在此id,把id存入map后消费。

生产者本地缓存代码:
public class RetrySendCache {
private MessageSender messageSender;
private ConcurrentHashMap<String,MessageWithTime> map=new ConcurrentHashMap();
private String exchangeName;
private String key;

public int getSize(){
return map.size();
}

public RetrySendCache(){
startRetry();
}

private static class MessageWithTime{
private long time;
private Object message;
public MessageWithTime(long time,Object message){
this.time=time;
this.message=message;
}
public long getTime() {
return time;
}
public Object getMessage() {
return message;
}

}
public void setSenderInfo(MessageSender messageSender, String exchangeName, String key){
this.messageSender=messageSender;
this.exchangeName=exchangeName;
this.key=key;
}

public void put(String id,Object message){
map.put(id, new MessageWithTime(System.currentTimeMillis(), message));
}

public void remove(String id){
map.remove(id);
}

public void startRetry(){
new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
Thread.sleep(30*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for(String key  : map.keySet()){
long now=System.currentTimeMillis();
MessageWithTime messageWithTime=map.get(key);
//超过一定时间没有ack直接删除本地缓存的message,重试两次
if(messageWithTime.getTime()+3*30*1000<=now){
remove(key);
}else if(messageWithTime.getTime()+30*1000<=now){
SendStatusMessage message = messageSender.send(messageWithTime.getMessage(),exchangeName,key);
if(message.isFlag()){
remove(key);
}
}
}
}
}
}).start();

}
}

生产者代码
@Component
public class MessageSender  implements ConfirmCallback,SenderInterface{
@Autowired
private RabbitTemplate rabbitTemplate;

static RetrySendCache cache=new RetrySendCache();

@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}

public SendStatusMessage send(Object message,String exchangeName,String key){
try {
cache.setSenderInfo(this,exchangeName,key);
String uuid = UUID.randomUUID().toString();
cache.put(uuid, message);
Message msg=new Message(message,uuid);
rabbitTemplate.convertAndSend(exchangeName, key, FastJsonUtil.objectToString(msg), new CorrelationData(uuid));
} catch (AmqpException e) {
e.printStackTrace();
return new SendStatusMessage(false, "");
}

return new SendStatusMessage(true, "");

}

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
System.err.println("发送失败------: " + cause);
}else{
cache.remove(correlationData.getId());
}

}
}

消费者代码
public abstract class BaseConsumer implements Consumer{
@Override
@RabbitListener(queues="#{'${rabbitmq.listener.queue.name}'.split(',')}")
//以下固定有两个参数,也可以只有message一个参数
public void consume(Message s, Channel channel) {
byte[] body = s.getBody();
MessageDetail obj = FastJsonUtil.stringToMessage(body);
String message = map.get(obj.getUuid());
if (StringUtils.isBlank(message)) {
map.put(obj.getUuid(), obj.getUuid());// 内存不够怎么办,定期清理
try {
//消费具体逻辑,子类实现
logic(new String(body));
//Delivery Tag 用来标识信道中投递的消息,RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,
//以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。
//RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增
//basicAck 方法的第二个参数 multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认;如果为 true,
//则额外将比第一个参数指定的 delivery tag 小的消息一并确认
channel.basicAck(s.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
try {
//当消费消息出现异常时,我们需要取消确认,这时我们可以使用 Channel 的 basicReject 方法。
//第一个参数指定 delivery tag,第二个参数说明如何处理这个失败消息。
//requeue 值为 true 表示该消息重新放回队列头,值为 false 表示放弃这条消息。
//一般来说,如果是系统无法处理的异常,我们一般是将 requeue 设为 false,例如消息格式错误,再处理多少次也是异常。
//调用第三方接口超时这类异常 requeue 应该设为 true。
channel.basicReject(s.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e1) {
e1.printStackTrace();
}
}
} else {
try {
//这里并不是出现异常,而是重复的消息是不会消费的,直接通知MQ删除
channel.basicAck(s.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
}

protected abstract void  logic(String message);

}

大体代码上,后续还做了简单封装,见下一篇参考:http://www.jianshu.com/p/4112d78a8753
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  RabbitMQ