您的位置:首页 > 编程语言 > Java开发

Spring集成消息队列RabbitMQ(消息失败处理)

2017-11-30 22:39 585 查看


1. RabbitMQ简介


1.1. RabbitMQ

RabbitMQ是由Erlang(爱立信公司)语言开发,实现Advanced Message Queuing Protocol (AMQP高级消息队列协议)的消息中间件。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。


1.2. 结构图


 

• Broker:消息队列服务器实体,例如RabbitMQ服务 

• Vhost:虚拟主机,默认为“/”,一个broker里可以有多个vhost,区分不同用户权限,类似java的命令空间

• Connection:应用程序与broker连接,可有多个连接 

• Channel:消息通道,connection中可建立多个channel,每个channel代表一个会话任务,所有操作都在channel中进行。 

• Exchange:消息交换机,channel中可有多个,用于投递消息。应用程序发送消息时先把消息给交换机,由交换机投递给队列,不是直接给队列。 

类型有三种:fanout(广播)、Direct(处理路由键,轮播实现)、Topic(支持消息模糊匹配) 

• Queue:队列,用于存放消息 

• Message:消息,应用程序需要发送的数据 

• Bind:根据routingKey绑定exchange与queue规则,决定消息发送的方向


1.3. 对象间关系




2. rabbitMQ与spring集成

使用spring封装的rabbitmq的 https://github.com/spring-projects/spring-amqp 做集成。


2.1. 发送消息Producer

发送接口
public interface SimpleMQProducer {

/**
* 发送消息至MQ
*/
public void sendDataToMQ(Object message);

/**
* 发送消息至MQ
*/
public void sendDataToMQ(Object message, String msgid);

}
1
2
3
4
5
6
7
8
9
10
11
12
13

发送接口实现
public class SmartMQProducer implements InitializingBean,SimpleMQProducer{

protected final Loggerx logger = Loggerx.getLogger("dao");

protected RabbitTemplate rabbitTemplate = new RabbitTemplate();

protected String queue;

protected String exchange;

protected String routingKey;

protected ConnectionFactory connectionFactory;

protected MessageConverter messageConverter;

protected RetryTemplate retryTemplate;

protected ConfirmCallback confirmCallback;

protected ReturnCallback failedCallback;

public RabbitTemplate getRabbitTemplate() {
return rabbitTemplate;
}

public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}

public void setQueue(String queue) {
this.queue = queue;
}

public void setExchange(String exchange) {
this.exchange = exchange;
}

public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}

public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}

public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}

public void setRetryTemplate(RetryTemplate retryTemplate) {
this.retryTemplate = retryTemplate;
}

public void setConfirmCallback(ConfirmCallback confirmCallback) {
this.confirmCallback = confirmCallback;
}

public void setFailedCallback(ReturnCallback failedCallback) {
this.failedCallback = failedCallback;
}

@Override
public void sendDataToMQ(Object message) {
CorrelationData correlationId = null;
try {
correlationId = new CorrelationData(GUID.genTxNo(25));
} catch (Exception e) {
logger.error(LogType.EX, "产生消息id失败",e);
correlationId = new CorrelationData(UUID.randomUUID().toString());
}
this.rabbitTemplate.convertAndSend(this.routingKey, message, correlationId);
logger.info(LogType.EX, "发送到MQ的消息内容["+JsonUtil.toJSONString(message)+"],消息ID["+correlationId.getId()+"]");
}

@Override
public void sendDataToMQ(Object message, String msgid) {
CorrelationData correlationId = new CorrelationData(msgid);
this.rabbitTemplate.convertAndSend(this.routingKey, message, correlationId);
logger.info(LogType.EX, "发送到MQ的消息内容["+JsonUtil.toJSONString(message)+"],消息ID["+correlationId.getId()+"]");
}

@Override
public void afterPropertiesSet() throws Exception {
this.rabbitTemplate.setQueue(this.queue);
this.rabbitTemplate.setExchange(this.exchange);
this.rabbitTemplate.setRoutingKey(this.routingKey);
this.rabbitTemplate.setConnectionFactory(this.connectionFactory);
this.rabbitTemplate.setMessageConverter(this.messageConverter);
this.rabbitTemplate.setMandatory(true);
if (null == this.failedCallback) {
// 确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端ack。
this.rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//记录本地日志
Object object = StringUtil.ByteToObject(message.getBody());
logger.error(LogType.EX, "消息发送到MQ失败,内容["+object+"]");
}
});
}else {
this.rabbitTemplate.setReturnCallback(this.failedCallback);
}
//设置回调
this.rabbitTemplate.setConfirmCallback(this.confirmCallback);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108

发送到MQ失败回调处理
public abstract class SmartMQFailedCallBack implements ReturnCallback {

protected final Loggerx logger = Loggerx.getLogger("bo");
/**
* 确认消息是否到达broker服务器,也就是只确认是否正确到达queue中即可,只要正确的到达queue中,broker即可确认该消息返回给客户端ack。
*/
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
if (null != message) {
Object object = StringUtil.ByteToObject(message.getBody());
logger.error(LogType.EX, "消息发送到MQ失败,内容["+object+"]");
executeFailedMessage(object);
}else {
logger.error(LogType.EX, "消息发送到MQ失败,消息内容为null");
}
}

public abstract void executeFailedMessage(Object message);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

