RabbitMQ消息队列+spring监听mq服务器,接收消费mq消息
2018-01-18 10:21
851 查看
最近soa项目要和官网系统对接,实现mq信息监听,保存等一些列操作。项目用的是Maven+SSM框架。然后学习和开发用了两天时间,算是搞定,趁加班时间做个总结。
对于Maven工程的ssm框架,整合RabbitMq首先就是
2
3
4
5
依赖不多,就这一个就够用了。但是有个坑要注意,如果引入的位置不对,启动会出现莫名的启动报错,我试了好几次,最后放到所有引入依赖的最后面是没问题的~所以,明智的选择是引入成功后先启动项目看看会不会报错,这个时候才引入一个依赖,正常不会报错,如果报错,就是位置不对,及时调整吧。
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
自定义的监听类必须实现接口:ChannelAwareMessageListener或者MessageListener.这里最好用Channel…这个。
多出的参数Channel可以很方便的提供监听之外的ack通知RabbitMq服务器功能。
由于MQ服务器对于消费端无反馈的,有重发机制,可能会有一条数据发送多次,并且一致保存在服务器中,久而久之就会由于数据量过大造成内存溢出的危险,而ack机制就是通过消费端发出通知给mq服务器,告诉服务器那条mq已经处理完毕,可以剔除。对于处理异常的,则可以重新回到mq服务器的队列中。而实现手动实现这个ack的关键点就是实现接口:ChannelAwareMessageListener.
如何实现手动ack?
手动ack就是在当前channel里面调用basic***的方法,并传入当前消息的tagId就可以了。
这里的ack通知分为三种情况:
a.消费端正常处理完成一条mq时,ack mq服务器可以移除此条mq的方法
//消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
参数解释:
deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
b. 消费端处理完成一条mq时发生异常,ack 会将此条mq重新放到mq服务器队列queue中
//ack返回false,此条mq并重新回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
参数解释:
deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列
c.就是消费端主动拒绝mq服务器发送mq过来。
//拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
参数解释:
deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列
channel.basicNack 与 channel.basicReject
的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
这样就可以了。启动项目,有时候在项目启动完毕,监听程序开始的时候报错,报错信息大致:
2
3
这种情况网上给的大都是由于在配置对应权限上有问题,但我这里报的同样的错误,并不是他们说的那种情况,而是由于我们这里做的监听mq服务器名为’ZL.SALETRACE’的queue,如果服务器端现在没有mq,就会报这个错,估计是没有mq,服务器暂时没有分配队列,队列就没有生成,但这个不是啥配置问题,是初始化问题。如果现在服务器端发送mq,还是能迅速的接收到发送的mq的,然后一切就正常了。
所以这个问题归结于服务端没有mq,如果有mq,还是可以正常接收,没啥问题的。这种报错可以忽略。
好了,这就是本次迭代的总结。~
对于Maven工程的ssm框架,整合RabbitMq首先就是
1.引入依赖:
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.3.5.RELEASE</version> </dependency>1
2
3
4
5
依赖不多,就这一个就够用了。但是有个坑要注意,如果引入的位置不对,启动会出现莫名的启动报错,我试了好几次,最后放到所有引入依赖的最后面是没问题的~所以,明智的选择是引入成功后先启动项目看看会不会报错,这个时候才引入一个依赖,正常不会报错,如果报错,就是位置不对,及时调整吧。
2.配置文件:context-rabbitMq.xml
< 4000 ?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <!-- 定义RabbitMQ的连接工厂 --> <rabbit:connection-factory id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="10.130.202.12"><!--服务器ip地址--> <property name="username" value="tele-onlineYa"> <property name="password" value="tele-onlineYa"> <property name="port" value="5678"> <property name="channelCacheSize" value="50"> <property name="virtualHost" value="tele-onlineYa"> </bean> <!--通过指定admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成。 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 队列queue,自动声明 --> <rabbit:queue name="ZL.SALETRACE" exclusive="false" auto-declare="false" durable="true" /> <!--自定义消息接受者--> <bean id="messageReceiver" class="com.zlf.cn.api.RabbitMqConsumerListener"> <!-- queue监听器,观察监听模式。当有消息到达本应用时会通知监听在对应队列queue上的监听对象 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="ZL.SALETRACE" ref="messageReceiver" /> </rabbit:listener-container> </beans>1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
3.在web.xml中引入配置的文件context-rabbitMq.xml
<context-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:META-INF/app_config/context/context-*.xml</param-value> </context-param>1
2
3
4
4.自定义监听类:RabbitMqConsumerListener
自定义的监听类必须实现接口:ChannelAwareMessageListener或者MessageListener.这里最好用Channel…这个。二者的区别可以从他们的抽象类的入参来说:一个是message(消息实体),一个是channel就是当前的通道。 ChannelAwareMessageListener的抽象类: void onMessage(Message message,Channel channel) . MessageListener接口的抽象类: void onMessage(Message message).
多出的参数Channel可以很方便的提供监听之外的ack通知RabbitMq服务器功能。
由于MQ服务器对于消费端无反馈的,有重发机制,可能会有一条数据发送多次,并且一致保存在服务器中,久而久之就会由于数据量过大造成内存溢出的危险,而ack机制就是通过消费端发出通知给mq服务器,告诉服务器那条mq已经处理完毕,可以剔除。对于处理异常的,则可以重新回到mq服务器的队列中。而实现手动实现这个ack的关键点就是实现接口:ChannelAwareMessageListener.
如何实现手动ack?
手动ack就是在当前channel里面调用basic***的方法,并传入当前消息的tagId就可以了。
这里的ack通知分为三种情况:
a.消费端正常处理完成一条mq时,ack mq服务器可以移除此条mq的方法
//消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
参数解释:
deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
b. 消费端处理完成一条mq时发生异常,ack 会将此条mq重新放到mq服务器队列queue中
//ack返回false,此条mq并重新回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
参数解释:
deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列
c.就是消费端主动拒绝mq服务器发送mq过来。
//拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
参数解释:
deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列
channel.basicNack 与 channel.basicReject
的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息
5.自定义类的具体实现:
import java.io.IOException import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import com.rabbitmq.client.Channel; import com.zlf.cn.action.IRabbitMqConsumerTaskAction; /** *使用监听器接收消息 *@author itw_zhanglf02 / public class RabbitMqConsumerListener implents ChannelAwareMessageListener{ private Logger logger=loggerFactory.getLogger(RabbitMqConsumerListener.class); @Resource(name=IRabbitMqConsumerTaskAction.ACTION_ID) private IRabbitMqConsumerTaskAction rabbitMqConsumerTaskAction; @Override public void onMessage(Message arg0,Channel channel){ //业务处理,放到action层,并返回处理成功还是异常的flag boolean mqFlag=rabbitMaConsumerTaskAction.saveMq(arg0); //还有一个点就是如何获取mq消息的报文部分message? //String message=new String(arg0.getBody(),"UTF-8"); if(mqFlag){ basicACK(arg0,channle);//处理正常--ack }else{ basicNACK(arg0,channle);//处理异常--nack } } //正常消费掉后通知mq服务器移除此条mq private void basicACK(Message message,Channel channel){ try{ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }catch(IOException e){ logger.error("通知服务器移除mq时异常,异常信息:"+e); } } //处理异常,mq重回队列 private void basicNACK(Message message,Channel channel){ try{ channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); }catch(IOExeption e){ logger.error("mq重新进入服务器时出现异常,异常信息:"+e); } } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
这样就可以了。启动项目,有时候在项目启动完毕,监听程序开始的时候报错,报错信息大致:
org.springframework.amqpAmqpIOException:java.io.IOException.... Case by:com.rabbitmq.client.ShutdownSignalException:channel error;protocol method:#method<channel.close>(reply-code=403,reply-text=ACCESS_REFUSED - access to queue 'ZL.SALETRACE' in vhost 'tele-onlineYA' refused for user 'tel-online',class-id=50,method-id=10)......1
2
3
这种情况网上给的大都是由于在配置对应权限上有问题,但我这里报的同样的错误,并不是他们说的那种情况,而是由于我们这里做的监听mq服务器名为’ZL.SALETRACE’的queue,如果服务器端现在没有mq,就会报这个错,估计是没有mq,服务器暂时没有分配队列,队列就没有生成,但这个不是啥配置问题,是初始化问题。如果现在服务器端发送mq,还是能迅速的接收到发送的mq的,然后一切就正常了。
所以这个问题归结于服务端没有mq,如果有mq,还是可以正常接收,没啥问题的。这种报错可以忽略。
好了,这就是本次迭代的总结。~
相关文章推荐
- RabbitMQ消息队列+spring监听mq服务器多个ip,接收消费mq消息(三)
- RabbitMQ消息队列+spring监听mq服务器,接收消费mq消息
- RabbitMQ消息队列+spring监听mq服务器多个ip,接收消费mq消息(二)
- spring+activemq实战之配置监听多队列实现不同队列消息消费
- Spring集成rabbitMQ监听消费队列消息
- SpringBoot中如何监听两个不同源的RabbitMQ消息队列
- Java——定时请求后端接口数据发送RabbitMQ消息队列到指定MQ服务器
- MQ消息队列--RabbitMQ整合Spring理论及实例讲解
- SpringBoot中如何监听两个不同源的RabbitMQ消息队列
- SpringBoot+ActiveMQ多消息队列监听
- SpringBoot的RabbitMQ消息队列: 一、消息发送接收第一印象
- springMVC+MQ 消息队列整合(二)
- PHP版 RabbitMQ小技巧(一)用代码获得服务器上的消息队列名
- spring boot Rabbitmq集成,延时消息队列实现
- RabbitMq六种使用模式(1)_直接指定消息接收队列
- RabbitMQ 用SpringBoot处理消息队列
- spring与RabbitMQ整合 消费者消费不到消息 重启才能消费到的问题解决
- 消息队列1:RabbitMQ解析并基于Springboot实战
- 基于Maven的ActiveMQ+spring 多消息队列配置实例
- SpringBoot对消息队列(MQ)的支持