发送到MQ后回调处理(不分成功或失败)
public abstract class SmartMQConfirmCallBack implements ConfirmCallback{
protected final Logger logger = Logger.getLogger("bo");

/**
* 确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端ack。
*
*/
public void confirm(CorrelationData correlationData, boolean ack) {
if (ack) {
logger.info(LogType.INFO, "消息成功消费,消息ID["+correlationData.getId()+"]");
} else {
logger.error(LogType.EX, "消息失败消费,消息ID["+correlationData.getId()+"]");
}
executeCallBack(correlationData.getId(),ack);
}

public abstract void executeCallBack(String msgID,boolean ack);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

发送端spring
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

<!-- 连接服务配置 -->
<bean id="mqConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="xxx.xxx.xx.xx"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="virtualHost" value="/"/>
<property name="channelCacheSize" value="50"/>
<property name="publisherConfirms" value="true"/>
<property name="publisherReturns" value="true"/>
</bean>

<rabbit:admin connection-factory="mqConnectionFactory" />
<!-- 声明消息转换器为SimpleMessageConverter -->
<bean id="msgConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter" />
<!-- 消息发送重试 ,可选项-->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
<!-- queue 队列声明 -->
<!-- durable=true,交换机持久化,rabbitmq服务重启交换机依然存在,保证不丢失; durable=false,相反 -->
<!-- auto-delete=true:无消费者时,队列自动删除; auto-delete=false:无消费者时,队列不会自动删除 -->
<!-- 排他性,exclusive=true:首次申明的connection连接下可见; exclusive=false:所有connection连接下都可见-->
<rabbit:queue id="test" durable="true" auto-delete="false" exclusive="false" name="test" />

<!-- exchange queue binging key 绑定 -->
<!-- durable=true,交换机持久化,rabbitmq服务重启交换机依然存在,保证不丢失; durable=false,相反 -->
<!-- auto-delete=true:无消费者时,队列自动删除; auto-delete=false:无消费者时,队列不会自动删除 -->
<rabbit:direct-exchange name="test" durable="true" auto-delete="false" id="test">
<rabbit:bindings>
<rabbit:binding queue="test" key="test_key" />
</rabbit:bindings>
</rabbit:direct-exchange>

<!-- 消息发送到mq回调处理,需要处理错误消息,可选项 -->
<bean id="testfailedCallback" class="xxx.TestMsgFailedCallBack"></bean>
<!-- 消息发送到mq回调处理,接着业务处理 ,可选项-->
<bean id="testconfirmCallback" class="xxx.TestconfirmCallback"></bean>
<bean id="testProducer" class="XXXX.SmartMQProducer">
<property name="connectionFactory" ref="mqConnectionFactory" />
<property name="messageConverter" ref="msgConverter" />
<property name="retryTemplate" ref="retryTemplate" />
<property name="confirmCallback" ref="testconfirmCallback" />
<property name="failedCallback" ref="testfailedCallback" />
<property name="exchange" value="test" />
<property name="queue" value="test" />
<property name="routingKey" value="test" />
</bean>

</beans>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67


2.2. 消费端Consumer

接收端spring配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> 
<!-- 连接服务配置 -->
<bean id="mqConnectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="xxx.xxx.xxx.xxx" />
<property name="username" value="guest" />
<property name="password" value="guest" />
<property name="virtualHost" value="/" />
<property name="channelCacheSize" value="50" />
</bean>
<!-- 创建rabbitAdmin 代理类 -->
<rabbit:admin connection-factory="mqConnectionFactory" />

<!-- 声明消息转换器为SimpleMessageConverter -->
<bean id="msgConverter"
class="org.springframework.amqp.support.converter.SimpleMessageConverter" />

<!-- queue 队列声明 -->
<rabbit:queue id="test" name="test" durable="true" auto-delete="false" exclusive="false" />

<!-- exchange queue binging key 绑定 -->
<!-- durable=true,交换机持久化,rabbitmq服务重启交换机依然存在,保证不丢失; durable=false,相反 -->
<!-- auto-delete=true:无消费者时,队列自动删除; auto-delete=false:无消费者时,队列不会自动删除 -->
<!-- 通过Binding来判定Queue、Exchange、routingKey -->
<rabbit:direct-exchange id="test" name="test"
durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="test" key="test_key" />
</rabbit:bindings>
</rabbit:direct-exchange>

<bean id="testMsgHandler" class="xxxx.testMsgHandler" />
<bean id="testMsgAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="testHandler" />
<property name="defaultListenerMethod" value="handleTxMsg"></property>
<property name="messageConverter" ref="msgConverter"></property>
</bean>
<bean id="testlistenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="queueNames" value="test"></property>
<property name="connectionFactory" ref="mqConnectionFactory"></property>
<property name="messageListener" ref="testAdapter"></property>
<property name="maxConcurrentConsumers" value="100" />
<property name="concurrentConsumers" value="20" /
</bean>

</beans>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

接收端封装
public abstract class ConsumerHandler{

protected static Logger logger = Logger.getLogger("bo");

public void handleTxMsg(Object message) {
try {
//基类的业务处理
}
logger.info(LogType.MQ, "消息内容["+JsonUtil.toJSONString(message)+"]");
handler(message);
} catch (Exception e) {
logger.error(LogType.EX, e, e);
} finally {
LogContextHolder.getInstance().removeLogContext();
}
}
//实际的业务处理
public abstract void handler(Object message);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20


3. 效果



版权声明:本文为博主原创文章,未经博主允许不得转载。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: