您的位置:首页 > 编程语言 > Java开发

Spring AMQP 1.6完整参考指南

2016-12-23 00:00 204 查看
-第一部分

原文:http://docs.spring.io/spring-amqp/docs/1.6.0.RELEASE/reference/html/

1.前言

SpringAMQP项目将其核心Spring概念应用于基于AMQP消息解决方案的开发中.我们提供了一个发送和接收消息的高级抽象模板.同时,我们也提供了消息驱动POJO的支持.这些包有助于AMQP资源的管理,从而提升依赖注入和声明式配置的使用.在所有这些情况中,你会发现与SpringFramework对JMS支持是相似的.对于其它项目相关的信息,可访问SpringAMQP项目主页

2.介绍

本参考文档的第一部分是对SpringAMQP的高层次以及底层的概念的讲述,还有一些可使你尽快上手运行的代码片断.

2.1快速浏览

2.1.1介绍

5分钟开启SpringAMQP的旅行.

前提:

安装和运行RabbitMQbroker(http://www.rabbitmq.com/download.html).然后获取spring-rabbitJAR以及其依赖包-最简单的方式是在你的构建工具中声明依赖,如.对于Maven来说:

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.0.RELEASE</version></dependency>
对于gradle:

compile'org.springframework.amqp:spring-rabbit:1.6.0.RELEASE'

兼容性

由于默认的SpringFramework版本依赖是4.2.x,SpringAMQP一般来说可兼容早期版本的SpringFramework.但基于注解的监听器和RabbitMessagingTemplate需要SpringFramework4.1+.

类似地,默认的amqp-client版本是3.6.x,但framework兼容3.4.0+.但依赖于新客户端版本的功能将无法使用.注意,这里指的是javaclientlibrary;一般来说,对于老版本的broker也可以工作.

非常非常快速

使用纯java来发送和接收消息:

ConnectionFactoryconnectionFactory=newCachingConnectionFactory(); AmqpAdminadmin=newRabbitAdmin(connectionFactory); admin.declareQueue(newQueue("myqueue")); AmqpTemplatetemplate=newRabbitTemplate(connectionFactory); template.convertAndSend("myqueue","foo"); Stringfoo=(String)template.receiveAndConvert("myqueue");
注意,在原生的JavaRabbitclient中也有一个ConnectionFactory.在上面的代码中,我们使用的是Spring抽象.我们依赖的是broker中的默认交换器(因为在发送操作中没有指定交换器),以及默认绑定(通过其名称作为路由键将所有队列绑定到默认交换器中).那些行为是由AMQP规范来定义的.

使用XML配置

同上面的例子一样,只不过将外部资源配置到了XML:

ApplicationContextcontext=newGenericXmlApplicationContext("classpath:/rabbit-context.xml"); AmqpTemplatetemplate=context.getBean(AmqpTemplate.class); template.convertAndSend("myqueue","foo"); Stringfoo=(String)template.receiveAndConvert("myqueue");
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsdhttp://www.springframework.org/schema/beans'target='_blank'>http://www.springframework.org/schema/beans/spring-beans.xsd">[/code]<rabbit:connection-factory id="connectionFactory"/>
<rabbit:template id="amqpTemplate"connection-factory="connectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="myqueue"/>
</beans>
<rabbit:admin/>声明会默认自动查找类型为Queue,Exchange和Binding的bean,并宣称他们代表的broker的用户,因此在简单的Javadriver中没有必要明确的使用那个bean.
有大量的选项来配置XMLschema中的组件属性-你可以使用xml编辑的自动完成功能来探索它们并查看它们的相关文档.

使用Java配置

同样的例子可使用java来外部配置:

ApplicationContextcontext=newAnnotationConfigApplicationContext(RabbitConfiguration.class); AmqpTemplatetemplate=context.getBean(AmqpTemplate.class); template.convertAndSend("myqueue","foo"); Stringfoo=(String)template.receiveAndConvert("myqueue"); ........ @Configuration
public classRabbitConfiguration{ @Bean
 publicConnectionFactoryconnectionFactory(){ returnnewCachingConnectionFactory("localhost"); } @Bean
 publicAmqpAdminamqpAdmin(){
returnnewRabbitAdmin(connectionFactory());  } @Bean
 publicRabbitTemplaterabbitTemplate(){ returnnewRabbitTemplate(connectionFactory()); } @Bean
 publicQueuemyQueue(){ returnnewQueue("myqueue"); } }

2.2新特性

2.2.1从1.5到1.6的变化

测试支持


提供了一个新的测试支持包.参考Section3.4,“TestingSupport”来了解更多信息.

Builder


现在Builders提供了更方便的API来配置Queue和Exchange对象.参考thesectioncalled“BuilderAPIforQueuesandExchanges”来了解更多信息.

命名空间变化

连接工厂


为连接工厂bean声明增加了thread-factory,例如,以amqp-client包创建的线程命名.参考Section3.1.2,“ConnectionandResourceManagement”来了解更多信息.

当使用CacheMode.CONNECTION时,现在你可以限制允许连接的总数了.参考Section3.1.2,“ConnectionandResourceManagement”来了解更多信息.

Queue定义


现在为匿名队列提供了命名策略;参考thesectioncalled“AnonymousQueue”来了解更多信息.

监听器容器变化

空闲消息监听器检测


当空闲时,现在可以配置监听器容器来发布ApplicationEvent事件了.参考thesectioncalled“DetectingIdleAsynchronousConsumers”来了解更多信息.

不匹配的队列检测


默认情况下,当监听器容器启动时,如果探测到了队列的mismatched属性或参数,容器会记录异常但会继续监听.现在容器有了一个mismatchedQueuesFatal属性,当启动时存在问题的话,这可以阻止容器(以及上下文)启动.如果随后检测到问题,它也会停止容器,如从连接故障中恢复.参考Section3.1.15,“MessageListenerContainerConfiguration”来了解更多信息.

监听器容器日志


现在监听器容器提供了它的beanName给内部SimpleAsyncTaskExecutor作为threadNamePrefix.对于日志分析非常有用.

默认错误处理器


默认错误处理器(ConditionalRejectingErrorHandler)现在认为无可挽救的@RabbitListener异常是致命的.参考Section3.1.13,“ExceptionHandling”来了解更多信息.

AutoDeclare与RabbitAdmins


参考Section3.1.15,“MessageListenerContainerConfiguration”(autoDeclare)来查看在应用程序上下文使用RabbitAdmins的语法变化.

AmqpTemplate:receive与timeout


AmqpTemplate和它的RabbitTemplate实现引入了许多新的带有timeout的receive()方法.参考thesectioncalled“PollingConsumer”来了解更多信息.

AsyncRabbitTemplate


引入了新的AsyncRabbitTemplate.此模块提供了许多发送和接收的方法,其返回值为ListenableFuture,此后可通过它来同步或异步地获取结果.参考thesectioncalled“AsyncRabbitTemplate”来了解更多信息

RabbitTemplate变化


1.4.1在broker支持时,引入了Directreply-to的能力;它比使用临时队列来回应更高效.这个版本允许你覆盖这种默认行为,通常设置useTemporaryReplyQueues属性为true来使用临时队列.参考thesectioncalled“RabbitMQDirectreply-to”来了解更多信息.

RabbitTemplate现在支持user-id-expression(userIdExpression当使用Java配置时).参考SeeValidatedUser-IDRabbitMQdocumentationandthesectioncalled“ValidatedUserId”来了解更多信息.

消息属性

CorrelationId


correlationId消息属性现在可以是字符串了.参考thesectioncalled“MessagePropertiesConverters”来了解更多信息.

长字符串头


以前,DefaultMessagePropertiesConverter会将头转换成(其头长度不超过长字符串限制(默认为1024))成一个DataInputStream(实际上它只是引用LongString的DataInputStream).
在输出时,这个头不会进行转换(除了字符串,例如.在java.io.DataInputStream@1d057a39上调用toString()方法).

在这个版本中,LongLongString默认作为LongStrings;你可以通过其getBytes[],toString(),或getStream()方法来访问它的内容.更大的输入LongString现在也会在输出上正确转换了.

参考thesectioncalled“MessagePropertiesConverters”来了解更多信息.

InboundDeliveryMode


deliveryMode属性不再映射MessageProperties.deliveryMode;这是为了避免意外传播,如果同一个MessageProperties对象用来发送出站消息.
相反,入站deliveryMode头映射为MessageProperties.receivedDeliveryMode.

参考thesectioncalled“MessagePropertiesConverters”来了解更多信息.

当使用注解endpoints时,头将在名为AmqpHeaders.RECEIVED_DELIVERY_MODE的头中提供.

参考thesectioncalled“AnnotatedEndpointMethodSignature”来了解更多信息.

InboundUserID


user_id属性不再映射MessageProperties.userId;这是为了避免意外传播,如果同一个MessageProperties对象用来发送出站消息.相反,入站userId头会映射到MessageProperties.receivedUserId.

参考thesectioncalled“MessagePropertiesConverters”来了解更多信息.

当使用注解endpoints时,头将在名为AmqpHeaders.RECEIVED_USER_ID的头中提供.

参考thesectioncalled“AnnotatedEndpointMethodSignature”来了解更多信息.

RabbitAdmin变化

DeclarationFailures


之间,ignoreDeclarationFailures标志只在channel上发生IOException才会生效(如未匹配参数).现在它对于任何异常都可以生效(如:TimeoutException).
此外,无论何时声明失败,都会发布DeclarationExceptionEvent.RabbitAdmin最后声明事件将作为属性lastDeclarationExceptionEvent也是可用的.

参考Section3.1.10,“Configuringthebroker”来了解更多信息.

@RabbitListenerChanges

MultipleContainersperBean


当使用Java8+,可以添加多个@RabbitListener注解到@Bean类或它们的方法上.当使用Java7-,你可以使用@RabbitListeners容器注解来提供同样的功能.
参考thesectioncalled“@Repeatable@RabbitListener”来了解更多信息.

@SendToSpELExpressions


@SendToforroutingreplieswithnoreplyTopropertycannowbeSpELexpressionsevaluatedagainsttherequest/reply.

参考thesectioncalled“ReplyManagement”来了解更多信息.

@QueueBindingImprovements


现在你可以在@QueueBinding注解中为queues,exchanges和bindings指定参数.

Header交换器可通过@QueueBinding来支持.

参考thesectioncalled“Annotation-drivenListenerEndpoints”来了解更多信息.

延迟消息交换器(DelayedMessageExchange)


SpringAMQP现在有了第一个支持RabbitMQDelayedMessageExchange插件.参考Section3.1.11,“DelayedMessageExchange”来了解更多信息.

交换器内部标志(Exchangeinternalflag)


任何Exchange定义现在都可标记为internal,当声明交换器时,RabbitAdmin会将值传递给broker.

参考Section3.1.10,“Configuringthebroker”来了解更多信息.

CachingConnectionFactory变化

CachingConnectionFactoryCacheStatistics


CachingConnectionFactory现在通过运行时和JMX提供了cache属性.参考thesectioncalled“RuntimeCacheProperties”来了解更多信息.

访问底层RabbitMQ连接工厂


添加了一个新的getter方法来获取底层factory.这是很有用的,例如,可以添加自定义属性.参考Section3.1.3,“AddingCustomClientConnectionProperties”来了解更多信息.

ChannelCache


默认的channel缓存大小已从1增加到了25.参考Section3.1.2,“ConnectionandResourceManagement”来了解更多信息.

此外,SimpleMessageListenerContainer不再设置缓存大小至少要与concurrentConsumers的数目一样大-这是多余的,因为容器消费者channels是不会缓存的.

RabbitConnectionFactoryBean


factorybean现在暴露了一个属性来将client连接属性添加到由工厂产生的连接中.

Java反序列化(Deserialization)


当使用Java反序列化时,现在允许配置类的白名单.如果你从不可信来源接收消息,考虑创建白名单是很重要的.参考thesectioncalled“JavaDeserialization”来了解更多信息.

JSONMessageConverter


改善了JSON消息转换器,现在允许消费消息在消息头中可以没有类型(type)信息.参考thesectioncalled“MessageConversionforAnnotatedMethods”和thesectioncalled“Jackson2JsonMessageConverter”来了解更多信息.

LoggingAppenders

Log4j2


加入了log4j2appender,此appenders现在可配置addresses属性来连接broker集群.

Client连接属性


现在你可以向RabbitMQ连接中加入自定义连接属性了.

参考Section3.2,“LoggingSubsystemAMQPAppenders”来了解更多信息.

2.2.2早期版本


参考SectionA.2,“PreviousReleases”来了解早期版本的变化.

-第二部分

3.参考

这部分参考文档详细描述了组成SringAMQP的各种组件.mainchapter涵盖了开发AMQP应用程序的核心类.这部分也包含了有关示例程序的章节.

3.1使用SpringAMQP

在本章中,我们将探索接口和类,它们是使用SpringAMQP来开发应用程序的必要组件.

3.1.1AMQP抽象

介绍

SpringAMQP由少数几个模块组成,每个都以JAR的形式来表现.这些模块是:spring-amqp和spring-rabbit.spring-amqp模块包含org.springframework.amqp.core包.
在那个包中,你会找到表示AMQP核心模块的类.我们的目的是提供通用的抽象,不依赖于任何特定中间件的实现或AMQP客户端库。
最终用户代码将更具有移植性,以便跨供应商实现,因为它们可以对抽象层开发。这些抽象是使用broker的特定模块实现的,如spring-rabbit。
目前只有一个RabbitMQ实现;而对于抽象的验证,除了RabbitMQ外,也已经在.Net平台上使用ApacheQpid得到了验证。
由于AMQP原则上工作于协议层次,RabbitMQ客户端可以在任何支持相同的协议版本的broker中使用,但目前我们没有测试其它任何broker。

这里的概述假设你已经熟悉了AMQP规范.如果你还没有,那么你可以查看第5章,其它资源中列举的资源.

Message

AMQP 0-8和0-9-1规范没有定义一个消息类或接口.相反,当执行basicPublish()这样的操作的时候,内容是以字节数组参数进行传递的,其它额外属性也是以单独参数进行传递的.SpringAMQP定义了一个Message类来作为AMQP领域模型表示的一部分.Message类的目的是在单个实例中简化封装消息体(body)和属性(header),这样API就可以变得更简单.Message类的定义相当简单.

public classMessage{

private finalMessagePropertiesmessageProperties;

private final byte[]body;

publicMessage(byte[]body,MessagePropertiesmessageProperties){
this.body=body;
this.messageProperties=messageProperties;
}

public byte[]getBody(){
returnthis.body;
}

publicMessagePropertiesgetMessageProperties(){
returnthis.messageProperties;
}
}
MessageProperties接口定义了多个共同属性,如messageId,timestamp,contentType等等.那些属性可以通过调用setHeader(Stringkey,Objectvalue)方法使用用户定义头(user-definedheaders)来扩展.

Exchange

Exchange接口代表的是AMQPExchange,它是生产者发送消息的地方.broker虚拟主机中的交换器名称都是唯一的,同时还有少量的其它属性:

public interfaceExchange{

StringgetName();

StringgetExchangeType();

booleanisDurable();

booleanisAutoDelete();

Map<String,Object>getArguments();

}
正如你所看到的,Exchange还有一个type(它是在ExchangeTypes中定义的常量).基本类型是:Direct,Topic,Fanout,和Headers.
在核心包中,你可以找到每种类型的Exchange接口实现.这些交换器类型会在处理队列绑定时,行为有所不同.
例如,Direct交换器允许队列以固定路由键进行绑定(通常是队列的名称).
Topic交换器支持使用路由正则表达式(*通配符明确匹配一个,而#通配符可匹配0个或多个).

Fanout交换器会把消息发布到所有绑定到它上面的队列而不考虑任何路由键.

关于交换器类型的更多信息,查看Chapter5,OtherResources.

AMQP规范还要求任何broker必须提供一个默认的无名字的(空字符串)Direct交换器.所有声明的队列都可以用它们的名字作为路由键绑定到默认交换器中.在Section3.1.4,“AmqpTemplate”你会了解到更多关于在SringAMQP中使用默认交换器的使用情况.

Queue

Queue代表的是消费者接收消息的组件.像各种各样的Exchange类,我们的实现目标是作为核心AMQP类型的抽象表示.

public classQueue{

private finalStringname;

private volatile booleandurable;

private volatile booleanexclusive;

private volatile booleanautoDelete;

private volatileMap<String,Object>arguments;

/**
*Thequeueisdurable,non-exclusiveandnonauto-delete.
*
*@paramnamethenameofthequeue.
*/

 publicQueue(Stringname){
this(name,true,false,false);
}

//GettersandSettersomittedforbrevity

}
注意,构造器需要接受队列名称作为参数.根据实现,admintemplate可能会提供生成独特队列名称的方法.这些队列作为回复地址或用于临时情景是非常有用的.
基于这种原因,自动生成队列的exclusive和autoDelete属性都应该设置为true.

参考Section3.1.10,“Configuringthebroker”来了解关于使用命名空间来声明队列,包括队列参数的详细情况.

Binding

生产者发送消息到Exchange,而消费者将从Queue中获取消息,连接Queues与Exchanges之间的绑定对于通过消息来连接生产者和消费者是非常关键的.
在SpringAMQP中,我们定义了一个Binding类来表示这些连接.让我们重新回顾一下绑定队列和交换器的操作.

你可以使用固定的路由键来绑定Queue到DirectExchange上.

newBinding(someQueue,someDirectExchange,"foo.bar")
你可以使用路由正则表达式来绑定Queue到TopicExchange上.

newBinding(someQueue,someTopicExchange,"foo.*")
你可以不使用路由键来绑定Queue到FanoutExchange上.

newBinding(someQueue,someFanoutExchange)
我们还提供了BindingBuilder来方便操作.

Bindingb=BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
上面展示的BindingBuilder类很清晰,但如果为bind()方法使用静态导入,这种形式将工作得更好.

本身来说,Binding类的实例只能一个connection中持有数据.换句话说,它不是一个活力(active)组件.
但正如在后面Section3.1.10,“Configuringthebroker”看到的,Binding实例可由AmqpAdmin类来触发broker上的绑定操作.
同样,在同一个章节中,你还会看到Binding实例可在@Configuration类中使用Spring @Bean风格来定义.
还有方便的基类来简化生成AMQP相关bean定义和识别队列,交换器,绑定的方法,这样当AMQPbroker运行程序启动时,就可以得到声明.

AmqpTemplate也在核心包中定义.作为涉及AMQP消息的主要组件,会在它自己的章节中进行详细介绍(参考Section3.1.4,“AmqpTemplate”).

3.1.2连接和资源管理

介绍

虽然我们在前面章节中描述的AMQP模型是通用的,适用于所有实现,但当我们说到资源管理时,其细节是针对特定broker实现的.因此,在这个章节中,我们只关注我们的"spring-rabbit"模块,因为到现在为止,RabbitMQ是唯一支持的实现.

RabbitMQbroker中用于管理连接的中心组件是ConnectionFactory接口.ConnectionFactory实现的责任是提供一个org.springframework.amqp.rabbit.connection.Connection的实例,它包装了com.rabbitmq.client.Connection.
我们提供的唯一具体实现提CachingConnectionFactory,默认情况下,会建立应用程序可共享的单个连接代理.连接共享是可行的,因为在AMQP处理消息的工作单位实际是"channel"(在某些方面,这类似于JMS中Connection和Sessionin的关系).
你可以想象,连接实例提供了一个createChannel方法。CachingConnectionFactory实现支持这些channels的缓存,它会基于它们是否是事务的来单独维护其缓存.
当创建CachingConnectionFactory的实例时,hostname可通过构造器来提供,username和password 属性也可以提供.如果你想配置channel缓存的大小(默认是25),你可以调用setChannelCacheSize()方法.

从1.3版本开始,CachingConnectionFactory也可以同channel一样,配置缓存连接.在这种情况下每次调用createConnection()都会创建一个新连接(或者从缓存中获取空闲的连接).
关闭连接会将其返回到缓存中(如果还没有达到缓存大小的话).在这些连接上创建的Channels同样也会被缓存.单独连接的使用在某些环境中是有用的,如从HA集群中消费,连接负载均衡器,连接不同的集群成员.设置cacheMode为CacheMode.CONNECTION.

这不会限制连接的数目,它用于指定允许空闲打开连接的数目.

从1.5.5版本开始,提供了一个新属性connectionLimit.当设置了此属性时,它会限制连接的总数目,当达到限制值时,将channelCheckoutTimeLimit来等待空闲连接.如果时间超时了,将抛出AmqpTimeoutException.

重要

当缓存模式是CONNECTION时,队列的自动声明等等(参考thesectioncalled“AutomaticDeclarationofExchanges,QueuesandBindings”)将不再支持.

此外,在写作的时候,rabbitmq-client包默认为每个连接(5个线程)创建了一个固定的线程池.当需要使用大量连接时,你应该考虑在CachingConnectionFactory定制一个executor.然后,同一个executor会用于所有连接,其线程也是共享的.
executor的线程池是没有界限的或按预期使用率来设置(通常,一个连接至少应该有一个线程).如果在每个连接上创建了多个channels,那么池的大小会影响并发性,因此一个可变的线程池executor应该是最合适的.

理解缓存大小不是限制是很重要的,它仅仅是可以缓存的通道数量.当说缓存大小是10时,在实际使用中,其实可以是任何数目的通道.如果超过10个通道被使用,他们都返回到高速缓存,10个将在高速缓存中,其余的将物理关闭。

从1.6版本开始,默认通道缓存大小从1增加到了25.在高容量,多线程环境中,较小的缓存意味着通道的创建和关闭将以很高的速率运行.加大默认缓存大小可避免这种开销.
你可以监控通道的使用情况(通过RabbitMQAdminUI),如果看到有很多通道在创建和关闭,你可以增大缓存大小.缓存只会增长按需(以适应应用程序的并发性要求),所以这个更改不会影响现有的低容量应用程序。

从1.4.2版本开始,CachingConnectionFactory有一个channelCheckoutTimeout属性.当此属性的值大于0时,channelCacheSize会变成连接上创建通道数目的限制.
如果达到了限制,调用线程将会阻塞,直到某个通道可用或者超时,在后者的情况中,将抛出AmqpTimeoutException异常.

在框架(如.RabbitTemplate)中使用的通道将会可靠地返回到缓存中.如果在框架外创建了通道(如.直接访问connection(s)并调用createChannel()),你必须可靠地返回它们(通过关闭),也许需要在finally块中以防止耗尽通道.

CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connectionconnection=connectionFactory.createConnection();
当使用XML时,配置看起来像下面这样:

<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username"value="guest"/>
<property name="password" value="guest"/>
</bean>
这里也有一个SingleConnectionFactory实现,它只能用于框架的单元测试代码中.它比CachingConnectionFactory简单,因为它不会缓存通道,由于其缺乏性能和韧性,它不打算用于简单的测试以外的实际使用.
如果基于某些原因,你需要自己来实现ConnectionFactory,AbstractConnectionFactory基类提供了一个非常好的起点.

ConnectionFactory可使用rabbit命名空间来快速方便的建立:

<rabbit:connection-factory id="connectionFactory"/>
在多数情况下,这是很好的,因为框架会为你选择最好的默认值.创建的实例会是CachingConnectionFactory.要记住,默认的缓存大小是25.如果你想缓存更多的通道,你可以设置channelCacheSize属性值.在XML中,它看起来像下面这样:

<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
在命名空间中,你也可以添加channel-cache-size属性:

<rabbit:connection-factory id="connectionFactory" channel-cache-size="50"/>
默认的缓存模式是CHANNEL,但你可以使用缓存连接来替换;在这种情况下,我们会使用connection-cache-size:

<rabbit:connection-factory id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
Host和port属性也可以在命名空间中提供:

<rabbit:connection-factory id="connectionFactory" host="somehost" port="5672"/>
此外,如果运行集群环境中,使用addresses属性.

<rabbit:connection-factory id="connectionFactory" addresses="host1:5672,host2:5672"/>
下面是一个自定义的线程工厂,其前辍线程名称为rabbitmq-.

<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567" thread-factory="tf" channel-cache-size="10" username="user" password="password"/>
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-"/>
</bean>

配置底层客户端连接工厂

CachingConnectionFactory使用的是RabbitclientConnectionFactory的实例;当在CachingConnectionFactory设置等价属性时,许多属性(host,port,userName,password,requestedHeartBeat,connectionTimeout)来传递.
要设置其它属性(例如clientProperties),可定义一个rabbitfactory的实例,并使用CachingConnectionFactory的适当构造器来提供引用.
当使用上面提到的命名空间时,要在connection-factory属性中提供一个工厂的引用来配置.为方便起见,提供了一个工厂,以协助在一个Spring应用程序上下文中配置连接工厂,在下一节讨论。

<rabbit:connection-factory id="connectionFactory" connection-factory="rabbitConnectionFactory"/>

RabbitConnectionFactoryBean和配置SSL

从1.4版本开始,提供了一个便利的RabbitConnectionFactoryBean类通过依赖注入来配置底层客户端连接工厂的SSL属性.其它设置简单地委派给底层工厂.以前你必须以编程方式配置SSL选项。

<rabbit:connection-factoryid="rabbitConnectionFactory"connection-factory="clientConnectionFactory"host="${host}"port="${port}"virtual-host="${vhost}"username="${username}"password="${password}"/>
<beanid="clientConnectionFactory"class="org.springframework.xd.dirt.integration.rabbit.RabbitConnectionFactoryBean">
<propertyname="useSSL"value="true"/>
<propertyname="sslPropertiesLocation"value="file:/secrets/rabbitSSL.properties"/>
</bean>
参考RabbitMQDocumentation来了解关于配置SSL的更多信息.省略的keyStore和trustStore配置将在无证书验证的情况下,通过SSL来连接.Key和truststore配置可以按如下提供:

sslPropertiesLocation属性是一个SpringResource,它指向一个包含下面key的属性文件:

keyStore=file:/secret/keycert.p12 trustStore=file:/secret/trustStore keyStore.passPhrase=secret trustStore.passPhrase=secret
keyStore的truststore是指向store的SpringResources.通常情况下,这个属性文件在操作系统之下安全的,应用程序只能读取访问.

从SpringAMQP1.5版本开始,这些属性可直接在工厂bean上设置.如果同时提供了discrete和sslPropertiesLocation属性,后者属性值会覆盖discrete值.

路由连接工厂

从1.3版本开始,引入了AbstractRoutingConnectionFactory.这提供了一种机制来配置多个ConnectionFactories的映射,并通过在运行时使用lookupKey来决定目标ConnectionFactory.

通常,实现会检查线程绑定上下文.为了方便,SpringAMQP提供了SimpleRoutingConnectionFactory,它会从SimpleResourceHolder中获取当前线程绑定的lookupKey:

<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<propertyname="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/> 
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory"/>
public classMyService{

@Autowired
  privateRabbitTemplaterabbitTemplate;

publicvoidservice(StringvHost,Stringpayload){
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(),vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}

}
在使用资源后,对其进行解绑是很重要的.更多信息参考AbstractRoutingConnectionFactory的JavaDocs.

从1.4版本开始,RabbitTemplate支持SpELsendConnectionFactorySelectorExpression和receiveConnectionFactorySelectorExpression属性,
它会在每个AMQP协议交互操作(send,sendAndReceive,receiveorreceiveAndReply)进行评估,为提供的AbstractRoutingConnectionFactory类解析lookupKey值.
Bean引用,如"@vHostResolver.getVHost(#root)"可用于表达式中.对于send操作,要发送的消息是根评估对象;对于receive操作,queueName是根评估对象.

路由算法为:如果selector表达式为null,或等价于null,或提供的ConnectionFactory不是AbstractRoutingConnectionFactory的实例,根据提供的ConnectionFactory实现,
所有的工作都按之前的进行.同样的结果也会发生:如果评估结果不为null,但对于lookupKey无目标ConnectionFactory,且theAbstractRoutingConnectionFactory使用lenientFallback=true进行了配置.
当然,在AbstractRoutingConnectionFactory的情况下,它会基于determineCurrentLookupKey()的路由实现来进行回退.但,如果lenientFallback=false,将会抛出IllegalStateException异常.

Namespace在<rabbit:template>组件中也支持send-connection-factory-selector-expression和receive-connection-factory-selector-expression属性.

也是从1.4版本开始,你可以在SimpleMessageListenerContainer配置路由连接工厂.在那种情况下,队列名称的列表将作为lookupkey.例如,如果你在容器中配置setQueueNames("foo","bar"),lookupkey将是"[foo,bar]"(无空格).

-第三部分

QueueAffinity和LocalizedQueueConnectionFactory

当在集群中使用HA队列时,为了获取最佳性能,可以希望连接到主队列所在的物理broker.虽然CachingConnectionFactory可以配置为使用多个broker地址;这会失败的,client会尝试按顺序来连接.LocalizedQueueConnectionFactory使用管理插件提供的RESTAPI来确定包含master队列的节点.然后,它会创建(或从缓存中获取)一个只连接那个节点的CachingConnectionFactory.如果连接失败了,将会确定一个新的消费者可连接的master节点.LocalizedQueueConnectionFactory使用默认的连接工厂进行配置,在队列物理位置不能确定的情况下,它会按照正常情况来连接集群.

LocalizedQueueConnectionFactory是一个RoutingConnectionFactory,SimpleMessageListenerContainer会使用队列名称作为其lookupkey,这些已经在上面的thesectioncalled“RoutingConnectionFactory”讨论过了.

基于这个原因(使用队列名称来作查找键),LocalizedQueueConnectionFactory只在容器配置为监听某个单一队列时才可使用.

RabbitMQ管理插件应该在每个节点上开启.

警告

这种连接工厂用于长连接,如用在SimpleMessageListenerContainer的连接.它的目的不是用于短连接,如在RabbitTemplate中使用,这是因为在连接前,它要调用RESTAPI.此外,对于发布操作来说,队列是未知的,不管如何,消息会发布到所有集群成员中,因此查找节点的逻辑几乎没有什么意义。

这里有一个样例配置,使用了SpringBoot的RabbitProperties来配置工厂:

@Autowired
privateRabbitPropertiesprops;

privatefinalString[]adminUris={"http://host1:15672","http://host2:15672"};

privatefinalString[]nodes={"rabbit@host1","rabbit@host2"};

@Bean
publicConnectionFactorydefaultConnectionFactory(){
CachingConnectionFactorycf=newCachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
returncf;
}

@Bean
publicConnectionFactoryqueueAffinityCF(
@Qualifier("defaultConnectionFactory")ConnectionFactorydefaultCF){
  returnnewLocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
this.adminUris,this.nodes,
this.props.getVirtualHost(),this.props.getUsername(),this.props.getPassword(),
false,null);
}
注意,三个参数是addresses,adminUris和nodes的数组.当一个容器试图连接一个队列时,它们是有位置性的,它决定了哪个节点上的队列是mastered,并以同样数组位置来连接其地址.

发布者确认和返回

确认和返回消息可通过分别设置CachingConnectionFactory的publisherConfirms和publisherReturns属性为ture来完成.

当设置了这些选项时,由工厂创建的通道将包装在PublisherCallbackChannel中,这用来方便回调.当获取到这样的通道时,client可在channel上注册一个PublisherCallbackChannel.Listener.PublisherCallbackChannel实现包含一些逻辑来路由确认/返回给适当的监听器.这些特性将在下面的章节中进一步解释.

对于一些更多的背景信息,可以参考下面的博客:IntroducingPublisherConfirms.

记录通道关闭事件

1.5版本中引入了允许用户控制日志级别的机制.

CachingConnectionFactory使用默认的策略来记录通道关闭事件:

不记录通道正常关闭事件(200OK).

如果通道是因为失败的被动的队列声明关闭的,将记录为debug级别.

如果通道关闭是因为basic.consume因专用消费者条件而拒绝引起的,将被记录为INFO级别.

所有其它的事件将记录为ERROR级别.

要修改此行为,需要在CachingConnectionFactory的closeExceptionLogger属性中注入一个自定义的ConditionalExceptionLogger.

也可参考thesectioncalled“ConsumerFailureEvents”.

运行时缓存属性

从1.6版本开始,CachingConnectionFactory通过getCacheProperties()方法提供了缓存统计.这些统计数据可用来在生产环境中优化缓存.例如,最高水位标记可用来确定是否需要加大缓存.如果它等于缓存大小,你也许应该考虑进一步加大.

Table3.1.CacheMode.CHANNEL的缓存属性

PropertyMeaning
channelCacheSize当前配置的允许空闲的最大通道数量.
localPort连接的本地端口(如果可用的话).在可以在RabbitMQ 管理界面中关联connections/channels.
idleChannelsTx当前空闲(缓存的)的事务通道的数目.
idleChannelsNotTx当前空闲(缓存的)的非事务通道的数目.
idleChannelsTxHighWater同时空闲(缓存的)的事务通道的最大数目
idleChannelsNotTxHighWater同时空闲(缓存的)的非事务通道的最大数目.
Table3.2.CacheMode.CONNECTION[b]的缓存属性[/b]

PropertyMeaning
openConnections表示连接到brokers上连接对象的数目.
channelCacheSize当前允许空闲的最大通道数目
connectionCacheSize当前允许空闲的最大连接数目.
idleConnections当前空闲的连接数目.
idleConnectionsHighWater目前已经空闲的最大连接数目.
idleChannelsTx:<localPort>在当前连接上目前空闲的事务通道的数目.属性名的localPort部分可用来在RabbitMQ管理界面中关联connections/channels.
idleChannelsNotTx:<localPort>在当前连接上目前空闲和非事务通道的数目.属性名的localPort部分可用来在RabbitMQ管理界面中关联connections/channels
idleChannelsTxHighWater: <localPort>已同时空闲的事务通道的最大数目.属性名的localPort部分可用来在RabbitMQ管理界面中关联connections/channels.
idleChannelsNotTxHighWater: <localPort>忆同时空闲的非事务通道的最大数目.属性名的localPort部分可用来RabbitMQ管理界面中关联connections/channels.
cacheMode属性(包含CHANNEL或CONNECTION).

Figure3.1.JVisualVMExample



3.1.3添加自定义Client连接属性

CachingConnectionFactory现在允许你访问底层连接工厂,例如,设置自定义client属性:

connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo","bar");
当在RabbitMQ管理界面中查看连接时,将会看到这些属性.

3.1.4AmqpTemplate

介绍

像其它SpringFramework提供的高级抽象一样,SpringAMQP提供了扮演核心角色的模板.定义了主要操作的接口称为AmqpTemplate.这些操作包含了发送和接收消息的一般行为.换句话说,它们不是针对某个特定实现的,从其名称"AMQP"就可看出.另一方面,接口的实现会尽量作为AMQP协议的实现.不像JMS,它只是接口级别的API实现,AMQP是一个线路级协议.协议的实现可提供它们自己的clientlibraries,因此模板接口的实现都依赖特定的clientlibrary.目前,只有一个实现:RabbitTemplate.在下面的例子中,你会经常看到"AmqpTemplate",但当你查看配置例子或者任何实例化或调用setter方法的代码时,你都会看到实现类型(如."RabbitTemplate").

正如上面所提到的,AmqpTemplate接口定义了所有发送和接收消息的基本操作.我们将分别在以下两个部分探索消息发送和接收。

也可参考thesectioncalled“AsyncRabbitTemplate”.

添加重试功能

从1.3版本开始,你可为RabbitTemplate配置使用RetryTemplate来帮助处理broker连接的问题.参考spring-retry项目来了解全部信息;下面就是一个例子,它使用指数回退策略(exponentialbackoffpolicy)和默认的SimpleRetryPolicy(向调用者抛出异常前,会做三次尝试).

使用XML命名空间:

<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500"/>
<property name="multiplier" value="10.0"/>
<property name="maxInterval"value="10000"/>
</bean>
</property>
</bean>
使用@Configuration:

@Bean
publicAmqpTemplaterabbitTemplate();
RabbitTemplatetemplate=newRabbitTemplate(connectionFactory());
RetryTemplateretryTemplate=newRetryTemplate();
ExponentialBackOffPolicybackOffPolicy=newExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
returntemplate;
}
从1.4版本开始,除了retryTemplate属性外,RabbitTemplate上也支持recoveryCallback选项.它可用作RetryTemplate.execute(RetryCallback<T,E>retryCallback,RecoveryCallback<T>recoveryCallback)第二个参数.

RecoveryCallback会有一些限制,因为在retrycontext只包含lastThrowable字段.在更复杂的情况下,你应该使用外部RetryTemplate,这样你就可以通过上下文属性传递更多信息给RecoveryCallback

retryTemplate.execute(
newRetryCallback<Object,Exception>(){

@Override
  publicObjectdoWithRetry(RetryContextcontext)throwsException{
context.setAttribute("message",message);
returnrabbitTemplate.convertAndSend(exchange,routingKey,message);
}
},newRecoveryCallback<Object>(){

@OverridepublicObjectrecover(RetryContextcontext)throwsException{
Objectmessage=context.getAttribute("message");
Throwablet=context.getLastThrowable();
//Dosomethingwithmessage
   returnnull;
}
});
}
在这种情况下,你不需要在RabbitTemplate中注入RetryTemplate.

发布者确认和返回

AmqpTemplate的RabbitTemplate实现支持发布者确认和返回.

对于返回消息,模板的mandatory属性必须设置为true,或者对于特定消息,其mandatory-expression必须评估为true.
此功能需要将CachingConnectionFactory的publisherReturns属性设置为true(参考thesectioncalled“PublisherConfirmsandReturns”).
返回是通过注册在RabbitTemplate.ReturnCallback(通过调用setReturnCallback(ReturnCallbackcallback))来返回给客户端的.回调必须实现下面的方法:

voidreturnedMessage(Messagemessage,intreplyCode,StringreplyText,Stringexchange,StringroutingKey);
每个RabbitTemplate只支持一个ReturnCallback.也可参考thesectioncalled“ReplyTimeout”.

对于发布者确认(又名发布者应答),模板需要将CachingConnectionFactory中的publisherConfirms属性设置为true.
确认是通过注册在RabbitTemplate.ConfirmCallback(通过调用setConfirmCallback(ConfirmCallbackcallback))发送给client的.回调必须实现下面的方法:

voidconfirm(CorrelationDatacorrelationData,booleanack,Stringcause);
CorrelationData对象是在发送原始消息的时候,由client提供的.ack为true表示确认,为false时,表示不确认(nack).对于nack,cause可能会包含nack的原因(如果生成nack时,它可用的话).
一个例子是当发送消息到一个不存在的交换器时.在那种情况下,broker会关闭通道;关闭的原因会包含在cause中.cause是1.4版本中加入的.

RabbitTemplate中只支持一个ConfirmCallback.

当rabbit模板完成发送操作时,会关闭通道;这可以排除当连接工厂缓存满时(缓存中还有空间,通道没有物理关闭,返回/确认正常处理)确认和返回的接待问题.
当缓存满了的时候,框架会延迟5秒来关闭,以为接收确认/返回消息留有时间.当使用确认时,通道会在收到最后一个确认时关闭.
当使用返回时,通道会保持5秒的打开状态.一般建议将连接工厂的channelCacheSize设为足够大,这样发布消息的通道就会返回到缓存中,而不是被关闭.
你可以使用RabbitMQ管理插件来监控通道的使用情况;如果你看到通道打开/关闭的非常迅速,那么你必须考虑加大缓存,从而减少服务器的开销.

Messaging集成

从1.4版本开始,构建于RabbitTemplate上的RabbitMessagingTemplate提供了与SpringFramework消息抽象的集成(如.org.springframework.messaging.Message).
Thisallowsyoutocreatethemessagetosendingenericmanner.

验证UserId

从1.6版本开始,模板支持user-id-expression(当使用Java配置时,为userIdExpression).如果发送消息,userid属性的值将在评估表达式后进行设置.评价的根对象是要发送的消息。

例子:

<rabbit:template...user-id-expression="'guest'"/>
<rabbit:template...user-id-expression="@myConnectionFactory.username"/>
第一个示例是一个文本表达式;第二个例子将获取上下文中连接工厂bean的username属性.

3.1.5发送消息

介绍

当发送消息时,可使用下面的任何一种方法:

voidsend(Messagemessage)throwsAmqpException;

voidsend(StringroutingKey,Messagemessage)throwsAmqpException;

voidsend(Stringexchange,StringroutingKey,Messagemessage)throwsAmqpException;
我们将使用上面列出的最后一个方法来讨论,因为它实际是最清晰的.它允许在运行时提供一个AMQPExchange名称和路由键(routingkey).最后一个参数是负责初建创建Message实例的回调.使用此方法来发送消息的示例如下:

amqpTemplate.send("marketData.topic","quotes.nasdaq.FOO",
newMessage("12.34".getBytes(),someProperties));
如果你打算使用模板实例来多次(或多次)向同一个交换器发送消息时,"exchange"可设置在模板自已身上.在这种情况中,可以使用上面列出的第二个方法.下面的例子在功能上等价于前面那个:

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO",newMessage("12.34".getBytes(),someProperties));
如果在模块上设置"exchange"和"routingKey"属性,那么方法就只接受Message参数:

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(newMessage("12.34".getBytes(),someProperties));
关于交换器和路由键更好的想法是明确的参数将总是会覆盖模板默认值.事实上,即使你不在模板上明确设置这些属性,总是有默认值的地方.在两种情况中,默认值是空字符串,这是合情合理的.
就路由键而言,它并不总是首先需要的(如.Fanout交换器).此外,绑定的交换器上的队列可能会使用空字符串.这些在模板的路由键中都是合法的.
就交换器名称而言,空字符串也是常常使用的,因为AMQP规范定义了无名称的"默认交换器".
由于所有队列可使用它们的队列名称作为路由键自动绑定到默认交换器上(它是Direct交换器e),上面的第二个方法可通过默认的交换器将简单的点对点消息传递到任何队列.
只需要简单的将队列名称作为路由键-在运行时提供方法参数:

RabbitTemplatetemplate=newRabbitTemplate();//使用默认的无名交换器
template.send("queue.helloWorld",newMessage("HelloWorld".getBytes(),someProperties));
或者,如果你喜欢创建一个模板用于主要或专门向一个队列发送消息,以下是完全合理的:

RabbitTemplatetemplate=newRabbitTemplate();//使用默认无名交换器
template.setRoutingKey("queue.helloWorld");//但我们总是向此队列发送消息
template.send(newMessage("HelloWorld".getBytes(),someProperties));

MessageBuilderAPI

从1.3版本开始,通过MessageBuilder和MessagePropertiesBuilder提供了消息构建API;它们提供了更加方便地创建消息和消息属性的方法:

Messagemessage=MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar","baz")
.build();


MessagePropertiesprops=MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar","baz")
.build();
Messagemessage=MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
每个MessageProperies上定义的属性都可以被设置.其它方法包括setHeader(Stringkey,Stringvalue),removeHeader(Stringkey),removeHeaders(),和copyProperties(MessagePropertiesproperties).
每个属性方法都有一个set*IfAbsent()变种.在默认的初始值存在的情况下,方法名为set*IfAbsentOrDefault().

提供了五个静态方法来创建初始messagebuilder:

public staticMessageBuilderwithBody(byte[]body)
public staticMessageBuilderwithClonedBody(byte[]body)
public staticMessageBuilderwithBody(byte[]body,intfrom,intto)
public staticMessageBuilderfromMessage(Messagemessage)
public staticMessageBuilderfromClonedMessage(Messagemessage)

builder创建的消息body是参数的直接引用.

builder创建的消息body是包含拷贝原字节数组的新数组.

build创建的消息body是包含原字节数组范围的新数组.查看Arrays.copyOfRange()来了解更多信息.

builder创建的消息body是原body参数的直接引用.参数的属性将拷贝到新MessageProperties对象中.

builer创建的消息body包含参数body的新数组.参数的属性将拷贝到新的MessageProperties对象中.
publicstaticMessagePropertiesBuildernewInstance()
publicstaticMessagePropertiesBuilderfromProperties(MessagePropertiesproperties)
publicstaticMessagePropertiesBuilderfromClonedProperties(MessagePropertiesproperties)

新消息属性将使用默认值进行初始化

builder会使用提供的properties对象进行初始化,build()方法也会返回参数properties对象.

参数的属性会拷贝到新的MessageProperties对象中.
在AmqpTemplate的RabbitTemplate实现中,每个send()方法的重载版本都接受一个额外的CorrelationData对象.
当启用了发布者确认时,此对象会在3.1.4,“AmqpTemplate”的回调中返回.这允许发送者使用确认(ack或nack)来关联发送的消息.

发布者返回

当模板的mandatory属性为true时,返回消息将由Section3.1.4,“AmqpTemplate”描述的回调来返回.

从1.4版本开始,RabbitTemplate支持SpELmandatoryExpression属性,它将对每个请求消息进行评估,作为根评估对象来解析成布尔值.Bean引用,如"@myBean.isMandatory(#root)"可用在此表达式中.

发布者返回内部也可用于RabbitTemplate的发送和接收操作中.参考thesectioncalled“ReplyTimeout”来了解更多信息.

批量

从1.4.2版本开始,引入了BatchingRabbitTemplate.它是RabbitTemplate的子类,覆盖了send方法,此方法可根据BatchingStrategy来批量发送消息;只有当一个批次完成时才会向RabbitMQ发送消息。

publicinterfaceBatchingStrategy{

MessageBatchaddToBatch(Stringexchange,StringroutingKey,Messagemessage);

DatenextRelease();

Collection<MessageBatch>releaseBatches();

}
警告
成批的数据是保存在内存中的,如果出现系统故障,未发送的消息将会丢失.
这里提供了一个SimpleBatchingStrategy.它支持将消息发送到单个exchange/routingkey.它有下面的属性:

batchSize-发送前一个批次中消息的数量

bufferLimit-批量消息的最大大小;如果超过了此值,它会取代batchSize,并导致要发送的部分批处理

timeout-当没有新的活动添加到消息批处理时之后,将发送部分批处理的时间(atimeafterwhichapartialbatchwillbesentwhenthereisnonewactivityaddingmessagestothebatch)

SimpleBatchingStrategy通过在每个消息的前面嵌入4字节二进制长度来格式化批次消息.这是通过设置springBatchFormat消息属性为lengthHeader4向接收系统传达的.

重要

批量消息自动由监听器容器来分批(de-batched)(使用springBatchFormat消息头).拒绝批量消息中的任何一个会将导致拒绝整个批次消息.

3.1.6接收消息

介绍

Message接收总是比发送稍显复杂.有两种方式来接收Message.最简单的选择是在轮询方法调用中一次只接收一个消息.更复杂的更常见的方法是注册一个侦听器,按需异步的接收消息。
在下面两个子章节中,我们将看到这两种方法的示例.

PollingConsumer

AmqpTemplate自身可用来轮询消息接收.默认情况下,如果没有可用消息,将会立即返回null;它是无阻塞的.
从1.5版本开始,你可以设置receiveTimeout,以毫秒为单位,receive方法会阻塞设定的时间来等待消息.小于0的值则意味着无限期阻塞(或者至少要等到与broker的连接丢失).
1.6版本引入了receive方法的变种,以允许在每个调用上都可设置超时时间.

警告

由于接收操作会为每个消息创建一个新的QueueingConsumer,这种技术并不适用于大容量环境,可考虑使用异步消费者,或将receiveTimeout设为0来应对这种情况.

这里有四个简单可用的receive方法.同发送方的交换器一样,有一种方法需要直接在模板本身上设置的默认队列属性,还有一种方法需要在运行接受队列参数.
版本1.6引入了接受timeoutMillis的变种,基于每个请求重写了receiveTimeout方法.

Messagereceive()throwsAmqpException;

Messagereceive(StringqueueName)throwsAmqpException;

Messagereceive(longtimeoutMillis)throwsAmqpException;

Messagereceive(StringqueueName,longtimeoutMillis)throwsAmqpException;
与发送消息的情况类似,AmqpTemplate有一些便利的方法来接收POJOs而非Message实例,其实现可提供一种方法来定制MessageConverter以用于创建返回的Object:

ObjectreceiveAndConvert()throwsAmqpException;

ObjectreceiveAndConvert(StringqueueName)throwsAmqpException;

MessagereceiveAndConvert(longtimeoutMillis)throwsAmqpException;

MessagereceiveAndConvert(StringqueueName,longtimeoutMillis)throwsAmqpException;
类似于sendAndReceive方法,从1.3版本开始,AmqpTemplate有多个便利的receiveAndReply方法同步接收,处理,以及回应消息:

<R,S>booleanreceiveAndReply(ReceiveAndReplyCallback<R,S>callback)
throwsAmqpException;

<R,S>booleanreceiveAndReply(StringqueueName,ReceiveAndReplyCallback<R,S>callback)
throwsAmqpException;

<R,S>booleanreceiveAndReply(ReceiveAndReplyCallback<R,S>callback,
StringreplyExchange,StringreplyRoutingKey)throwsAmqpException;

<R,S>booleanreceiveAndReply(StringqueueName,ReceiveAndReplyCallback<R,S>callback,
StringreplyExchange,StringreplyRoutingKey)throwsAmqpException;

<R,S>booleanreceiveAndReply(ReceiveAndReplyCallback<R,S>callback,
ReplyToAddressCallback<S>replyToAddressCallback)throwsAmqpException;

<R,S>booleanreceiveAndReply(StringqueueName,ReceiveAndReplyCallback<R,S>callback,
ReplyToAddressCallback<S>replyToAddressCallback)throwsAmqpException;
AmqpTemplate实现会负责receive和reply阶段.在大多数情况下,如果有必要,你只需要提供ReceiveAndReplyCallback的实现来为收到的消息执行某些业务逻辑或为收到的消息构建回应对象.
注意,ReceiveAndReplyCallback可能返回null.在这种情况下,将不会发送回应,receiveAndReply的工作类似于receive方法.这允许相同的队列用于消息的混合物,其中一些可能不需要答复。

自动消息(请求和应答)转换只能适应于提供的回调不是ReceiveAndReplyMessageCallback实例的情况下-它提供了一个原始的消息交换合同。

ReplyToAddressCallback只在这种情况中有用,需要根据收到的信息通过自定义逻辑来决定replyTo地址,并在ReceiveAndReplyCallback中进行回应的情况.默认情况下,请求消息中的replyTo信息用来路由回复.

下面是一个基于POJO的接收和回复…​

booleanreceived=
this.template.receiveAndReply(ROUTE,newReceiveAndReplyCallback<Order,Invoice>(){

publicInvoicehandle(Orderorder){
returnprocessOrder(order);
}
});
if(received){
log.info("Wereceivedanorder!");
}

异步消费者

重要 

SpringAMQP也支持注解监听器endpoints(通过使用@RabbitListener注解)并提供了一个开放的基础设施,编程注册端点。
这是目前为止建立一个异步消费者的最方便方式,参考thesectioncalled“Annotation-drivenListenerEndpoints”来了解更多详情.

消息监听器

对于异步消息接收,会涉及到一个专用组件(不是AmqpTemplate).此组件可作为消息消费回调的容器.
稍后,我们会讲解这个容器和它的属性,但首先让我们来看一下回调,因为这里是你的应用程序代码与消息系统集成的地方.MessageListener接口:

public interfaceMessageListener{
voidonMessage(Messagemessage);
}
如果出于任何理由,你的回调逻辑需要依赖于AMQPChannel实例,那么你可以使用ChannelAwareMessageListener.它看起来是很相似的,但多了一个额外的参数:

publicinterfaceChannelAwareMessageListener{
voidonMessage(Messagemessage,Channelchannel)throwsException;
}
MessageListenerAdapter

如果您希望在应用程序逻辑和消息API之间保持严格的分离,则可以依赖于框架所提供的适配器实现。

这是通常被称为“消息驱动的POJO”支持。当使用适配器时,只需要提供一个适配器本身应该调用的实例引用即可。

MessageListenerAdapterlistener=newMessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
你也可以继承适配器,并实现getListenerMethodName()方法(基于消息来动态选择不同的方法).这个方法有两个参数:originalMessage和extractedMessage,后者是转换后的结果.默认情况下,需要配置SimpleMessageConverter;
参考thesectioncalled“SimpleMessageConverter”来了解更多信息以及其它转换器的信息.

从1.4.2开始,原始消息包含consumerQueue和consumerTag属性,这些属性可用来确定消息是从那个队列中收到的.

从1.5版本开始,你可以配置消费者queue/tag到方法名称的映射(map)以动态选择要调用的方法.如果map中无条目,我们将退回到默认监听器方法.

容器

你已经看过了消息监听回调上的各种各样的选项,现在我们将注意力转向容器.基本上,容器处理主动(active)的职责,这样监听器回调可以保持被动(passive).容器是“生命周期”组件的一个例子。
它提供了启动和停止的方法.当配置容器时,你本质上缩短了AMQPQueue和MessageListener实例之间的距离.你必须提供一个ConnectionFactory的引用,队列名称或队列实例.
下面是使用默认实现SimpleMessageListenerContainer的最基础的例子:

SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(newMessageListenerAdapter(somePojo));
作为一个主动组件,最常见的是使用bean定义来创建监听器容器,这样它就可以简单地运行于后台.这可以通过XML来完成:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
或者你可以@Configuration风格:

@Configuration
public classExampleAmqpConfiguration{

@Bean
publicSimpleMessageListenerContainermessageListenerContainer(){
SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
returncontainer;
}

@Bean
publicConnectionFactoryrabbitConnectionFactory(){
CachingConnectionFactoryconnectionFactory=
newCachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
returnconnectionFactory;
}

@Bean
publicMessageListenerexampleListener(){
returnnewMessageListener(){
publicvoidonMessage(Messagemessage){
System.out.println("received:"+message);
}
};
}
}
RabbitMQVersion3.2开始,broker支持消费者优先级了(参考UsingConsumerPrioritieswithRabbitMQ).

这可以通过在消费者设置x-priority参数来启用.

SimpleMessageListenerContainer现在支持设置消费者参数:

container.setConsumerArguments(Collections.
<String,Object>singletonMap("x-priority",Integer.valueOf(10)));
为了方便,命名空间在listener元素上提供了priority属性:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10"/>
</rabbit:listener-container>
从1.3版本开始,容器监听的队列可在运行时进行修改,参考Section3.1.18,“ListenerContainerQueues”.

auto-delete队列

当容器配置为监听auto-delete队列或队列有x-expires选项或者broker配置了Time-To-Live策略,队列将在容器停止时(最后的消费者退出时)由broker进行删除.
在1.3版本之前,容器会因队列缺失而不能重启;当连接关闭/打开时,RabbitAdmin只能自动重新声明队列.

从1.3版本开始,在启动时,容器会使用RabbitAdmin来重新声明缺失的队列.

您也可以使用条件声明(thesectioncalled“ConditionalDeclaration”)与auto-startup="false"来管理队列的延迟声明,直到容器启动.

<rabbit:queue id="otherAnon" declared-by="containerAdmin"/>
<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo"queues="otherAnon" admin="containerAdmin"/>
</rabbit:listener-container>
<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory" auto-startup="false"/>
在这种情况下,队列和交换器是由containerAdmin来声明的,auto-startup="false"因此在上下文初始化期间不会声明元素.同样,出于同样原因,容器也不会启动.当容器随后启动时,它会使用containerAdmin引用来声明元素.

批量消息

批量消息会自动地通过监听器容器(使用springBatchFormat消息头)来解批(de-batched).拒绝批量消息中的任何一个都将导致整批消息被拒绝.参考thesectioncalled“Batching”来了解更多关于批量消息的详情.

消费者失败事件

从1.5版本开始,无论时候,当监听器(消费者)经历某种失败时,SimpleMessageListenerContainer会发布应用程序事件.事件ListenerContainerConsumerFailedEvent有下面的属性:

container-消费者经历问题的监听容器.

reason-失败的文本原因。

fatal-一个表示失败是否是致命的boolean值;对于非致命异常,容器会根据retryInterval值尝试重新启动消费者.

throwable-捕捉到的Throwable.

这些事件能通过实现ApplicationListener<ListenerContainerConsumerFailedEvent>来消费.

concurrentConsumers大于1时,系统级事件(如连接失败)将发布到所有消费者.

如果消费者因队列是专有使用而失败了,默认情况下,在发布事件的时候,也会发出WARN日志.要改变日志行为,需要在SimpleMessageListenerContainer的exclusiveConsumerExceptionLogger属性中提供自定义的ConditionalExceptionLogger.
也可参考thesectioncalled“LoggingChannelCloseEvents”.

致命错误都记录在ERROR级别中,这是不可修改的。

-第四部分

ConsumerTags

从1.4.5版本开始,你可以提供一种策略来生成consumertags.默认情况下,consumertag是由broker来生成的.

public interfaceConsumerTagStrategy{StringcreateConsumerTag(Stringqueue);}
该队列是可用的,所以它可以(可选)在tag中使用。

参考Section3.1.15,“MessageListenerContainerConfiguration”.

注解驱动的监听器Endpoints

介绍

从1.4版本开始,异步接收消息的最简单方式是使用注解监听器端点基础设施.简而言之,它允许你暴露管理bean的方法来作为Rabbit监听器端点.

@Component
public classMyService{
 @RabbitListener(queues="myQueue")
 public voidprocessOrder(Stringdata){
 ...
 }
}
上面例子的含义是,当消息在[code=plain]org.springframework.amqp.core.Queue"myQueue"上可用时,会调用processOrder方法(在这种情况下,带有消息的负载).[/code]
通过使用RabbitListenerContainerFactory,注解端点基础设施在每个注解方法的幕后都创建了一个消息监听器容器.在上面的例子中,myQueue必须是事先存在的,
并绑定了某个交换器上.从1.5.0版本开始,只要在上下文中存在RabbitAdmin,队列可自动声明和绑定.

@Component
public classMyService{

@RabbitListener(bindings=@QueueBinding(
value=@Queue(value="myQueue",durable="true"),
exchange=@Exchange(value="auto.exch",ignoreDeclarationExceptions="true"),
key="orderRoutingKey")
)public voidprocessOrder(Stringdata){
...
}

@RabbitListener(bindings=@QueueBinding(
value=@Queue,
exchange=@Exchange(value="auto.exch"),
key="invoiceRoutingKey")
)public voidprocessInvoice(Stringdata){
...
}

}
在第一个例子中,队列myQueue会与交换器一起自动声明(持久化的),如果需要,可使用路由键来绑定到交换器上.在第二个例子中,匿名(专用的,自动删除的)队列将会声明并绑定.
可提供多个QueueBinding条目,允许监听器监听多个队列.

当前只支持DIRECT,FANOUT,TOPIC和HEADERS的交换器类型.当需要高级配置时,可使用@Bean定义.

注意第一个例子中交换器上的ignoreDeclarationExceptions.这允许,例如,绑定到有不同的设置(如.internal)的交换器上.默认情况下,现有交换器的属性必须被匹配.

从1.6版本开始,你可为队列,交换器和绑定的@QueueBinding注解中指定参数.示例:

@RabbitListener(bindings=@QueueBinding(
value=@Queue(value="auto.headers",autoDelete="true",
arguments=@Argument(name="x-message-ttl",value="10000",
type="java.lang.Integer")),
exchange=@Exchange(value="auto.headers",type=ExchangeTypes.HEADERS,autoDelete="true"),
arguments={
@Argument(name="x-match",value="all"),
@Argument(name="foo",value="bar"),
@Argument(name="baz")
})
)
publicStringhandleWithHeadersExchange(Stringfoo){
...
}
注意队列的x-message-ttl参数设为了10秒钟,因为参数类型不是String,因此我们指定了它的类型,在这里是Integer.有了这些声明后,如果队列已经存在了,参数必须匹配现有队列上的参数.对于header交换器,我们设置bindingarguments要匹配头中foo为bar,且baz可为任意值的消息.x-match参数则意味着必须同时满足两个条件.

参数名称,参数值,及类型可以是属性占位符(${...})或SpEL表达式(#{...}).name必须要能解析为String;type的表达式必须能解析为Class或类的全限定名.value必须能由DefaultConversionService类型进行转换(如上面例子中x-message-ttl).

如果name解析为null或空字符串,那么将忽略@Argument.

元注解(Meta-Annotations)

有时,你想将同样的配置用于多个监听器上.为减少重复配置,你可以使用元注解来创建你自己的监听器注解:

@Target({ElementType.TYPE,ElementType.METHOD,ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@RabbitListener(bindings=@QueueBinding(
value=@Queue,
exchange=@Exchange(value="metaFanout",type=ExchangeTypes.FANOUT)))
public@interfaceMyAnonFanoutListener{
}

public classMetaListener{

@MyAnonFanoutListener
public voidhandle1(Stringfoo){
...
}

@MyAnonFanoutListener
public voidhandle2(Stringfoo){
...
}

}
在这个例子中,每个通过@MyAnonFanoutListener创建的监听器都会绑定一个匿名,自动删除的队列到fanout交换器metaFanout上.元注解机制是简单的,在那些用户定义注解中的属性是不会经过检查的-因此你不能从元注解中覆盖设置.当需要高级配置时,使用一般的@Bean定义.

EnableListenerEndpointAnnotations

为了启用@RabbitListener注解,需要在你的某个@Configuration类中添加@EnableRabbit注解.

@Configuration
@EnableRabbit
publicclassAppConfig{

@Bean
publicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactoryfactory=newSimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
returnfactory;
}
}
默认情况下,基础设施会查找一个名为rabbitListenerContainerFactory的bean作为工厂来源来创建消息监听器容器.在这种情况下,会忽略RabbitMQ基础设施计划,processOrder方法可使用核心轮询大小为3个线程最大10个线程的池大小来调用.

可通过使用注解或实现RabbitListenerConfigurer

接口来自定义监听器容器工厂.默认只需要注册至少一个Endpoints,而不需要一个特定的容器工厂.查看javadoc来了解详情和例子.

如果你更喜欢XML配置,可使用<rabbit:annotation-driven>元素.

<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="concurrentConsumers "value="3"/>
<property name="maxConcurrentConsumers"value="10"/>
</bean>
注解方法的消息转换

在调用监听器之前,在管道中有两个转换步骤.第一个使用MessageConverter来将传入的SpringAMQPMessage转换成spring-消息系统的消息.当目标方法调用时,消息负载将被转换,如果有必要,也会参考消息参数类型来进行.

第一步中的默认MessageConverter是一个SpringAMQPSimpleMessageConverter,它可以处理String和java.io.Serializable对象之间的转换;其它所有的将保留为byte[].在下面的讨论中,我们称其为消息转换器.

第二个步骤的默认转换器是GenericMessageConverter,它将委派给转换服务(DefaultFormattingConversionService的实例).在下面的讨论中,我们称其为方法参数转换器.

要改变消息转换器,可在连接工厂bean中设置其相关属性:

@Bean
publicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactoryfactory=newSimpleRabbitListenerContainerFactory();
...
factory.setMessageConverter(newJackson2JsonMessageConverter());
...
returnfactory;
}
这配置了一个Jackson2转换器,希望头信息能通过它来指导转换.

你也可以考虑使用ContentTypeDelegatingMessageConverter,它可以处理不同内容类型的转换.

大多数情况下,没有必要来定制方法参数转换器,除非你想要用自定义的ConversionService.

在1.6版本之前,用于转换JSON的类型信息必须在消息头中提供或者需要一个自定义的ClassMapper.从1.6版本开始,如果没有类型信息头,类型可根据目标方法参数推断.

类型推断只能用于@RabbitListener的方法级.

参考thesectioncalled“Jackson2JsonMessageConverter”来了解更多信息.

如果您希望自定义方法参数转换器,您可以这样做如下:

@Configuration
@EnableRabbit
public classAppConfigimplementsRabbitListenerConfigurer{

...

@Bean
publicDefaultMessageHandlerMethodFactorymyHandlerMethodFactory(){
DefaultMessageHandlerMethodFactoryfactory=newDefaultMessageHandlerMethodFactory();
factory.setMessageConverter(newGenericMessageConverter(myConversionService()));
returnfactory;
}

@Bean
publicConversionServicemyConversionService(){
DefaultConversionServiceconv=newDefaultConversionService();
conv.addConverter(mySpecialConverter());
returnconv;
}

@Override
publicvoidconfigureRabbitListeners(RabbitListenerEndpointRegistrarregistrar){
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}

...

}
重要
对于多方法监听器(参考thesectioncalled“Multi-MethodListeners”),方法选择是基于消息转换后的消息负载,方法参数转换器只在方法被选择后才会调用.
编程式Endpoint注册

RabbitListenerEndpoint提供了一个Rabbitendpoint模型并负责为那个模型配置容器.除了通过RabbitListener注解检测外,这个基础设施允许你通过编程来配置endpoints.

@Configuration
@EnableRabbit
publicclassAppConfigimplementsRabbitListenerConfigurer{

@Override
publicvoidconfigureRabbitListeners(RabbitListenerEndpointRegistrarregistrar){
SimpleRabbitListenerEndpointendpoint=newSimpleRabbitListenerEndpoint();
endpoint.setQueueNames("anotherQueue");
endpoint.setMessageListener(message->{
//processing
});
registrar.registerEndpoint(endpoint);
}
}
在上面的例子中,我们使用了SimpleRabbitListenerEndpoint(它使用MessageListener来进行处理),但你也可以构建你自己的endpoint变种来描述自定义的调用机制.

应该指出的是,你也可以跳过@RabbitListener的使用,通过RabbitListenerConfigurer来编程注册你的endpoints.

AnnotatedEndpointMethodSignature

到目前为止,我们已经在我们的端点上注入了一个简单的字符串,但它实际上可以有一个非常灵活的方法签名。让我们重写它,以一个自定义的头来控制注入顺序:

@Component
publicclassMyService{

@RabbitListener(queues="myQueue")
publicvoidprocessOrder(Orderorder,@Header("order_type")StringorderType){
...
}
}
下面是你可以在监听端点上注入的主要元素:

原生org.springframework.amqp.core.Message.

用于接收消息的com.rabbitmq.client.Channel

org.springframework.messaging.Message代表的是传入的AMQP消息.注意,这个消息持有自定义和标准的头部信息(AmqpHeaders定义).

从1.6版本开始,入站deliveryMode头可以AmqpHeaders.RECEIVED_DELIVERY_MODE使用,代替了AmqpHeaders.DELIVERY_MODE.

@Header-注解方法参数可提取一个特定头部值,包括标准的AMQP头.

@Headers-注解参数为了访问所有头信息,必须能指定为java.util.Map.

非注解元素(非支持类型(如.Message和Channel))可认为是负荷(payload).你可以使用@Payload来明确标识.你也可以添加额外的@Valid来进行验证.

注入Spring消息抽象的能力是特别有用的,它可受益于存储在特定传输消息中的信息,而不需要依赖于特定传输API.

@RabbitListener(queues="myQueue")
public voidprocessOrder(Message<Order>order){...
}
方法参数的处理是由DefaultMessageHandlerMethodFactory提供的,它可以更进一步地定制以支持其它的方法参数.转换和验证支持也可以定制.

例如,如果我们想确保我们的Order在处理之前是有效的,我们可以使用@Valid来注解负荷,并配置必须验证器,就像下面这样:

@Configuration
@EnableRabbit
public classAppConfigimplementsRabbitListenerConfigurer{

@Override
publicvoidconfigureRabbitListeners(RabbitListenerEndpointRegistrarregistrar){
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}

@Bean
publicDefaultMessageHandlerMethodFactorymyHandlerMethodFactory(){
DefaultMessageHandlerMethodFactoryfactory=newDefaultMessageHandlerMethodFactory();
factory.setValidator(myValidator());
returnfactory;
}
}
监听多个队列

当使用queues属性时,你可以指定相关的容器来监听多个队列.你可以使用@Header注解来指定对于那些队列中收到的消息对POJO方法可用:

@Component
public classMyService{

@RabbitListener(queues={"queue1","queue2"})
public voidprocessOrder(Stringdata,@Header(AmqpHeaders.CONSUMER_QUEUE)Stringqueue){
...
}

}
从1.5版本开始,队列名称可以使用属性占位符和SpEL:

@Component
public classMyService{

@RabbitListener(queues="#{'${property.with.comma.delimited.queue.names}'.split(',')}")
public voidprocessOrder(Stringdata,@Header(AmqpHeaders.CONSUMER_QUEUE)Stringqueue){
...
}

}
在1.5版本之前,只有单个队列可以这种方法进行指定,每个队列需要一个单独的属性.

回复管理

MessageListenerAdapter现有的支持已经允许你的方法有一个非void的返回类型.在这种情况下,调用的结果被封装在一个发送消息中,其消息发送地址要么是原始消息的ReplyToAddress头指定的地址要么是监听器上配置的默认地址.默认地址现在可通过@SendTo注解进行设置.

假设我们的processOrder方法现在需要返回一个OrderStatus,可将其写成下面这样来自动发送一个回复:

@RabbitListener(destination="myQueue")
@SendTo("status")
publicOrderStatusprocessOrder(Orderorder){
//orderprocessing
 returnstatus;
}
如果你需要以传输独立的方式来设置其它头,你可以返回Message,就像这样:

@RabbitListener(destination="myQueue")
@SendTo("status")
publicMessage<OrderStatus>processOrder(Orderorder){
//orderprocessing
returnMessageBuilder
.withPayload(status)
.setHeader("code",1234)
.build();
}
@SendTo值按照exchange/routingKey模式(其中的一部分可以省略)来作为对exchange和routingKey的回复.有效值为:

foo/bar-以交换器和路由键进行回复.

foo/-以交换器和默认路由键进行回复.

baror/bar-以路由键和默认交换器进行回复.

/orempty-以默认交换器和默认路由键进行回复.

@SendTo也可以没有value属性.这种情况等价于空的sendTo模式.@SendTo只能应用于没有replyToAddress属性的入站消息中.

从1.5版本开始,@SendTo值可以通过beanSpEL表达式初始化,例如…​

@RabbitListener(queues="test.sendTo.spel")
@SendTo("#{spelReplyTo}")
publicStringcapitalizeWithSendToSpel(Stringfoo){
returnfoo.toUpperCase();
}
...
@Bean
publicStringspelReplyTo(){
return"test.sendTo.reply.spel";
}
表达式必须能评估为String,它可以是简单的队列名称(将发送到默认交换器中)或者是上面谈到的exchange/routingKey形式.

在初始化时,#{...}表达式只评估一次.

对于动态路由回复,消息发送者应该包含一个reply_to消息属性或使用运行时SpEL表达式.

从1.6版本开始,@SendTo可以是SpEL表达式,它可在运行时根据请求和回复来评估:

@RabbitListener(queues="test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.'+result.queueName}")
publicBarcapitalizeWithSendToSpel(Foofoo){
returnprocessTheFooAndReturnABar(foo);
}
SpEL表达式的运行时性质是由!{...}定界符表示的.表达式评估上下文的#root对象有三个属性:

request-o.s.amqp.core.Message请求对象.

source-转换后的o.s.messaging.Message<?>.

result-方法结果.

上下文有一个map属性访问器,标准类型转换器以及一个bean解析器,允许引用上下文中的其它beans(如.@someBeanName.determineReplyQ(request,result)).

总结一下,#{...}只在初始化的时候评估一次,#root对象代表的是应用程序上下文;beans可通过其名称来引用.!{...}会在运行时,对于每个消息,都将使用root对象的属性进行评估,bean可以使用其名称进行引用,前辍为@.

多方法监听器

从1.5.0版本开始,@RabbitListener注解现在可以在类级上进行指定.与新的@RabbitHandler注解一起,基于传入消息的负荷类型,这可以允许在单个监听器上调用不同的方法.这可以用一个例子来描述:

@RabbitListener(id="multi",queues="someQueue")
publicclassMultiListenerBean{

@RabbitHandler
@SendTo("my.reply.queue")
publicStringbar(Barbar){
...
}

@RabbitHandler
publicStringbaz(Bazbaz){
...
}

@RabbitHandler
publicStringqux(@Header("amqp_receivedRoutingKey")Stringrk,@PayloadQuxqux){
...
}

}
在这种情况下,独立的@RabbitHandler方法会被调用,如果转换后负荷是Bar,Baz或Qux.理解基于负荷类型系统来确定唯一方法是很重要的.类型检查是通过单个无注解参数来执行的,否则就要使用@Payload进行注解.注意同样的方法签名可应用于方法级@RabbitListener之上.

注意,如果有必要,需要在每个方法上指定@SendTo,在类级上它是不支持的.

@Repeatable@RabbitListener

从1.6版本开始,@RabbitListener注解可用@Repeatable进行标记.这就是说,这个注解可多次出现在相同的注解元素上(方法或类).在这种情况下,对于每个注解,都会创建独立的监听容器,它们每个都会调用相同的监听器@Bean.Repeatable注解能用于Java8+;当在Java7-使用时,同样的效果可以使用@RabbitListeners"container"注解(包含@RabbitListener注解的数组)来达到.

Proxy@RabbitListenerandGenerics

如果你的服务是用于代理(如,在@Transactional的情况中),当接口有泛型参数时,需要要一些考虑.要有一个泛型接口和特定实现,如:

interfaceTxService<P>{

Stringhandle(Ppayload,Stringheader);

}

static classTxServiceImplimplementsTxService<Foo>{

@Override
 @RabbitListener(...)
 publicStringhandle(Foofoo,Stringrk){
...
}

}
你被迫切换到CGLIB目标类代理,因为接口handle方法的实际实现只是一个桥接方法.在事务管理的情况下,CGLIB是通过注解选项来配置的-@EnableTransactionManagement(proxyTargetClass=true).在这种情况下,所有注解都需要在实现类的目标方法上进行声明:

static classTxServiceImplimplementsTxService<Foo>{

@Override
@Transactional
@RabbitListener(...)
publicStringhandle(@PayloadFoofoo,@Header("amqp_receivedRoutingKey")Stringrk){
...
}

}
容器管理

由注解创建的容器不会在上下文中进行注册.你可以调用RabbitListenerEndpointRegistry的getListenerContainers()方法来获取所有容器集合.然后,你可以迭代这个集合,例如,停止/启动所有容器或调用在其注册上调用Lifecycle方法(调用每个容器中的操作).

你也可以使用id来获取单个容器的引用,即getListenerContainer(Stringid);例如registry.getListenerContainer("multi").

从1.5.2版本开始,你可以调用getListenerContainerIds()方法来获取所有注册容器的id.

从1.5版本开始,你可在RabbitListener端点上为容器分配一个组(group).这提供了一种机制来获取子集容器的引用;添加一个group属性会使Collection<MessageListenerContainer>类型的bean使用组名称注册在上下文中.

线程和异步消费者

一些不同的线程可与异步消费者关联。

当RabbitMQClient投递消息时,来自于SimpleMessageListener配置的TaskExecutor中的线程会调用MessageListener.如果没有配置,将会使用SimpleAsyncTaskExecutor.如果使用了池化的executor,须确保池大小可以支撑并发处理.

当使用默认SimpleAsyncTaskExecutor时,对于调用监听器的线程,监听器容器的beanName将用作threadNamePrefix.这有益于日志分析,在日志appender配置中,一般建议总是包含线程名称.当在SimpleMessageListenerContainer的taskExecutor属性中指定TaskExecutor时,线程名称是不能修改的.建议你使用相似的技术来命名线程,帮助在日志消息中的线程识别。

当创建连接时,在CachingConnectionFactory配置的Executor将传递给RabbitMQClient,并且它的线程将用于投递新消息到监听器容器.在写作的时候,如果没有配置,client会使用池大小为5的内部线程池executor.

RabbitMQclient使用ThreadFactory来为低端I/O(socket)操作创建线程.要改变这个工厂,你需要配置底层RabbitMQConnectionFactory,正如thesectioncalled“ConfiguringtheUnderlyingClientConnectionFactory”中所描述.

检测空闲异步消费者

虽然高效,但异步消费者存在一个问题:如何来探测它们什么是空闲的-当有一段时间没有收到消息时,用户可能想要采取某些动作.

从1.6版本开始,当没有消息投递时,可配置监听器容器来发布ListenerContainerIdleEvent事件.当容器是空闲的,事件会每隔idleEventInterval毫秒发布事件.

要配置这个功能,须在容器上设置idleEventInterval:

xml

<rabbit:listener-container connection-factory="connectionFactory"...idle-event-interval="60000"... >
<rabbit:listener id="container1" queue-names="foo" ref="myListener" method="handle"/>
</rabbit:listener-container>
Java

@Bean
publicSimpleMessageListenerContainer(ConnectionFactoryconnectionFactory){
SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer(connectionFactory);
...
container.setIdleEventInterval(60000L);
...
returncontainer;
}
@RabbitListener

@Bean
publicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactoryfactory=newSimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setIdleEventInterval(60000L);
...
returnfactory;
}
在上面这些情况中,当容器空闲时,每隔60秒就会发布事件.

事件消费

通过实现ApplicationListener可捕获这些事件-要么是一个一般的监听器,要么是一个窄化的只接受特定事件的监听器.你也可以使用SpringFramework4.2中引入的@EventListener.

下面的例子在单个类中组合使用了@RabbitListener和@EventListener.重点要理解,应用程序监听器会收到所有容器的事件,因此如果你只对某个容器采取措施,那么你需要检查监听器id.你也可以使用@EventListener条件来达到此目的.

事件有4个属性:

source-监听容器实例

id-监听器id(或容器bean名称)

idleTime-当事件发布时,容器已经空闲的时间

queueNames-容器监听的队列名称

publicclassListener{

@RabbitListener(id="foo",queues="#{queue.name}")
publicStringlisten(Stringfoo){
returnfoo.toUpperCase();
}

@EventListener(condition="event.listenerId=='foo'")
publicvoidonApplicationEvent(ListenerContainerIdleEventevent){
...
}

}
重要
事件监听器会查看所有容器的事件,因此,在上面的例子中,我们根据监听器ID缩小了要接收的事件.
警告
如果你想使用idle事件来停止监听器容器,你不应该在调用监听器的线程上来调用[code=plain]container.stop()方法-它会导致延迟和不必要的日志消息.相反,你应该把事件交给一个不同的线程,然后可以停止容器。[/code]

3.1.7消息转换器

介绍

AmqpTemplate同时也定义了多个发送和接收消息(委派给MessageConverter)的方法.

MessageConverter本身是很简单的.在每个方向上它都提供了一个方法:一个用于转换成Message,另一个用于从Message中转换.注意,当转换成Message时,除了object外,你还需要提供消息属性."object"参数通常对应的是Messagebody.

public interfaceMessageConverter{

MessagetoMessage(Objectobject,MessagePropertiesmessageProperties)
throwsMessageConversionException;

ObjectfromMessage(Messagemessage)throwsMessageConversionException;

}
AmqpTemplate中相关的消息发送方法列举在下边.这比我们前面提到的要简单,因为它们不需要Message实例.相反地,MessageConverter负责创建每个消息(通过将提供的对象转换成Messagebody的字节数组,以及添加提供的MessageProperties).

voidconvertAndSend(Objectmessage)throwsAmqpException;

voidconvertAndSend(StringroutingKey,Objectmessage)throwsAmqpException;

voidconvertAndSend(Stringexchange,StringroutingKey,Objectmessage)
throwsAmqpException;

voidconvertAndSend(Objectmessage,MessagePostProcessormessagePostProcessor)
throwsAmqpException;

voidconvertAndSend(StringroutingKey,Objectmessage,
MessagePostProcessormessagePostProcessor)throwsAmqpException;

voidconvertAndSend(Stringexchange,StringroutingKey,Objectmessage,
MessagePostProcessormessagePostProcessor)throwsAmqpException;
在接收端,这里只有两个方法:一个接受队列名称,另一个依赖于模板设置的队列属性.

ObjectreceiveAndConvert()throwsAmqpException;

ObjectreceiveAndConvert(StringqueueName)throwsAmqpException;
thesectioncalled“AsynchronousConsumer”中提到的MessageListenerAdapter也使用了[code=plain]MessageConverter.[/code]

SimpleMessageConverter

MessageConverter策略的默认实现被称为SimpleMessageConverter.如果你没有明确配置,RabbitTemplate实例会使用此转换器的实例.它能处理基于文本内容,序列化Java对象,以及简单的字节数组.

从Message中转换

如果传入消息的内容类型以"text"(如."text/plain")开头,它同时也会检查内容编码属性,以确定将消息body字节数组转换成字符串所要使用的字符集.如果在输入消息中没有指定内容编码属性,它默认会使用"UTF-8"字符集.如果你需要覆盖默认设置,你可以配置一个SimpleMessageConverter实例,设置其"defaultCharset"属性,再将其注入到RabbitTemplate实例中.

如果传入消息的内容类型属性值为"application/x-java-serialized-object",SimpleMessageConverter将尝试将字节数组反序列化为一个Javaobject.虽然这对于简单的原型是有用的,但一般不推荐依赖于Java序列化机制,因为它会生产者和消费者之间的紧密耦合。当然,这也排除了在两边使用非Java的可能性.由于AMQP是线路级协议,因这样的限制失去了许多优势,这是不幸的.在后面的两个章节中,我们将探讨通过丰富的域对象的内容来替代java序列化.

对于其它内容类型,SimpleMessageConverter会以字节数组形式直接返回消息body内容.

参考thesectioncalled“JavaDeserialization”来了解更多信息.

转换成消息

当从任意Java对象转换成Message时,SimpleMessageConverter同样可以处理字节数组,字符串,以及序列化实例.它会将每一种都转换成字节(在字节数组的情况下,不需要任何转换),并且会相应地设置内容类型属性.如果要转换的对象不匹配这些类型,Messagebody将是null.

SerializerMessageConverter

除了它可以使用其它application/x-java-serialized-object转换的Spring框架Serializer和Deserializer实现来配置外,此转换器类似于SimpleMessageConverter.

参考thesectioncalled“JavaDeserialization”来了解更多信息.

Jackson2JsonMessageConverter

转换成消息

正如前面章节提到的,一般来说依赖于Java序列化机制不是推荐的.另一个常见更灵活且可跨语言平台的选择JSON(JavaScriptObjectNotation).可通过在RabbitTemplate实例上配置转换器来覆盖默认SimpleMessageConverter.Jackson2JsonMessageConverter使用的是com.fasterxml.jackson2.x包.

<beanclass="org.springframework.amqp.rabbit.core.RabbitTemplate">
<propertyname="connectionFactory"ref="rabbitConnectionFactory"/>
<propertyname="messageConverter">
<beanclass="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
<!--ifnecessary,overridetheDefaultClassMapper-->
<propertyname="classMapper"ref="customClassMapper"/>
</bean>
</property>
</bean>
正如上面展示的,Jackson2JsonMessageConverter默认使用的是DefaultClassMapper.类型信息是添加到MessageProperties中的(也会从中获取).如果入站消息在MessageProperties中没有包含类型信息,但你知道预期类型,你可以使用defaultType属性来配置静态类型

<bean id="jsonConverterWithDefaultType" class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="foo.PurchaseOrder"/>
</bean>
</property>
</bean>
转换Message

入站消息会根据发送系统头部中添加的类型信息来转换成对象.

在1.6之前的版本中,如果不存在类型信息,转换将失败。从1.6版开始,如果类型信息丢失,转换器将使用Jsckson默认值(通常是一个map)来转换JSON.

此外,从1.6版本开始,当在方法上使用@RabbitListener注解时,推断类型信息会添加到MessageProperties;这允许转换器转换成目标方法的参数类型.这只适用于无注解的参数或使用@Payload注解的单个参数.在分析过程中忽略类型消息的参数。

重要

默认情况下,推断类型信息会覆盖inbound__TypeId__和发送系统创建的相关headers.这允许允许接收系统自动转换成不同的领域对象.这只适用于具体的参数类型(不是抽象的或不是接口)或者来自java.util包中的对象.其它情况下,将使用__TypeId__和相关的头.也可能有你想覆盖默认行为以及总是使用__TypeId__信息的情况.例如,让我们假设你有一个接受Foo参数的@RabbitListener,但消息中包含了Bar(它是的Foo(具体类)的子类).推断类型是不正确的.要处理这种情况,需要设置Jackson2JsonMessageConverter的TypePrecedence属性为TYPE_ID而替换默认的INFERRED.这个属性实际上转换器的DefaultJackson2JavaTypeMapper,但为了方便在转换器上提供了一个setter方法.如果你想注入一个自定义类型mapper,你应该设置属性mapper.

@RabbitListener
publicvoidfoo(Foofoo){...}

@RabbitListener
publicvoidfoo(@PayloadFoofoo,@Header("amqp_consumerQueue")Stringqueue){...}

@RabbitListener
publicvoidfoo(Foofoo,o.s.amqp.core.Messagemessage){...}

@RabbitListener
publicvoidfoo(Foofoo,o.s.messaging.Message<Foo>message){...}

@RabbitListener
publicvoidfoo(Foofoo,Stringbar){...}

@RabbitListener
publicvoidfoo(Foofoo,o.s.messaging.Message<?>message){...}
上面前4种情况下,转换器会尝试转换成Foo类型.第五个例子是无效的,因为我们不能确定使用哪个参数来接收消息负荷.在第六个例子中,Jackson会根据泛型WildcardType来应用.

然而,你也可以创建一个自定义转换器,并使用targetMethod消息属性来决定将JSON转换成哪种类型.

这种类型接口只能在@RabbitListener注解声明在方法级上才可实现.在类级@RabbitListener,转换类型用来选择调用哪个@RabbitHandler方法.基于这个原因,基础设施提供了targetObject消息属性,它可用于自定义转换器来确定类型.

MarshallingMessageConverter

还有一个选择是MarshallingMessageConverter.它会委派到SpringOXM包的Marshaller和Unmarshaller策略接口实现.

你可从here了解更多.在配置方面,最常见的是只提供构造器参数,因为大部分Marshaller的实现都将实现Unmarshaller.

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
<constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
</bean>
</property>
</bean>

ContentTypeDelegatingMessageConverter

这个类是在1.4.2版本中引入的,并可基于MessageProperties的contentType属性允许委派给一个特定的MessageConverter.默认情况下,如果没有contentType属性或值没有匹配配置转换器时,它会委派给SimpleMessageConverter.

<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json" value-ref="jsonMessageConverter"/>
<entry key="application/xml" value-ref="xmlMessageConverter"/>
</map> 
</property>
</bean>

Java反序列化

重要

当从不可信任的来源反序列化Java对象时,存在一个可能的漏洞.如果从不可信来源,使用内容类型application/x-java-serialized-object来接收消息,你可以考虑配置允许哪些包/类能反序列化.这既适用于SimpleMessageConverter,也适用于SerializerMessageConverter,当它被配置为使用一个DefaultDeserializer时-或含蓄地或通过配置方式的。

默认情况下,白名单列表是空的,这意味着所有类都会反序列化.你可以设置模式列表,如foo.*,foo.bar.Baz或*.MySafeClass.模式会按照顺序进行检查,直到找到匹配的模式.如果没有找到匹配,将抛出SecurityException.在这些转换器上,可使用whiteListPatterns属性来设置.

消息属性转换器

MessagePropertiesConverter策略接口用于RabbitClientBasicProperties与SpringAMQPMessageProperties之间转换.默认实现(DefaultMessagePropertiesConverter)通常可满虽大部分需求,但如果有需要,你可以自己实现.当大小不超过1024字节时,默认属性转换器将BasicProperties中的LongString转换成String.更大的LongString将不会进行转换(参考下面的内容.这个限制可通过构造器参数来覆盖.

从1.6版本开始,现在headers长超过longstring限制(默认为1024)将被DefaultMessagePropertiesConverter保留作为LongString.你可以通过thegetBytes[],toString(),或getStream()方法来访问内容.

此前,DefaultMessagePropertiesConverter会将这样的头转换成一个DataInputStream(实际上它只是引用了LongString的DataInputStream).在输出时,这个头不会进行转换(除字符串外,如在流上调用toString()方法java.io.DataInputStream@1d057a39).

更大输入LongString头现在可正确地转换,在输出时也一样.

它提供了一个新的构造器来配置转换器,这样可像以前一样来工作:

/**
*ConstructaninstancewhereLongStringswillbereturned
*unconvertedorasajava.io.DataInputStreamwhenlongerthanthislimit.
*Usethisconstructorwith'true'torestorepre-1.6behavior.
*@paramlongStringLimitthelimit.
*@paramconvertLongLongStringsLongStringwhenfalse,
*DataInputStreamwhentrue.
*@since1.6
*/

publicDefaultMessagePropertiesConverter(intlongStringLimit,booleanconvertLongLongStrings){...}
另外,从1.6版本开始,在MessageProperties中添加了一个新属性correlationIdString.此前,当在RabbitMQ客户端中转换BasicProperties时,将会执行不必要的byte[]<->String转换,这是因为MessageProperties.correlationId是一个byte[]而BasicProperties使用的是String.

(最终,RabbitMQ客户端使用UTF-8字符串转化为字节并放在协议消息中).

为提供最大向后兼容性,新属性correlationIdPolicy已经被加入到了DefaultMessagePropertiesConverter.它接受DefaultMessagePropertiesConverter.CorrelationIdPolicy枚举参数.

默认情况下,它设置为BYTES(复制先前的行为).

对于入站消息:

STRING-只映射correlationIdString属性

BYTES-只映射correlationId属性

BOTH-会同时映射两个属性

对于出站消息:

STRING-只映射correlationIdString属性

BYTES-只映射correlationId属性

BOTH-两种属性都会考虑,但会优先考虑String属性

也从1.6版本开始,入站deliveryMode属性不再需要映射MessageProperties.deliveryMode,相反使用MessageProperties.receivedDeliveryMode来代替.另外,入站userId属性也不需要再映射MessageProperties.userId,相反使用MessageProperties.receivedUserId来映射.

这种变化是为了避免这些属性的意外传播,如果同样的MessageProperties对象用于出站消息时.

3.1.8修改消息-压缩以及更多

提供了许多的扩展点,通过它们你可以对消息执行预处理,要么在发送RabbitMQ之前,要么在接收到消息之后.

正如你在Section3.1.7,“MessageConverters”看到的,这样的扩展点存在于AmqpTemplateconvertAndReceive操作中,在那里你可以提供一个MessagePostProcessor.

例如,你的POJO转换之后,MessagePostProcessor允许你在Message上设置自定义的头或属性.

从1.4.2版本开始,额外的扩展点已经添加到RabbitTemplate-setBeforePublishPostProcessors()和setAfterReceivePostProcessors().第一个开启了一个postprocessor来在发送消息到RabbitMQ之前立即运行.当使用批量时(参考thesectioncalled“Batching”),这会在批处理装配之后发送之前调用.

第二个会在收到消息后立即调用.

这些扩展点对于压缩这此功能是有用的,基于这些目的,提供了多个MessagePostProcessor:

GZipPostProcessor

ZipPostProcessor

针对于发送前的消息压缩,以及

GUnzipPostProcessor

UnzipPostProcessor

针对于消息解压.

类似地,SimpleMessageListenerContainer也有一个setAfterReceivePostProcessors()方法,

允许在消息收到由容器来执行解压缩.

-第五部分

3.1.9Request/Reply消息

介绍

AmqpTemplate也提供了各种各样的sendAndReceive方法,它们接受同样的参数选项(exchange,routingKey,andMessage)来执行单向发送操作.
这些方法对于request/reply场景也是有用的,因为它们在发送前处理了必要的"reply-to"属性配置,并能通过它在专用队列(基于回复功能临时创建的队列)上监听回复消息.

类似的request/reply方法也是可用的,MessageConverter可应用于请求和回复上.这些方法被称为convertSendAndReceive.参考AmqpTemplate的JavaDoc来了解详情.

从1.5.0版本开始,每个sendAndReceive方法变种都有一个接受CorrelationData的重载版本.连同正确配置的连接工厂,这使得发布者可以确认发送方的操作.
参考thesectioncalled“PublisherConfirmsandReturns”来了解详情.

Reply超时

默认情况下,这些send和receive方法会在5秒后超时并返回null.这可以通过设置replyTimeout属性来修改.
从1.5版本开始,如果你设置了mandatory属性为true(或特定消息上的mandatory-expression评估为true),如果消息不能投递到队列中,将抛出AmqpMessageReturnedException.
这个exception有returnedMessage,replyCode,replyText属性,如同用于发送的exchangeroutingKey.

这个功能使用了发布者返回特性,可通过在CachingConnectionFactory上设置publisherReturns为true来启用(参考thesectioncalled“PublisherConfirmsandReturns”).
此外,你不必在RabbitTemplate上注册你自己的ReturnCallback.

RabbitMQDirectreply-to

重要

从3.4.0版本开始,RabbitMQ服务器现在支持Directreply-to,基于主要原因,它消除了固定回复队列(为了避免为每个请求创建临时队列).
SpringAMQP1.4.1版本开始,Directreply-to就已经做为了默认使用(如果服务器支持的话),而不再创建临时队列.
当没有提供replyQueue(或设置名称为amq.rabbitmq.reply-to),RabbitTemplate会自动探测是否支持Directreply-to,要么使用它或使用临时回复队列来回退.当使用Directreply-to,reply-listener不是必需的,不应该被配置。

Reply监听器仍然运行命名队列(不是amq.rabbitmq.reply-to),允许控制并发回复.

从.16版本开始,出于某些原因,你想为每个回复使用临时的,专用的,自动删除的队列,你可以设置useTemporaryReplyQueues属性为true.如果你设置了replyAddress,此属性会被忽略.

决定是否使用directreply-to,可以通过继承RabbitTemplate并覆盖useDirectReplyTo()来修改.

此方法只在发出第一个请求时,调用一次.

应答队列的消息相关性

当使用固定回复队列时(不是amq.rabbitmq.reply-to),必须要提供correlationdata,这样回复才能关联请求.参考RabbitMQRemoteProcedureCall(RPC).
默认情况下,标准correlationId属性会用来持有correlationdata.然而,如果你想使用自定义属性来持有correlationdata,你可在<rabbit-template/>中设置correlation-key属性.
显示设置属性为correlationId将与缺省属性相同.当然,客户端和服务器对于correlationdata必须要有相同的头.

SpringAMQP1.1版本为这个data使用自定义属性spring_reply_correlation.如果你想在当前版本中恢复这种行为,也许是为了保持1.1中的另一个应用程序的兼容性,你必须设置属性以spring_reply_correlation。

回复监听器容器

当使用3.4.0之前的Rabbit版本,每个回复都会使用一个新临时队列.然而,可在模板上配置单个回复队列,这将更加高效,同时也允许你在队列上设置参数.然而,在这种情况下,你必须提供<reply-listener/>子元素.

这个元素为回复队列提供了监听器容器,以模板为监听器.
所有Section3.1.15,“MessageListenerContainerConfiguration”中的属性都可以配置在<listener-container/>元素中,除了connection-factory和message-converter(它们是模块配置中继承下来的).

重要

如果运行了多个应用程序实例或者使用了多个RabbitTemplate,那么你必须为每个都使用唯一的回复队列-RabbitMQ没有在队列中选择消息的能力,如果它们都使用相同队列,每个实例都将竞争的答复,而不一定是收到他们自己的。

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
reply-queue="replies" reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
由于容器和模板可共享一个连接工厂,它们不会共享一个通道,因此请求和回复不是在同一个事务中执行的(如果是事务的).

重要

在1.5.0版本之前,reply-address属性不可用,回复总是通过默认交换器和reply-queue作路由键来进行的.现在这依然是默认的,但现在你可以指定新的reply-address属性.
reply-address可以包含<exchange>/<routingKey>形式的地址,回复将会路由到设定的exchange和路由到routingkey绑定的队列上.
reply-address优先于reply-queue.<reply-listener>必须配置为一个单独的<listener-container>组件,当只使用reply-address时,无论是reply-address还是reply-queue(在<listener-container>中的queue属性)必须指的是同一个队列.

在这个配置中,SimpleListenerContainer用于接收回复;而RabbitTemplate将成为MessageListener.当使用<rabbit:template/>命名空间元素定义模板时,正如上面所展示的,分析器会定义容器,并将模板作为监听器进行包装.

重要

当模板不使用固定replyQueue(或使用Directreply-to-参考thesectioncalled“RabbitMQDirectreply-to”),则不需要监听器容器.当在RabbitMQ3.4.0+使用时,Directreply-to是更好的机制.

如果你将RabbitTemplate定义为<bean/>,或使用@Configuration类将其定义为@Bean,或者通过编程来创建模板,你需要自己定义和包装回复监听器容器.
如果这样做失败了,模板将不会收到回复,并最终会超时并返回null作为对sendAndReceive方法调用者的回复.

从1.5版本开始,RabbitTemplate会探测是否配置了MessageListener来接收回复.如果没有,它会尝试发送并使用回复地地址来接收消息,如果失败了,则会抛出IllegalStateException(因为不会收到回复).

此外,如果使用了简单的replyAddress(队列名称),回复监听器容器会验证与监听的队列是否是一样的名称.但如果这个地址是交换器和路由键,这种检查不会被执行,会输出调试日志信息.

重要

当在编写回复监听器和模板时,重要的一点是要保证模板的replyQueue与容器的queues(或queueNames)属性指的是相同的队列.模板会将回复队列插入到出站消息的replyTo属性中.

下面的例子展示了如何来包装这些beans.

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory"/>
<property name="exchange" value="foo.exchange"/>
<property name="routingKey" value="foo"/>
<property name="replyQueue" ref="replyQ"/>
<property name="replyTimeout" value="600000"/>
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory"/>
<property name="queues" ref="replyQ"/>
<property name="messageListener" ref="amqpTemplate"/>
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue"/>
@Bean
publicRabbitTemplateamqpTemplate(){
RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyQueue(replyQueue());
rabbitTemplate.setReplyTimeout(60000);
returnrabbitTemplate;}
@Bean
publicSimpleMessageListenerContainerreplyListenerContainer(){
SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
returncontainer;
}
@Bean
publicQueuereplyQueue(){
returnnewQueue("my.reply.queue");
}
完整的RabbitTemplate包装固定回复队列,与远程监听器容器的请求回复处理展示在thistestcase.

重要

当回复超时时(replyTimeout),sendAndReceive()方法会返回null.

在1.3.6版本之前,消息超时回复只是简单地记录下来.现在,如果收到了迟到回复,将会拒绝(模板会抛出AmqpRejectAndDontRequeueException).
如果回复队列配置了将拒绝消息到死信交换器中,可获取回复来作后面的分析.只须将队列以队列名称作为路由键绑定到死信交换器中.

参考RabbitMQDeadLetterDocumentation来了解更多关于死信的配置信息.

你也可以看示例中关于FixedReplyQueueDeadLetterTests测试用例.

AsyncRabbitTemplate

1.6版本引入了AsyncRabbitTemplate.

它有与AmqpTemplate上类似的sendAndReceive(和convertSendAndReceive)方法,但不是阻塞的,它们会返回一个ListenableFuture.

sendAndReceive方法返回一个RabbitMessageFuture;convertSendAndReceive方法会返回一个RabbitConverterFuture.

你可以同步稍后在future上调用get()方法来获取结果,也可以注册一个回调异步来获取结果.

@Autowired
privateAsyncRabbitTemplatetemplate;
...
public voiddoSomeWorkAndGetResultLater(){
...
ListenableFuture<String>future=this.template.convertSendAndReceive("foo");
//dosomemorework
Stringreply=null;
try{
reply=future.get();
}
catch(ExecutionExceptione){
...
}...
}
public voiddoSomeWorkAndGetResultAsync(){
...
RabbitConverterFuture<String>future=this.template.convertSendAndReceive("foo");
future.addCallback(newListenableFutureCallback<String>(){
@Override
publicvoidonSuccess(Stringresult){
...
}
@Override
publicvoidonFailure(Throwableex){
...
}
});
...
}
如果设置了mandatory,且消息不能投递,future会抛出一个ExecutionException,并带有AmqpMessageReturnedException原因,它封装了返回的消息和以及关于返回的信息.

如果设置了enableConfirms,future会包含一个属性confirm,它是ListenableFuture<Boolean>,true表示成功的发布.

如果confirmfuture是false,RabbitFuture会有一个属性nackCause-如果可用的话,则代表的是失败的原因.

重要

发布者确认已被废弃了(如果在回复之后收到),-因为回复已经暗示了成功发布.

在模板上设置receiveTimeout属性来表示回复超时时间(它默认为30秒).如果发生了超时,future会以AmqpReplyTimeoutException结束.

模板可实现SmartLifecycle;这样可阻止模板在等待回复时Future退出.

Spring远程调用AMQP

SpringFramework有一个普遍的远程处理能力,允许RemoteProcedureCalls(RPC)使用多种传输协议.Spring-AMQP通过在客户端使用AmqpProxyFactoryBean,在服务端使用AmqpInvokerServiceExporter也可以提供类似的机制.
它提供了基于AMQP的RPC.在客户端,RabbitTemplate可以按照上面一样来使用,在服务器端,invoker(配置为MessageListener)会收到消息,调用配置的服务,使用入站消息的replyTo信息来返回回复.

客户端工厂可注入任何bean(使用它的serviceInterface);客户端然后可以调用代理上的方法,导致在AMQP上远程执行.

重要

使用默认MessageConverter器,方法参数和返回值必须是Serializable的实例.

在服务器端,AmqpInvokerServiceExporter包含AmqpTemplate和MessageConverter属性.

目前,未使用模板的MessageConverter.如果你需要提供定制的消息转换器,那么你需要使用messageConverter属性进行提供.在客户端,可在AmqpTemplate中添加定制消息转换器,它是使用其amqpTemplate属性提供给AmqpProxyFactoryBean的.

样例client和server配置如下所示.

<bean id="client" class="org.springframework.amqp.remoting.client.AmqpProxyFactoryBean">
<propertyname="amqpTemplate "ref="template"/>
<propertyname="serviceInterface"value="foo.ServiceInterface"/>
</bean>
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:template id="template" connection-factory="connectionFactory" reply-timeout="2000"routing-key="remoting.binding" exchange="remoting.exchange"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="remoting.queue"/>
<rabbit:direct-exchange name="remoting.exchange">
<rabbit:bindings>
<rabbit:binding queue="remoting.queue" key="remoting.binding"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<bean id="listener" class="org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter">
<property name="serviceInterface" value="foo.ServiceInterface"/>
<property name="service" ref="service"/>
<property name="amqpTemplate" ref="template"/>
</bean><bean id="service" class="foo.ServiceImpl"/>
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:template id="template" connection-factory="connectionFactory"/>
<rabbit:queue name="remoting.queue"/>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="listener" queue-names="remoting.queue"/>
</rabbit:listener-container>
重要
AmqpInvokerServiceExporter只能处理适当格式的消息,如果从AmqpProxyFactoryBean中发出的消息.如果它接收到一个不能解释的消息,那么将发送一个序列化的RuntimeException作为回复.
如果这些消息无replyToAddress属性,消息会被拒绝且在没有配置死信交换器时会永久丢失.[/code]
默认情况下,如果请求消息不能投递,调用线程最终会超时,并会抛出[code=plain]RemoteProxyFailureException.超时时间是5秒,可在RabbitTemplate通过设置replyTimeout属性来修改.
从1.5版本开始,如果设置mandatory属性为true,并在连接工厂中启用了返回(参考thesectioncalled“PublisherConfirmsandReturns”),调用线程会抛出一个AmqpMessageReturnedException.
参考thesectioncalled“ReplyTimeout”来了解更多信息.[/code]

-第六部分

3.1.10配置broker

介绍

AMQP规范描述了协议是如何用于broker中队列,交换器以及绑定上的.这些操作是从0.8规范中移植的,更高的存在于org.springframework.amqp.core包中的AmqpAdmin接口中.
那个接口的RabbitMQ实现是RabbitAdmin,它位于org.springframework.amqp.rabbit.core包.

AmqpAdmin接口是基于SpringAMQP域抽象,展示如下:

public interfaceAmqpAdmin{
//ExchangeOperations
voiddeclareExchange(Exchangeexchange);
voiddeleteExchange(StringexchangeName);
//QueueOperations
QueuedeclareQueue();
StringdeclareQueue(Queuequeue);
voiddeleteQueue(StringqueueName);
voiddeleteQueue(StringqueueName,booleanunused,booleanempty);
voidpurgeQueue(StringqueueName,booleannoWait);
//BindingOperations
voiddeclareBinding(Bindingbinding);
voidremoveBinding(Bindingbinding);
PropertiesgetQueueProperties(StringqueueName);
}
getQueueProperties()方法会返回关于队列的的一些有限信息(消息个数和消费者数目).属性返回的keys像RabbitTemplate(QUEUE_NAME,QUEUE_MESSAGE_COUNT,QUEUE_CONSUMER_COUNT)中的常量一样是可用的.
RabbitMQRESTAPI提供了更多关于QueueInfo对象的信息.

无参declareQueue()方法在broker上定义了一个队列,其名称是自动生成的.自动生成队列的其它属性是exclusive=true,autoDelete=true,anddurable=false.

declareQueue(Queuequeue)方法接受一个Queue对象,并且返回声明队列的名称.如果提供的队列名称是空字符串,broker使用生成的名称来声明队列再将名称返回给调用者.Queue对象本身是不会变化的.

这种功能只能用于编程下直接调用RabbitAdmin.它不支持在应用程序上下文中由admin来定义队列的自动声明.

与此形成鲜明对比的是,AnonymousQueue,框架会为其生成唯一名称(UUID),durable为false,exclusive,autoDelete为true的匿名队列.<rabbit:queue/>带空的或缺失的name属性总会创建 一个AnonymousQueue.

参考thesectioncalled“AnonymousQueue”来理解为什么AnonymousQueue会优先选择broker生成队列名称,以及如何来控制名称格式.声明队列必须有固定的名称,因为它们可能会上下文的其它地方引用,例如,在监听器中:

<rabbit:listener-container>
<rabbit:listener ref="listener" queue-names="#{someQueue.name}"/>
</rabbit:listener-container>
参考thesectioncalled“AutomaticDeclarationofExchanges,QueuesandBindings”.

此接口的RabbitMQ实现是RabbitAdmin,当用SpringXML配置时,看起来像下面这样:

<rabbit:connection-factory id="connectionFactory"/>
<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>
CachingConnectionFactory缓存模式是CHANNEL时(默认的),RabbitAdmin实现会在同一个ApplicationContext中自动延迟声明Queues,Exchanges和Bindings.
只要Connection打开了与Broker的连接,这些组件就会被声明.有一些命名空间特性可以使这些变得便利,如,在Stocks样例程序中有:

<rabbit:queue id="tradeQueue"/>
<rabbit:queue id="marketDataQueue"/>
<fanout-exchange name="broadcast.responses" xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="tradeQueue"/>
</bindings>
</fanout-exchange>
<topic-exchange name="app.stock.marketdata" xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="marketDataQueue"pattern="${stocks.quote.pattern}"/>
</bindings>
</topic-exchange>
在上面的例子中,我们使用匿名队列(实际上由框架内部生成,而非由broker生成的队列),并用ID进行了指定.我们也可以使用明确的名称来声明队列,也作为上下文中bean定义的标识符.如.

<rabbit:queue name="stocks.trade.queue"/>
重要
你可以提供idname属性.这允许你独立于队列名称通过id来指定队列.它允许使用标准的Spring属性,如属性占位符和队列名称的SpEL表达式;当使用名称来作为标识符,这些特性是不可用的.
队列也可以使用其它的参数进行配置,例如x-message-ttl或x-ha-policy.通过命名空间支持,它们可以通过<rabbit:queue-arguments>元素以参数名/参数值的MAP形式来提供.

<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
默认情况下,参数假设为字符串.对于其它类型的参数,需要提供类型.

<rabbit:queue name="withArguments">
<rabbit:queue-arguments value-type="java.lang.Long">
<entry key="x-message-ttl" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>
当提供混合类型的参数时,可为每个entry元素提供type:

<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">100</value>
</entry>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
在SpringFramework3.2或以后,声明起来更加简洁:

<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
重要
RabbitMQbroker不允许使用不匹配的参数来声明队列.例如,如果一个无timetolive参数的队列已经存在,然后你试图使用[code=plain]key="x-message-ttl"value="100"进行声明,那么会抛出一个异常.[/code]
默认情况下,当出现异常时,RabbitAdmin会立即停止所有声明的处理过程;这可能会导致下游问题-如监听器容器会初始化失败,因另外的队列没有声明.

这种行为可以通过在RabbitAdmin上设置ignore-declaration-exceptions为true来修改.此选项会指示RabbitAdmin记录异常,并继续声明其它元素.当使用Java来配置RabbitAdmin时,此属性为ignoreDeclarationExceptions.
这是一个全局设置,它将应用到所有元素上,如应用到queues,exchanges和bindings这些具有相似属性的元素上.

在1.6版本之前,此属性只会在channel上发生IOExcepton时才会起作用-如当目前和期望属性发生错配时.现在,这个属性可在任何异常上起作用,包括TimeoutException等等.

此外,任何声明异常都会导致发布DeclarationExceptionEvent,这是一个ApplicationEvent,在上下文中可通过任何ApplicationListener消费.此事件包含了admin的引用,正在声明的元素以及Throwable.

从1.3版本开始,HeadersExchange可配置匹配多个headers;你也可以指定是否需要必须匹配任何一个或全部headers:

<rabbit:headers-exchange name="headers-test">
<rabbit:bindings>
<rabbit:binding queue="bucket">
<rabbit:binding-arguments>
<entrykey="foo"value="bar"/>
<entrykey="baz"value="qux"/>
<entrykey="x-match"value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>
从1.6版本开始,Exchanges可使用internal标志来配置(默认为false),当然,这样的Exchange也可以通过RabbitAdmin来配置(如果在应用程序上下文中存在).
如果对于交换器来说,internal标志为true,RabbitMQ会允许客户端来使用交换器.这对于死信交换器来说或交换器到交换器绑定来说,是很用的,因为在这些地方你不想让发布者直接使用交换器.

要看如何使用Java来配置AMQP基础设施,可查看Stock样例程序,在那里有一个带@Configuration注解的抽象AbstractStockRabbitConfiguration类,它依次有RabbitClientConfiguration和RabbitServerConfiguration子类.AbstractStockRabbitConfiguration的代码展示如下:

@Configuration
public abstract classAbstractStockAppRabbitConfiguration{

@Bean
 publicConnectionFactoryconnectionFactory(){
CachingConnectionFactoryconnectionFactory=
newCachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
returnconnectionFactory;
}

@Bean
 publicRabbitTemplaterabbitTemplate(){
RabbitTemplatetemplate=newRabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
returntemplate;
}

@Bean
 publicMessageConverterjsonMessageConverter(){
returnnewJsonMessageConverter();
}

@Bean
 publicTopicExchangemarketDataExchange(){
returnnewTopicExchange("app.stock.marketdata");
}

//additionalcodeomittedforbrevity

}
在Stock程序中,服务器使用下面的@Configuration注解来配置:

@Configuration
public classRabbitServerConfigurationextendsAbstractStockAppRabbitConfiguration{

@Bean
 publicQueuestockRequestQueue(){
returnnewQueue("app.stock.request");
}
}
这是整个@Configuration类继承链结束的地方.最终结果是TopicExchange和队列会在应用程序启动时被声明.在服务器配置中,没有TopicExchange与队列的绑定,因为这是在客户端程序完成的.
然后stock请求队列是自动绑定到AMQP默认交换器上的-这种行为是由规范来定义的.

客户端@Configuration类令人关注的地方展示如下.

@Configuration
public classRabbitClientConfigurationextendsAbstractStockAppRabbitConfiguration{

@Value("${stocks.quote.pattern}")
 privateStringmarketDataRoutingKey;

@Bean
 publicQueuemarketDataQueue(){
returnamqpAdmin().declareQueue();
}

/**
*Bindstothemarketdataexchange.
*Interestedinanystockquotes
*thatmatchitsroutingkey.
*/
@Bean
  publicBindingmarketDataBinding(){
returnBindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}

//additionalcodeomittedforbrevity

}
客户端使用AmqpAdmin的declareQueue()方法声明了另一个队列,并将其绑定到了marketdata交换器上(路由键模式是通常外部properties文件来定义的).

Queues和Exchanges的BuilderAPI

当使用Java配置时,1.6版本引入了一个便利的API来配置Queue和Exchange对象:

@Bean
publicQueuequeue(){
returnQueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo","bar")
.build();
}

@Bean
publicExchangeexchange(){
returnExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo","bar")
.build();
}
查看org.springframework.amqp.core.QueueBuilder

org.springframework.amqp.core.ExchangeBuilder的JavaDoc来了解更多信息.

DeclaringCollectionsofExchanges,Queues,Bindings

从1.5版本开始,可以在一个@Bean声明多个条目来返回集合.

只有集合中的第一个元素可认为是Declarablea的,并且只有集合中的Declarable元素会被处理.(

OnlycollectionswherethefirstelementisaDeclarableareconsidered,andonlyDeclarableelementsfromsuchcollectionsareprocessed.)

@Configuration
public static classConfig{

@Bean
publicConnectionFactorycf(){
returnnewCachingConnectionFactory("localhost");
}

@Bean
publicRabbitAdminadmin(ConnectionFactorycf){
returnnewRabbitAdmin(cf);
}

@Bean
publicDirectExchangee1(){
returnnewDirectExchange("e1",false,true);
}

@Bean
publicQueueq1(){
returnnewQueue("q1",false,false,true);
}

@Bean
publicBindingb1(){
returnBindingBuilder.bind(q1()).to(e1()).with("k1");
}

@Bean
publicList<Exchange>es(){
returnArrays.<Exchange>asList(
newDirectExchange("e2",false,true),
newDirectExchange("e3",false,true)
);
}

@Bean
publicList<Queue>qs(){
returnArrays.asList(
newQueue("q2",false,false,true),
newQueue("q3",false,false,true)
);
}

@Bean
publicList<Binding>bs(){
returnArrays.asList(
newBinding("q2",DestinationType.QUEUE,"e2","k2",null),
newBinding("q3",DestinationType.QUEUE,"e3","k3",null)
);
}

@Bean
publicList<Declarable>ds(){
returnArrays.<Declarable>asList(
newDirectExchange("e4",false,true),
newQueue("q4",false,false,true),
newBinding("q4",DestinationType.QUEUE,"e4","k4",null)
);
}

}

条件声明

默认情况下,所有queues,exchanges,和bindings都可通过应用程序上下文中所有RabbitAdmin实例来声明(设置了auto-startup="true").

重要

从1.2版本开始,可以有条件地声明元素.当程序连接了多个brokers,并需要在哪些brokers上声明特定元素时,特别有用.

代表这些元素要实现Declarable接口,此接口有两个方法:shouldDeclare()和getDeclaringAdmins().RabbitAdmin使用这些方法来确定某个特定实例是否应该在其Connection上处理声明.

这些属性作为命名空间的属性是可用的,如下面的例子所示.

<rabbit:admin id="admin1" connection-factory="CF1"/>
<rabbit:admin id="admin2" connection-factory="CF2"/>
<rabbit:queue id="declaredByBothAdminsImplicitly"/>
<rabbit:queue id="declaredByBothAdmins" declared-by="admin1,admin2"/>
<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1"/>
<rabbit:queue id="notDeclaredByAny" auto-declare="false"/>
<rabbit:direct-exchange name="direct" declared-by="admin1,admin2">
<rabbit:bindings>
<rabbit:bindingkey="foo" queue="bar"/>
</rabbit:bindings>
</rabbit:direct-exchange>
重要
默认情况下,如果没有提供declared-by(或是空的),[code=plain]auto-declare属性则为true,那么所有RabbitAdmin将声明对象(只要admin的auto-startup属性为true,默认值).[/code]
现样的,你可以使用基于Java的@Configuration注解来达到同样的效果.在这个例子中,组件会由admin1来声明,而不是admin2:

@Bean
publicRabbitAdminadmin(){
RabbitAdminrabbitAdmin=newRabbitAdmin(cf1());
rabbitAdmin.afterPropertiesSet();
returnrabbitAdmin;
}

@Bean
publicRabbitAdminadmin2(){
RabbitAdminrabbitAdmin=newRabbitAdmin(cf2());
rabbitAdmin.afterPropertiesSet();
returnrabbitAdmin;
}

@Bean
publicQueuequeue(){
Queuequeue=newQueue("foo");
queue.setAdminsThatShouldDeclare(admin());
returnqueue;
}

@Bean
publicExchangeexchange(){
DirectExchangeexchange=newDirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin());
returnexchange;
}

@Bean
publicBindingbinding(){
Bindingbinding=newBinding("foo",DestinationType.QUEUE,exchange().getName(),"foo",null);
binding.setAdminsThatShouldDeclare(admin());
returnbinding;
}

AnonymousQueue

一般来说,当需要一个独特命名,专用的,自动删除队列时,建议使用AnonymousQueue来代替中间件定义的队列名称(使用""作为队列名称会导致中间件生成队列名称).

这是因为:

队列实际上是在与broker的连接建立时声明的;这在bean创建和包装之后要很长时间;使用这个队列的beans需要知道其名称.而事实上,当app启动时,broker甚至还没有运行.

如果与broker的连接因某种原因丢失了,admin会使用相同的名称会重新声明AnonymousQueue.如果我们使用broker-声明队列,队列名称可能会改变.

从1.5.3版本开始,你可通过AnonymousQueue来控制队列名称的格式.

默认情况下,队列名称是UUID的字符串表示;例如:07afcfe9-fe77-4983-8645-0061ec61a47a.

现在,你可以提供一个AnonymousQueue.NamingStrategy实现作为其构造器参数:

@Bean
publicQueueanon1(){
return newAnonymousQueue(newAnonymousQueue.Base64UrlNamingStrategy());
}

@Bean
publicQueueanon2(){
return newAnonymousQueue(newAnonymousQueue.Base64UrlNamingStrategy("foo-"));
}
第一个会生成队列名称前辍spring.gen-其后为UUIDbase64的表示,例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g.第二个会生成队列名称前辍为foo-其后为UUID的base64表示.

base64编码使用RFC4648的"URLandFilenameSafeAlphabet";删除了字符(=).

你可以提供你自己的命名策略,可以包括队列名称中的其他信息(例如应用程序、客户端主机)。

从1.6版本开始,当使用XML配置时,可以指定命名策略;naming-strategy属性出现在<rabbit:queue>元素的属性中,对于bean引用来说,它们实现了AnonymousQueue.NamingStrategy.

<rabbit:queue id="uuidAnon"/>
<rabbit:queue id="springAnon" naming-strategy="springNamer"/>
<rabbit:queue id="customAnon" naming-strategy="customNamer"/>
<bean id="springNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy"/>
<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-"/>
</bean>
第一个创建了UUID字符串表示的名称.第二个创建了类似s[code=plain]pring.gen-MRBv9sqISkuCiPfOYfpo4g的名称.第三个创建了类似custom.gen-MRBv9sqISkuCiPfOYfpo4g的名称.[/code]
当然,你可以提供你自己的命名策略bean.

3.1.11延迟的消息交换器

1.6版本引入了DelayedMessageExchangePlugin支持.

该插件目前被标记为实验性质,但可用已超过一年(在写作的时间)。如果插件的变化是必要的,我们将尽快添加支持这样的变化。因此,这种在SpringAMQP支持同样也应考虑为实验性质.这个功能在RabbitMQ3.6.0版本和0.0.1插件版本中经过测试。

要使用RabbitAdmin来声明一个延迟交换器,只需要在交换器上简单地设置delayed属性为true.RabbitAdmin会使用交换器类型(Direct,Fanout等)来设置x-delayed-type参数,并使用x-delayed-message来声明交换器.

当使用XML来配置交换器beans时,delayed属性(默认为false)是可用的.

<rabbit:topic-exchange name="topic" delayed="true"/>
要发送延迟消息,只需要通过MessageProperties设置x-delayheader:

MessagePropertiesproperties=newMessageProperties();
properties.setXDelay(15000);
template.send(exchange,routingKey,
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());


rabbitTemplate.convertAndSend(exchange,routingKey,"foo",newMessagePostProcessor(){

@Override
publicMessagepostProcessMessage(Messagemessage)throwsAmqpException{
message.getMessageProperties().setXDelay(15000);
returnmessage;
}

});
要检查消息是否是延迟的,可调用MessageProperties的getReceivedDelay().它是一个独立的属性,以避免从一个输入消息意外的传播到一个输出消息。

3.1.12RabbitMQRESTAPI

当启用了管理插件时,RabbitMQ服务器公开了RESTAPI来监控和配置broker.

现在提供了JavaBindingfortheAPI.一般来说,你可以直接使用API,但提供了便利的包装器来使用熟悉的SpringAMQPQueue,Exchange,和Binding域对象.
当直接使用com.rabbitmq.http.client.ClientAPI(分别使用QueueInfo,ExchangeInfo,和BindingInfo),那些对象的更多信息将可用.下面是RabbitManagementTemplate上的可用操作:

public interfaceAmqpManagementOperations{

voidaddExchange(Exchangeexchange);

voidaddExchange(Stringvhost,Exchangeexchange);

voidpurgeQueue(Queuequeue);

voidpurgeQueue(Stringvhost,Queuequeue);

voiddeleteQueue(Queuequeue);

voiddeleteQueue(Stringvhost,Queuequeue);

QueuegetQueue(Stringname);

QueuegetQueue(Stringvhost,Stringname);

List<Queue>getQueues();

List<Queue>getQueues(Stringvhost);

voidaddQueue(Queuequeue);

voidaddQueue(Stringvhost,Queuequeue);

voiddeleteExchange(Exchangeexchange);

voiddeleteExchange(Stringvhost,Exchangeexchange);

ExchangegetExchange(Stringname);

ExchangegetExchange(Stringvhost,Stringname);

List<Exchange>getExchanges();

List<Exchange>getExchanges(Stringvhost);

List<Binding>getBindings();

List<Binding>getBindings(Stringvhost);

List<Binding>getBindingsForExchange(Stringvhost,Stringexchange);

}
参考javadocs来了解更多信息.

3.1.13异常处理

RabbitMQJavaclient的许多操作会抛出受查异常.例如,有许多可能抛出IOExceptions的地方.RabbitTemplate,SimpleMessageListenerContainer,和其它SpringAMQP组件会捕获这些异常,并将它们转换为运行时层次的异常.
这些是定义在org.springframework.amqp包中的,且AmqpException是层次结构的基础.

当监听器抛出异常时,它会包装在一个ListenerExecutionFailedException中,正常情况下消息会被拒绝并由broker重新排队.将defaultRequeueRejected设置为false可导致消息丢弃(或路由到死信交换器中).

正如thesectioncalled“MessageListenersandtheAsynchronousCase”讨论的,监听器可抛出AmqpRejectAndDontRequeueException来有条件地控制这种行为。

然而,有一种类型的错误,监听器无法控制其行为.当遇到消息不能转换时(例如,无效的content_encoding头),那么消息在到达用户代码前会抛出一些异常.当设置defaultRequeueRejected为true(默认),这样的消息可能会一遍又一遍地重新投递.
在1.3.2版本之前,用户需要编写定制ErrorHandler,正如Section3.1.13,“ExceptionHandling”描述的内容来避免这种情况.

从1.3.2版本开始,默认的ErrorHandler是ConditionalRejectingErrorHandler,它将拒绝那些失败且不可恢复的消息(不会重新排队):

o.s.amqp...MessageConversionException

o.s.messaging...MessageConversionException

o.s.messaging...MethodArgumentNotValidException

o.s.messaging...MethodArgumentTypeMismatchException

第一个是在使用MessageConverter转换传入消息负荷时抛出的.
第二个是当映射到@RabbitListener方法时,转换服务需要其它转换抛出的.
第三个是在监听器上使用了验证(如.@Valid),且验证失败时抛出的.
第四个是对于目标方法传入消息类型转换失败抛出的.例如,参数声明为Message<Foo>,但收到的是Message<Bar>.

错误处理器的实例可使用FatalExceptionStrategy来配置,因为用户可以提供它们的规则来有条件的拒绝消息,如.来自SpringRetry(thesectioncalled“MessageListenersandtheAsynchronousCase”)中的BinaryExceptionClassifier代理实现.
此外,ListenerExecutionFailedException现在有一个可用于决策的failedMessage属性.如果FatalExceptionStrategy.isFatal()方法返回true,错误处理器会抛出AmqpRejectAndDontRequeueException.
默认FatalExceptionStrategy会记录warning信息.

3.1.14事务(Transactions)

介绍

SpringRabbit框架支持在同步和异步使用中使用不同语义(这一点对于现有Spring事务的用户是很熟悉的)来支持自动事务管理.它做了很多,不是常见消息模式能轻易实现的.

有两种方法可用来向框架发出期望事务语义的信号.在RabbitTemplate和SimpleMessageListenerContainer中,这里有一个channelTransacted标记,如果它为true,就会告知框架使用事务通道,并根据结果使用提交或回滚来结束所有操作,出现异常时则发出回滚信号.

另一个提供的信号是Spring的PlatformTransactionManager实现(作为正在进行的操作的上下文)的外部事务.
当框架发送或接收消息时,如果过程中已经存在一个事务,且channelTransacted标记为true,那么当前消息事务的提交或回滚操作会延迟直到在当前事务结束.如果channelTransacted标记为false,那么消息操作是不会应用事务语义(它是自动应答的).

channelTransacted标记是一个配置时设置:它只在AMQP组件声明时执行一次,通常在应用程序启动时.原则上,外部事务更加动态化,因为需要在运行时根据当前线程状态来响应,当事务分层到应用程序上时,原则上来说它通常也是一个配置设置.

对于使用RabbitTemplate的同步使用,外部事务是由调用者提供的,要么是声明的,要么是强制的(日常Spring事务模式).

下面是声明方法的一个例子(通常选择这个,因为它是非侵入的),下面的例子中,模板已经配置了channelTransacted=true:

@Transactional
public voiddoSomething(){
Stringincoming=rabbitTemplate.receiveAndConvert();
//dosomemoredatabaseprocessing...
Stringoutgoing=processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
收到字符负荷,转换,并以消息体发送到@Transactional标记的方法中,因此如果数据处理因异常失败了,传入消息将返回到broker,并且输出消息不会被发送.
在事务方法链中,这适用于RabbitTemplate中的所有操作(除非Channel较早地直接控制了提交事务).

对于SimpleMessageListenerContainer的异步使用情况,如果需要外部事务,当设置了监听器时,必须由容器来发出请求.
为了表示需要外部事务,当配置时,用户为容器提供了PlatformTransactionManager实现.例如:

@Configuration
public classExampleExternalTransactionAmqpConfiguration{

@Bean
publicSimpleMessageListenerContainermessageListenerContainer(){
SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
returncontainer;
}

}
在上面的例子中,事务管理器是通过其它bean中注入添加的(未显示),并且channelTransacted也设置为了true.其效果是如果监听器因异常失败了,那么事务将回滚,消息也会退回到broker中.
明显地,如果事务提交失败(如.数据库约束错误,或通过问题),那么AMQP事务也要回滚,且消息也会回退到broker中.
有时候,这被称为最好努力1阶段提交(BestEfforts1PhaseCommit),它是可靠消息非常强大的模式.
如果在上面的例子中将channelTransacted标志设为false(默认为false),那么外部事务仍会提供给监听器,但所有消息操作都是自动应答的,因此其效果是即使发生了业务操作,也会提供消息操作.

关于接收消息的回滚说明

AMQP事务只适用于发送应答给broker,所以当有Spring事务回滚且又收到了消息时,SpringAMQP做的不仅要回滚事务,还要手动拒绝消息.
消息上的拒绝操作独立于事务,依赖于defaultRequeueRejected属性(默认为true).更多关于拒绝失败消息的详情,请参考thesectioncalled“MessageListenersandtheAsynchronousCase”.

关于RabbitMQ事务及其局限性的更多信息,参考RabbitMQBrokerSemantics.

重要

RabbitMQ2.7.0前,这样的消息(当通道关闭或中断时未应的消息)会回到队列中,从2.7.0,拒绝消息会跑到队列前边,与JMS回滚消息方式类似.

使用RabbitTransactionManager

RabbitTransactionManager是执行同步,外部事务Rabbit操作的另一种选择.这个事务管理器是PlatformTransactionManager接口的实现类,应该在单个RabbitConnectionFactory中使用.

重要

此策略不能提供XA事务,比如,要在消息和数据库之间共享事务.

应用代码需要通过ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory,boolean)来获取事务性Rabbit资源而不是使用Connection.createChannel()调用.
当使用SpringAMQP的RabbitTemplate时,它会自动检测线程绑定通道和自动参与事务。

在Java配置中,你可以使用下面的代码来设置一个新的RabbitTransactionManager:

@Bean
publicRabbitTransactionManagerrabbitTransactionManager(){
returnnewRabbitTransactionManager(connectionFactory);
}
如果你喜欢使用XML配置,可以像下面进行声明:

<bean id="rabbitTxManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<propertyname="connectionFactory" ref="connectionFactory"/>
</bean>

3.1.15消息监听器容器配置

有相当多的配置SimpleMessageListenerContainer相关事务和服务质量的选项,它们之间可以互相交互.当使用命名空间来配置<rabbit:listener-container/>时,

下表显示了容器属性名称和它们等价的属性名称(在括号中).

未被命名空间暴露的属性,以`N/A`表示.

-第七部分

3.1.15消息监听器容器配置

有相当多的配置SimpleMessageListenerContainer相关事务和服务质量的选项,它们之间可以互相交互.当使用命名空间来配置<rabbit:listener-container/>时,

下表显示了容器属性名称和它们等价的属性名称(在括号中).

未被命名空间暴露的属性,以`N/A`表示.
Table3.3.消息监听器容器的配置选项
属性
(group)
描述

只在使用命名空间时可用.当经过指定时,类型为Collection<MessageListenerContainer>的bean会使用它这个名称进行注册,容器会将每个<listener/>元素添加到集合中.这就允许,如,通过迭代集合来启动/停止该组容器.如果多个<listener-container/>元素有相同的group值,那么集合中的容器是所有指定容器的总和.

属性

channelTransacted(channel-transacted)

描述

Boolean标志,用于表示在事务中的所有消息都应该应答(手动地或自动地)

属性


acknowledgeMode(acknowledge)

描述

NONE:不发送应答(与channelTransacted=true不兼容).RabbitMQ称此为"autoack",因为broker假设消费者没有采取任何动作应答了所有消息.

MANUAL:监听器必须调用Channel.basicAck()来手动应答所有消息.

AUTO:容器会自动应答所有消息,除非MessageListener抛出了异常.注意acknowledgeMode与channelTransacted是互补的-如果通道是事务的,那么broker除了ack外,还需要提交通知.这是默认模式.也可参考txSize.

属性

transactionManager(transaction-manager)

描述

监听器操作的外部事务管理器.也是与channelTransacted互补的-如果通道是事务的,那么其事务会用外部事务来同步.

属性

prefetchCount(prefetch)

描述

可接受来自broker一个socket帧中的消息数目.数值越大,消息分发速度就越快,但无序处理的风险更高.

如果acknowledgeMode为NONE它会忽略.它会增长,如果有必要,须匹配txSize.

属性

shutdownTimeout(N/A)

描述
当容器关闭时(例如.关闭ApplicationContext),用于等待正在传输消息的上限时间.默认是5秒.当达到上限时,如果通道是非事务的,消息将被丢弃.

属性

txSize(transaction-size)

描述
acknowledgeMode为AUTO时,在发送ack前(等待每一个消息达到接收超时设置),容器将试图处理这个数目的消息.当事务通道提交后也是一样的.如果prefetchCount小于txSize,prefetchCount会增长以匹配txSize.

属性

receiveTimeout(receive-timeout)
描述

等待消息的最大时间.如果acknowledgeMode=NONE这只有很小的效果-容器只旋转一轮,并要求另一个消息.当在txSize>1的事务通道中有最大效果,因为它能导致已经消费但没有应答的消息直接超时过期.

属性

autoStartup(auto-startup)

描述

用于当ApplicationContext启动时(作为SmartLifecycle回调的一部分,发生在所有bean初始化之后)是否同时启动容器的标志.默认为true,如果在容器启动时,中间件暂不可用,那么可将其设为false,随后在确认中间件已启动后,手动调用start()方法来启动.

属性

phase(phase)

描述

当autoStartup为true时,容器中的生命周期阶段应该启动和停止.值越小,容器就会越早启动,并越晚停止.默认值Integer.MAX_VALUE,这意味着容器会越晚启动并尽快停止.

属性

adviceChain(advice-chain)

描述

应用于监听器执行路径上的AOPAdvice数组.它可用于额外的横切关注点,如broker死亡事件中的自动重试.

注意,只要broker还活着,出现AMQP错误后的重新连接是由CachingConnectionFactory来处理的.

属性

taskExecutor(task-executor)

描述

执行监听器调用程序的SpringTaskExecutor引用(或标准JDK1.5+Executor).默认是SimpleAsyncTaskExecutor,用于内部管理线程.

属性

errorHandler(error-handler)

描述

在MessageListener执行期间,用于处理未捕获异常的ErrorHandler策略的引用.默认是ConditionalRejectingErrorHandler.

属性

concurrentConsumers(concurrency)

描述
每个监听器上初始启动的并发消费者数目.参考Section3.1.16,“ListenerConcurrency”.

属性

axConcurrentConsumers(max-concurrency)

描述

启动并发消费者的最大数目,如果有必要,可以按需设置.必须要大于或等于concurrentConsumers.

参考Section3.1.16,“ListenerConcurrency”.

属性

startConsumerMinInterval(min-start-interval)

描述
启动新消费者之间的时间间隔,单位为毫秒.
参考Section3.1.16,“ListenerConcurrency”.默认10000(10秒).

属性

stopConsumerMinInterval(min-stop-interval)

描述

停止消费者的时间间隔,由于最后一个消费者已经停止了,这时可以检测到空闲消费者.

参考Section3.1.16,“ListenerConcurrency”.默认60000(1分钟).

属性

consecutiveActiveTrigger(min-consecutive-active)

描述

消费者收到连续消息的最小数量,当考虑启动一个新的消费者,接收不会发生超时。也会受txsize影响.参考Section3.1.16,“ListenerConcurrency”.默认为10.

属性

consecutiveIdleTrigger(min-consecutive-idle)

描述

在考虑停止一个消费者,消费者必须经历的最小接收超时时间,也会受txsize影响.

参考Section3.1.16,“ListenerConcurrency”.默认为10.

属性

connectionFactory(connection-factory)

描述
connectionFactory的引用;当使用XML命名空间配置时,默认引用bean名称是"rabbitConnectionFactory".

属性

defaultRequeueRejected(requeue-rejected)

描述

用以确定因监听器抛出异常而遭受拒绝的消息是否需要重新入列.默认为true.

属性

recoveryInterval(recovery-interval)
描述

如果消费者不是因致命原因而导致启动失败,则用于设置重启消费者的时间间隔,单位毫秒.默认为5000.与recoveryBackOff互斥.

属性

recoveryBackOff(recovery-back-off)
描述

如果消费者不是因致命原因而导致启动失败,则用于指定BackOff启动消费者的时间间隔.默认是每5秒无限重试的FixedBackOff.与recoveryInterval互斥.

属性

exclusive(exclusive)
描述

用于确定容器中的单个消费者是否具有独占访问队列的权限。当其值为1时,容器的concurrency必须为1时。如果另一个消费者有独占访问权,容器将根据恢复时间间隔或恢复后退试图恢复消费者。
当使用命名空间时,此属性会随着队列名称出现在<rabbit:listener/>元素中。默认为false。

属性

rabbitAdmin(admin)
描述

监听器监听了多个自动删除队列时,当其发现在启动时队列消失了,容器会使用RabbitAdmin来声明消失的队列,并进行交换器的相关绑定.
如果此元素配置成使用条件声明(参考thesectioncalled“ConditionalDeclaration”),容器必须使用配置的admin来声明那些元素.
这里指定的admin;只在使用带有条件声明的自动删除队列时才需要.如果你不想在容器启动前声明自动删除队列,可在amdin中将auto-startup设为false.默认情况下,RabbitAdmin会声明所有非条件元素.

属性

missingQueuesFatal(missing-queues-fatal)
描述

从1.3.5版本开始,SimpleMessageListenerContainer就有了这个新属性.

当设为true(默认值)时,如果配置队列在中间件都不可用,这会视为是致命的.这会导致应用程序上下文初始化失败;同时,当容器还在运行时删除了队列,也会发生这样的情况.
默认情况下,消费者进行3次重试来连接队列(5秒时间间隔),如果所有尝试都失败了则会停止容器.

在以前版本中,此选项是不可配置的.

当设置为false,再做了三次重试后,容器将进入恢复模式,这也伴随其它问题,如中间件已经发生了故障.

容器会根据recoveryInterval属性来尝试恢复.在每次恢复尝试期间,每个消费者会以5秒的时间间隔来尝试4次被动声明.这个过程将无限期地继续下去(译者注:有点冲突)。

你也可以使用propertiesbean来为所有的容器全局设置属性,如下所示:

<util:properties id="spring.amqp.global.properties">
<prop key="smlc.missing.queues.fatal">false</prop>
</util:properties>
如果容器明确的设置了missingQueuesFatal属性,全局属性的值对此容器将无效.

默认的retry属性(5秒间隔3次重试)可通过下面的属性值来覆盖.

属性

mismatchedQueuesFatal(mismatched-queues-fatal)
描述

这是1.6版本中加入的新属性.当容器启动时,如果此属性为true(默认为false),容器会检查上下文中声明的队列是否中间件中存在的队列是否一致.

如果属性不匹配(如.auto-delete)或参数(e.g.x-message-ttl)存在,容器(和应用程序上下文)会抛出致命异常而导致启动失败.如果是在恢复期间检测到的问题,容器会停止.

必须在上下文中存在单个RabbitAdmin(或使用rabbitAdmin属性在容器上特别配置);否则此属性必须为false.

如果在初始启动期间,中间件还不可用,容器启动后,当建立连接时会检查条件.

重要

该检查针对的是上下文的所有队列,而不仅仅是特定监听器配置使用的队列.如果你希望只检查容器使用的队列,你需要为这个容器配置单独的RabbitAdmin,并使用rabbitAdmin属性为其提供一个引用.

参考“ConditionalDeclaration”章节来了解更多信息.

属性

autoDeclare(auto-declare)
描述

从1.4版本开始,SimpleMessageListenerContainer引入了这个新属性.

当设置为true时(默认值),容器会使用RabbitAdmin来重新声明所有AMQP对象(Queues,Exchanges,Bindings).
如果在启动期间探测到至少有一个队列缺失了,可能因为它是自动删除队列或过期队列,但不管队列缺失是基于什么原因,重新声明仍会进行处理(译者注:太浪费了).
要禁用这种行为,可设置其属性为false.但需要注意的是,如果所有队列都缺失了(译者注:全部还是部分),容器会启动失败.

在1.6版本之前,如果在上下文中存在多个admin,容器会随机选择一个.反之,如果没有admin,它会从内部创建一个.
无论是哪种情况,这都将导致非预期结果出现.从1.6版本开始,为了能使autoDeclare工作,必须要上下文中明确存在一个RabbitAdmin,或者特定实例的引用必须要在容器中使用rabbitAdmin属性中配置.

属性

declarationRetries(declaration-retries)
描述

从1.4.3,1.3.9版本开始,SimpleMessageListenerContainer有了这个新属性.命名空间属性在1.5.x中可用.

用于设置被动声明失败时,重新尝试的次数.被动声明发生在当消费者启动了或从多个队列中消费时,初始化期间部分队列还不可用的情况下.
当重试次数用完后,如果还是不能被动声明配置队列,那么上面的missingQueuesFatal属性将控制容器行为.默认:3次重试(4次尝试).

属性

failedDeclarationRetryInterval(failed-declaration-retry-interval)
描述

从1.4.3,1.3.9版本开始,SimpleMessageListenerContainer有了这个新属性.命名空间属性在1.5.x中可用.

重新尝试被动声明的时间间隔.被动声明发生在当消费者启动了或从多个队列中消费时,初始化期间部分队列还不可用的情况下.默认:5000(5秒).

属性

retryDeclarationInterval(missing-queue-retry-interval)
描述
从1.4.3,1.3.9版本开始,SimpleMessageListenerContainer有了这个新属性.命名空间属性在1.5.x中可用.

如果配置队列的一个子集在消费者初始化过程中可用,则消费者将从这些队列中开始消费。消费者将被动地使用此间隔声明丢失的队列。

当这个间隔过去后,会再次使用declarationRetries和failedDeclarationRetryInterval.

如果还有缺失队列,消费者在重新尝试之前会等待此时间间隔.

这个过程会不停地进行到所有队列可用.默认:60000(1分钟).

属性

consumerTagStrategy(consumer-tag-strategy)
描述

从1.4.5版本开始,SimpleMessageListenerContainer有了这个新属性.命名空间属性在1.5.x中可用.

之间,只能使用中间件生成的consumertags;尽管现在这仍是默认的配置,但现在你可以提供一个ConsumerTagStrategy的实现,这样就可为每个消费者创建独特的tag.

属性

idleEventInterval(idle-event-integer)
描述

从1.6版本开始,SimpleMessageListenerContainer有了这个新属性.

参考"DetectingIdleAsynchronousConsumers"章节.

3.1.16监听器并发

默认情况下,监听器容器会启动单个消费者来接收队列中的消息.

当检查前面章节中的表格时,你会发现有许多属性可控制并发.最简单的是concurrentConsumers,它会创建固定数量的消费者来并发处理消息.

在1.3.0版本之前,这只能在容器停止时才可设置.

从1.3.0版本开始,你可以动态调整concurrentConsumers属性.如果容器运行时修改了,会根据新设置来调需要的消费者(添加或删除).

此外,在容器中添加了一个新属性maxConcurrentConsumers来基于工作负载来动态调整并发数.

它可与其它四个属性一起工作:consecutiveActiveTrigger,startConsumerMinInterval,consecutiveIdleTrigger,stopConsumerMinInterval.
在默认设置的情况下,加大消费者的算法如下:

如果还没有达到maxConcurrentConsumers,如果现有消费者活动了10个连续周期且离最后消费者启动至少消逝了10秒钟,那么将启动新的消费者.如果消费者在txSize*receiveTimeout毫秒内至少收到一个消息,那么就认为此消费者是活动的.

在默认设置的情况下,减少消费者的算法如下:

如果有超过concurrentConsumers数量的消费者在运行,且检测到消费者连续超时(空闲)了10个周期,且最后一个消费者至少停止了60秒,那么消费者将停止.
超时依赖于receiveTimeout和txSize属性.当在txSize*receiveTimeout毫秒内未收到消息,则认为消费者是空闲的.
因此,当有默认超时(1秒)和txSize为4,那么在空闲40秒后,会认为消费者是空闲的并会停止(4超时对应1个空闲检测).

实际上,如果整个容器空闲一段时间,消费者将只会被停止。这是因为broker将分享其在所有活跃的消费者的工作。

3.1.17专用消费者

也是从1.3版本开始,监听器容器可配置单个专用消费者;这可以阻其它容器来消费队列直到当前消费者退出.

这样的容器的并发性必须是1。

当使用专用消费者时,其它容器会根据recoveryInterval属性来消费队列,如果尝试失败,会记录一个WARNing信息.

3.1.18监听器容器队列

1.3版本在监听器容器中引入许多处理多个队列的改善措施.

容器配置必须监听至少一个队列以上;以前也是这样的情况,但现在可以在运行时添加和删除队列了。当任何预先获取的消息被处理后,容器将回收(取消和重新创建)。
参考方法addQueues,addQueueNames,removeQueuesandremoveQueueNames.当删除队列时,至少要保留一个队列.

现在,只要有可用队列消费者就会启动-先前如果没有可用队列,容器会停止.现在,唯一的问题是是否有可用队列.如果只是部分队列可用,容器会每60秒尝试被动声明(和消费)缺失队列.

此外,如果消费才从broker中收到了通道(例如,队列被删除)消费者会尝试重新恢复,重新恢复的消费会继续处理来自其它配置队列中的消息.之前是队列上的取消会取消整个消费者,最终容器会因缺失队列而停止.

如果你想永久删除队列,你应该在删除队列的之前或之后更新容器,以避免消费.

3.1.19恢复:从错误和代理失败中恢复

介绍

Spring提供了一些关键的(最流行的)高级特性来处理协议错误或中间件失败时的恢复与自动重连接.

主要的重连接特性可通过CachingConnectionFactory自身来开启.它也常有利于使用rabbitadmin自动声明的特点.
除此之外,如果你关心保证投递,你也许需要在RabbitTemplate中使用channelTransacted标记以及在SimpleMessageListenerContainer中使用AcknowledgeMode.AUTO(或者自己来手动应答).

交换器、队列和绑定的自动声明

RabbitAdmin组件在启动时可声明交换器,队列,绑定.它是通过ConnectionListener懒执行的,因此如果启动时broker不存在,也没有关系.
Connection第一次使用时(如.发送消息),监听器会被触发,admin功能也会应用.这种在监听器中自动声明的好处是,如果连接出于任何原因断开了,(如.broker死了,网络中断问题.),它们会在下次有需要的时候重新应用.

这种方式的队列声明必须要有固定的名称;要么是明确声明,要么是由框架生成AnonymousQueue.匿名队列是非持久化的,专用的,且自动删除的.

重要

自动声明只在cachingConnectionFactory缓存模式是CHANNEL(默认)才可用.这种限制的存在是因为专用和自动删除队列是绑定到connection上的.

同步操作中的故障和重试选项

如果你在同步序列中使用RabbitTemplate时丢失了broker的连接,那么SpringAMQP会抛出一个AmqpException(通常但并不总是AmqpIOException).
我们不想隐藏存在问题的事实,因此你可以捕获并对异常进行处理.如果你怀疑连接丢失了,而且这不是你的错,那么最简单的事情就是执行再次尝试操作.重试操作可以手动进行,也可以使用SpringRetry来处理重试(强制或声明).

SpringRetry提供了两个AOP拦截器并提供非常灵活的方式来指定retry的参数(尝试的次数,异常类型,补偿算法等等.).SpringAMQP同时也提供了一些方便的工厂bean来创建SpringRetry拦截器,你可以使用强类型回调接口来实现恢复逻辑.参考Javadocs和StatefulRetryOperationsInterceptor和StatelessRetryOperationsInterceptor的属性来了解更多详情.

如果没有事务,或者如果一个事务是在重试回调中启动的话,则无状态重试是适当的。注意,相对于有状态重试,无状态重试只是简单配置和分析,如果存在一个正在进行的事务必须回滚或肯定会回滚的话, 这种无状态重试则是不合适的.
在事务中间掉下来的连接与回退有同样的效果,所以对于事务开始于堆栈上的重连接来说,有状态重试通常是最佳选择(soforreconnectionwherethetransactionisstartedhigherupthestack,statefulretryisusuallythebestchoice).

从1.3版本开始,提供了builderAPI来帮助在Java中使用这些拦截器(或者在@Configuration类中),例如:

@Bean
publicStatefulRetryOperationsInterceptorinterceptor(){
returnRetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000,2.0,10000)//initialInterval,multiplier,maxInterval
.build();
}
只有部分retry特性能通过这种方式,更加高级的特性需要在RetryTemplate中配置.

参考SpringRetryJavadocs来了解可用策略,配置的完整信息.

消息监听器和异步情况

如果MessageListener因业务异常而失败,异常可由消息监听器容器来处理,然后它会继续回去监听其它信息.如果失败是由于掉下的连接引起的(非业务异常),那么监听此消费者的监听器将退出和重启.

SimpleMessageListenerContainer可以无逢地进行处理,并且它会在日志中记录监听器即将重启.

事实上,它会循环不断地尝试重新启动消费者,只有当消费者有非常糟糕的行为时,才会放弃。一个副作用是,如果broker在容器启动时关闭,它将会继续尝试直到建立一个连接。

业务异常处理,相对于协议错误和连接丢失,它可能需要更多考虑和一些自定义配置,特别是处于事务或容器应答时.
在2.8.x版本之前,RabbitMQ对于死信行为没有定义,因此默认情况下,一个因拒绝或因业务异常导致回退的消息可循环往复地重新分发.
要限制客户端的重新分发的次数,一个选择是在监听器的通知链中添加一个StatefulRetryOperationsInterceptor.拦截器有一个实现了自定义死信动作的恢复回调:

什么是适合你的特定的环境。

另一个选择是设置容器的rejectRequeued属性为false.这会导致丢弃所有失败的消息.当使用RabbitMQ2.8.x+时,这也有利于传递消息到一个死的信件交换。

或者,你可以抛出一个AmqpRejectAndDontRequeueException;这会阻止消息重新入列,不管defaultRequeueRejected属性设置的是什么.

通常情况下,可以组合使用这两种技术在通知链中使用StatefulRetryOperationsInterceptor,在此处是MessageRecover抛出AmqpRejectAndDontRequeueException.MessageRecover会一直调用,直到耗尽了所有重试.
默认MessageRecoverer只是简单的消费错误消息,并发出WARN消息.在这种情况下,消息是通过应答的,且不会发送到死信交换器中.

从1.3版本开始,提供了一个新的RepublishMessageRecoverer,它允许在重试次数耗尽后,发布失败消息:

@Bean
RetryOperationsInterceptorinterceptor(){
returnRetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(newRepublishMessageRecoverer(amqpTemplate(),"bar","baz"))
.build();
}
RepublishMessageRecoverer会使用消息头的额外信息来发布,这些信息包括异常信息,栈轨迹,原始交换器和路由键.额外的头可通过创建其子类和覆盖additionalHeaders()方法来添加.

重试的异常分类

SpringRetry可以非常灵活地决定哪些异常可调用重试.默认配置是对所有异常都进行重试.用户异常可以包装在ListenerExecutionFailedException中,我们需要确保分类检查异常原因.默认的分类只是看顶部级别的异常。

SpringRetry1.0.3开始,BinaryExceptionClassifier有一个属性traverseCauses(默认为false).当当为true时,它将遍历异常的原因,直到它找到一个匹配或没有原因。

要使用分类重试,需要使用一个SimpleRetryPolicy,其构造函数将接受最大尝试次数,Exception的Map,以及一个boolean值(traverseCauses),且还需要将此策略注入给RetryTemplate.

3.1.20调试

SpringAMQP提供广泛的日志记录,尤其是在DEBUG级别.

如果你想在应用程序和broker之间监控AMQP协议,你可以使用像WireShark的工具,它有一个插件可用于解码协议.
另一个选择是,RabbitMQjavaclient自身携带了一个非常有用的工具类:Tracer.当以main方式运行时,默认情况下,它监听于5673,并连接本地的5672端口.
只需要简单的运行它,并修改你的连接工厂配置,将其连接到本地的5673端口.它就会在控制台中显示解码的协议信息.参考Tracerjavadocs来了解详细信息.

3.2LoggingSubsystemAMQPAppenders

框架为多个流行的日志系统提供了日志appenders:

log4j(从SpringAMQP1.1版本开始)

logback(从SpringAMQP1.4版本开始)

log4j2(从SpringAMQP1.6版本开始)

appenders使用正常机制为为子系统配置,可用属性参照下面的规定。

3.2.1共同属性

下面的属性对于所有appenders都可用:

Table3.4.共同Appender属性

PropertyDefaultDescription
exchangeNamelogs用于发布日志事件的交换器名称.
exchangeTypetopic发布日志事件的交换器类型-只在appender声明了交换器的情况下才需要.参考declareExchange.
routingKeyPattern%c.%p日志子系统生成路由键的模式格式.
applicationIdApplicationID-如果模式包含%X{applicationId},则将其添加到路由键.
senderPoolSize2用于发布日志事件的线程数目.
maxSenderRetries30当broker不可用时或有某些错误时,重试的次数.延时重试像:N^log(N),N表示重试次数.
addresses一个逗号分隔的broker地址列表:host:port[,host:port]*-覆盖host和port.
hostlocalhost要连接RabbitMQ的主机.
port5672
virtualHost/要连接的RabbitMQ虚拟主机.
usernameguest要连接RabbitMQ的用户.
passwordguest要连接RabbitMQ的用户密码.
contentTypetext/plain日志消息的content-type属性.
contentEncoding日志消息的content-encoding属性.
declareExchangefalse当appender启动时,是否需要声明配置的交换器.也可参考durable和autoDelete.
durabletruedeclareExchange为true,durable标志才会设置此值.
autoDeletefalsedeclareExchange为true,autodelete标志才会设置此值.
charsetnull当将字符串转成byte[]时要使用的编码,默认为null(使用系统默认字符集).如果当前平台上不支持此字符集,将回退到使用系统字符集.
deliveryModePERSISTENTPERSISTENT或NON_PERSISTENT用于决定RabbitMQ是否应该持久化消息.
generateIdfalse用于确定messageId属性是否需要设置成唯一值.
clientConnectionPropertiesnull一个逗号分隔的key:value对,它是连接RabbitMQ时设置的自定义客户端属性

3.2.2Log4jAppender

样例log4j.properties片断.

log4j.appender.amqp.addresses=foo:5672,bar:5672
log4j.appender.amqp=org.springframework.amqp.rabbit.log4j.AmqpAppender
log4j.appender.amqp.applicationId=myApplication
log4j.appender.amqp.routingKeyPattern=%X{applicationId}.%c.%p
log4j.appender.amqp.layout=org.apache.log4j.PatternLayout
log4j.appender.amqp.layout.ConversionPattern=%d%p%t[%c]-<%m>%n
log4j.appender.amqp.generateId=true
log4j.appender.amqp.charset=UTF-8
log4j.appender.amqp.durable=false
log4j.appender.amqp.deliveryMode=NON_PERSISTENT
log4j.appender.amqp.declareExchange=true

3.2.3Log4j2Appender

[b]样例log4j2.xml片断.[/b]

<Appenders>
...
<RabbitMQname="rabbitmq"
addresses="foo:5672,bar:5672"user="guest"password="guest"virtualHost="/"
exchange="log4j2"exchangeType="topic"declareExchange="true"durable="true"autoDelete="false"
applicationId="myAppId"routingKeyPattern="%X{applicationId}.%c.%p"
contentType="text/plain"contentEncoding="UTF-8"generateId="true"deliveryMode="NON_PERSISTENT"
charset="UTF-8"
senderPoolSize="3"maxSenderRetries="5">
</RabbitMQ>
</Appenders>

3.2.4LogbackAppender

样例logback.xml[b]片断.[/b]

<appendername="AMQP"class="org.springframework.amqp.rabbit.logback.AmqpAppender">
<layout>
<pattern><![CDATA[%d%p%t[%c]-<%m>%n]]></pattern>
</layout>
<addresses>foo:5672,bar:5672</addresses>
<abbreviation>36</abbreviation>
<applicationId>myApplication</applicationId>
<routingKeyPattern>%property{applicationId}.%c.%p</routingKeyPattern>
<generateId>true</generateId>
<charset>UTF-8</charset>
<durable>false</durable>
<deliveryMode>NON_PERSISTENT</deliveryMode>
<declareExchange>true</declareExchange>
</appender>

3.2.5定制Messages

每个appenders都可以子类化,以允许你在发布前修改消息.

CustomizingtheLogMessages.

public classMyEnhancedAppenderextendsAmqpAppender{

@Override
 publicMessagepostProcessMessageBeforeSend(Messagemessage,Eventevent){
message.getMessageProperties().setHeader("foo","bar");
returnmessage;
}

}

3.2.6定制客户端属性

简化String属性

每个appender都支持在RabbitMQ连接中添加客户端属性.

log4j.

log4j.appender.amqp.clientConnectionProperties=foo:bar,baz:qux
logback.

<appender name="AMQP"...>...
<clientConnectionProperties>foo:bar,baz:qux</clientConnectionProperties>
...</appender>
log4j2.

<Appenders> ... <RabbitMQname="rabbitmq"...clientConnectionProperties="foo:bar,baz:qux"...</RabbitMQ></Appenders>
这些属性是逗号分隔的key:value队列表;键和值不能包含逗号或冒号.

当RabbitMQ Admin UI中查看连接上,你会看到这些属性.

Log4j和logback先进技术

使用log4j和logbackappenders,appenders可以是子类化的,允许你在连接建立前,修改客户连接属性:

定制客户端连接属性.

public classMyEnhancedAppenderextendsAmqpAppender{

privateStringfoo;

@Override
protected voidupdateConnectionClientProperties(Map<String,Object>clientProperties){
clientProperties.put("foo",this.foo);
}

public voidsetFoo(Stringfoo){
this.foo=foo;
}

}
对于log4j2,添加log4j.appender.amqp.foo=bar到log4j.properties来设置发展.

对于logback,在logback.xml中添加<foo>bar</foo>.

当然,对于像这个例子中简单的String属性,可以使用先前的技术;

子类允许更丰富的属性(如添加Map的numeric属性).

使用log4j2,子类是不被支持的,因为log4j2使用静态工厂方法.

3.3样例应用程序

3.3.1介绍

SpringAMQPSamples项目包含了两个样例应用程序.第一个简单的"HelloWorld"示例演示了同步和异步消息的处理.它为理解基础部分提供了一个很好的开端.
第二个基于股票交易的例子演示了真实应用程序中的交互场景.在本章中,我们会每个示例进行快速浏览,使您可以专注于最重要的组成部分.
这两个例子都是基于Maven的,因此你可以直接将它们导入任何支持Maven的IDE中(如.SpringSourceToolSuite).

3.3.2HelloWorld

介绍

HelloWorld示例演示了同步和异步消息处理.你可以导入spring-rabbit-helloworld示例到IDE中并跟随下面的讨论.

同步例子

在src/main/java目录中,导航到org.springframework.amqp.helloworld包中.

打开HelloWorldConfiguration类,你可以注意到它包含了@Configuration类级注解和一些@Bean方法级注解.

这是Spring的基于Java的配置.你可进一步的了解here.

@Bean
publicConnectionFactoryconnectionFactory(){
CachingConnectionFactoryconnectionFactory=null;
connectionFactory=newCachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
returnconnectionFactory;
}
配置中同样也包含了RabbitAdmin的实例,它会默认查找类型为Exchange,Queue,或Binding的bean并在broker中进行声明.
事实上,"helloWorldQueue"bean是在HelloWorldConfiguration中生成的,因为它是Queue的实例.

@Bean
publicQueuehelloWorldQueue(){
returnnewQueue(this.helloWorldQueueName);
}
重看"rabbitTemplate"bean配置,你会看到它将helloWorldQueue的名称设成了"queue"属性(用于接收消息)以及"routingKey"属性(用于发送消息).

现在,我们已经探索了配置,让我们看看实际上使用这些组件的代码。

首先,从同一个包内打开Producer类。它包含一个用于创建SpringApplicationContext的main()方法.

publicstaticvoidmain(String[]args){

ApplicationContextcontext=
newAnnotationConfigApplicationContext(HelloWorldConfiguration.class);
AmqpTemplateamqpTemplate=context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("HelloWorld");
System.out.println("Sent:HelloWorld");
}
在上面的例子中你可以看到,取回的AmqpTemplate用来发送消息.因为客户端代码应该尽可能地依赖于接口,因此类型是AmqpTemplate而不是RabbitTemplate.
即使在HelloWorldConfiguration中创建的bean是RabbitTemplate的实例,依赖于接口则意味着这端代码更具有便携性(portable)(配置可以独立于代码进行修改).
因为convertAndSend()方法是通过模板来调用的,因此模板会将调用委派给它的MessageConverter实例.在这种情况下,它默认使用的是SimpleMessageConverter,但也可以在HelloWorldConfiguration中为"rabbitTemplate"指定其它的实现.

现在打开Consumer类.它实际上共享了同一个配置基类,这意味着它将共享"rabbitTemplate"bean.这就是为什么我们要使用"routingKey"(发送)和"queue"(接收)来配置模板的原因.
正如你在Section3.1.4,“AmqpTemplate”中看到的,你可以代替在发送方法中传递routingKey参数,代替在接收方法中传递queue参数.Consumer代码基本上是Producer的镜子,只不过调用的是receiveAndConvert()而非convertAndSend()方法.

publicstaticvoidmain(String[]args){
ApplicationContextcontext=
newAnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplateamqpTemplate=context.getBean(AmqpTemplate.class);
System.out.println("Received:"+amqpTemplate.receiveAndConvert());
}
你如果运行Producer,然后再运行Consumer,在控制台输出中,你应该能看到消息"Received:HelloWorld"

异步示例

我们已经讲解了同步HelloWorld样例,是时候移动到一个稍微先进,但更强大的选择上了.稍微修改一下代码,HelloWorld样例就可以可以提供异步接收的示例了,又名Message-drivenPOJOs.事实上,有一个子包明确地提供了这种功能:org.springframework.amqp.samples.helloworld.async.

再一次地我们将从发送端开始.打开ProducerConfiguration类可注意到它创建了一个"connectionFactory"和"rabbitTemplate"bean.
这次,由于配置是专用于消息发送端,因此我们不需要任何队列定义,RabbitTemplate只须设置routingKey属性.
回想一下,消息是发送到交换器上的而不是直接发到队列上的.AMQP默认交换器是无名称的direct类型交换器.
所有队列都是通过使用它们的名称作为路由键绑定到默认交换器上的.这就是为什么在这里我们只提供路由键的原因.

publicRabbitTemplaterabbitTemplate(){
RabbitTemplatetemplate=newRabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
returntemplate;
}
由于这个示例展示的是异步消息处理,生产方设计为连续发送消息(尽管类似于同步版本中的message-per-execution模型,但不太明显,实际上它是消息驱动消费者)负责连续发送消息的组件是作为ProducerConfiguration类中的内部类来定义的,每3秒执行一次.

static classScheduledProducer{

@Autowired
 private volatileRabbitTemplaterabbitTemplate;

private finalAtomicIntegercounter=newAtomicInteger();

@Scheduled(fixedRate=3000)
 public voidsendMessage(){
rabbitTemplate.convertAndSend("HelloWorld"+counter.incrementAndGet());
}
}
你不必要完全了解这些细节,因为真正的关注点是接收方(我们马上就会讲解).然而,如果你还熟悉Spring3.0任务调度支持,你可从here这里来了解.
简短故事是:在ProducerConfiguration中的"postProcessor"bean使用调度器来注册了任务.

现在,让我们转向接收方.为强调Message-drivenPOJO行为,将从对消息起反应的组件开始.

此类被称为HelloWorldHandler.

publicclassHelloWorldHandler{

publicvoidhandleMessage(Stringtext){
System.out.println("Received:"+text);
}

}
相当明显的,这是一个POJO.它没有继承任何基类,它没有实现任何接口,它甚至不包含任何导入.它将通过SpringAMQPMessageListenerAdapter来适配MessageListener接口.然后适配器可配置在SimpleMessageListenerContainer上.
在这个例子中,容器是在ConsumerConfiguration类中创建的.你可以看到POJO是包装在适配器中的.

@Bean
publicSimpleMessageListenerContainerlistenerContainer(){
SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(newMessageListenerAdapter(newHelloWorldHandler()));
returncontainer;
}
SimpleMessageListenerContainer是一个Spring生命周期组件,默认会自动启动.如果你看了Consumer类的内部,你会看到main()方法中除了一行启动创建ApplicationContext的代码外,其它什么都没有.
Producer的main()方法也只有一行启动,因为以@Scheduled注解的组件会自动开始执行.你可以任何顺序来启动Producer和Consumer,你会看每秒就会发送消息和接收到消息.

3.3.3股票交易(StockTrading)

StockTrading示例演示了比HelloWorld示例更高级的消息场景.然而,配置却是很相似的-只是有一点复杂.
由于我们已经详细讲解了HelloWorld配置,因此在这里我们将重点关注不一样的东西.有一个服务器发送市场数据(股票报价)到Topic交换器中.
然后,客户端可订阅市场数据,即通过使用路由模式(如."app.stock.quotes.nasdaq.*")来绑定队列(e.g."app.stock.quotes.nasdaq.*").
这个例子的其它主要功能是有一个请求回复“股票交易”的互动,它是由客户发起并由服务器来处理的.这涉及到一个私有的“回复(replyTo)”队列,发送客户端的信息在请求消息中。

服务器的核心配置在RabbitServerConfiguration类中(位于org.springframework.amqp.rabbit.stocks.config.server包中).
它继承了AbstractStockAppRabbitConfiguration.这是服务器和客户端定义常用资源的地方,包括市场数据Topic交换器(其名称为app.stock.marketdata)以及服务器公开股票交易的队列(其名称为app.stock.request).
在那个公共配置文件中,你会看到在RabbitTemplate上配置了一个JsonMessageConverter.

服务器特有配置由2部分组成.首先,它在RabbitTemplate上配置了市场数据交换器,这样在发送消息时,就不必提供交换器名称.它是通过基础配置类中的抽象回调方法中定义做到这一点的.

public voidconfigureRabbitTemplate(RabbitTemplaterabbitTemplate){
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
其次,声明了股票请求队列.在这里,它不需要任何明确的绑定,因为它将以它自己的名称作为路由键来绑定到无名称的默认交换器上.正如先前提到的,AMQP规范定义了此种行为.

@BeanpublicQueuestockRequestQueue(){
returnnewQueue(STOCK_REQUEST_QUEUE_NAME);
}
现在你已经看过了服务器的AMQP资源配置,导航到src/test/java目录下的org.springframework.amqp.rabbit.stocks包.在那里你会实际的提供了main()方法的Server类.
它基于server-bootstrap.xml创建了一个ApplicationContext.在那里,你会看到发布虚假市场数据的调度任务.

那个配置依赖于Spring3.0的"task"命名空间支持.bootstrap配置文件也导入了其它一些文件.最令人关注的是位于src/main/resources目录下的server-messaging.xml.在那里,你会看到"messageListenerContainer"bean,它负责处理股票交易请求.
最后在看一下定义在src/main/resources目录下的server-handlers.xml,其中定义了一个"serverHandler"bean.这个bean是ServerHandler类的实例,它是Message-drivenPOJO的好例子,它也有发送回复消息的能力.
注意,它自身并没有与框架或任何AMQP概念耦合.它只是简单地接受TradeRequest并返回一个TradeResponse.

publicTradeResponsehandleMessage(TradeRequesttradeRequest){...
}
现在我们已经看了服务端的重要配置和代码,让我们转向客户端.最佳起点是从org.springframework.amqp.rabbit.stocks.config.client包下的RabbitClientConfiguration开始.

注意,它声明了两个不带明确参数的队列.

@Bean
publicQueuemarketDataQueue(){
returnamqpAdmin().declareQueue();
}

@Bean
publicQueuetraderJoeQueue(){
returnamqpAdmin().declareQueue();
}
那些是私有队列,唯一名称会自动自成.客户端会用第一个生成的队列来绑定由服务端公开的市场交换器.
记住在AMQP中,消费者与队列交互,而生产者与交换器交互.队列和交换器之间的绑定指示broker从给定的交换器中投递或路由什么消息给队列.
由于市场交换器是一个Topic交换器,绑定可通过路由正则表达式来表达.

RabbitClientConfiguration声明了一个Binding对象,其对象是通过BindingBuilder的便利API来生成的.

@Value("${stocks.quote.pattern}")
privateStringmarketDataRoutingKey;

@Bean
publicBindingmarketDataBinding(){
returnBindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
注意,实际值已经在属性文件(src/main/resources目录下的"client.properties")中外部化了,因此我们使用Spring的@Value注解来注入值.这通常是一个好主意,否则值就会硬编码在类中,没有修改就没有重新编译.

在这种情况下,通过修改绑定中的路由正则表达式,可很容易地运行多个版本的Client.让我们立即尝试.

启动运行org.springframework.amqp.rabbit.stocks.Server然后再运行org.springframework.amqp.rabbit.stocks.Client.你将会看到NASDAQ股票的交易报价,因为关联stocks.quote.pattern键的值在client.properties中是app.stock.quotes.nasdaq.
现在,保持现有Server和Client运行,将其属性值修改为app.stock.quotes.nyse.再启动第二个Client实例.你会看到第一个client仍然接收NASDAQ报价,而第二个client接收的NYSE报价.你可以改变模式,获取所有的股票报价或个别股票的报价。

最后一个我们将暴露的特性是从客户端的角度来看待请求-回复交互.记住我们已经看了ServerHandler,它会接受TradeRequest对象并返回TradeResponse对象.客户端相应的代码是RabbitStockServiceGateway(位于org.springframework.amqp.rabbit.stocks.gateway包).为发送消息,它会委派给RabbitTemplate.

public voidsend(TradeRequesttradeRequest){
getRabbitTemplate().convertAndSend(tradeRequest,newMessagePostProcessor(){
publicMessagepostProcessMessage(Messagemessage)throwsAmqpException{
message.getMessageProperties().setReplyTo(newAddress(defaultReplyToQueue));
try{
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch(UnsupportedEncodingExceptione){
thrownewAmqpException(e);
}
returnmessage;
}
});
}
注意,在发送消息前,它设置了"replyTo"地址.这提供了队列,此队列是由上面的"traderJoeQueue"bean定义生成的.以下是StockServiceGateway类的@Bean定义.

@Bean
publicStockServiceGatewaystockServiceGateway(){
RabbitStockServiceGatewaygateway=newRabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
returngateway;
}
如果你没有运行服务器和客户端,现在就启动它们.尝试使用100TCKR的格式来发送请求.经过一个简短的人工延迟来模拟“处理”请求,你应该看到一个确认消息出现在客户端上。

3.4测试支持

3.4.1介绍

为异步程序写集成测试比测试简单程序更复杂.当引入了@RabbitListener这样的注解时,这尤其更加复杂.

现在的问题是发送消息后,如何来验证,监听器按预期收到了消息.

框架自身带有许多单元测试和集成测试;有些使用mocks,另外一些使用真实的RabbitMQbroker来集成测试.您可以参照测试场景的一些想法进行测试。

SpringAMQP1.6版本引入了sring-rabbit-testjar,它提供一些测试复杂场景的测试.预计这一项目将随着时间的推移进行扩展,但我们需要社会反馈以帮助测试。请使用JIRA问题或GitHub提供这样的反馈。

3.4.2MockitoAnswer<?>实现

当前有两个Answer<?>实现可帮助测试:

第一个,LatchCountDownAndCallRealMethodAnswer提供了返回null和计数下一个锁存器的Answer<Void>.

LatchCountDownAndCallRealMethodAnsweranswer=newLatchCountDownAndCallRealMethodAnswer(2);
doAnswer(answer)
.when(listener).foo(anyString(),anyString());

...

assertTrue(answer.getLatch().await(10,TimeUnit.SECONDS));
第二个,LambdaAnswer<T>提供了一种调用真正方法的机制,并提供机会来返回定制结果(基于InvocationOnMock和结果).

public classFoo{

publicStringfoo(Stringfoo){
returnfoo.toUpperCase();
}

}
Foofoo=spy(newFoo());

doAnswer(newLambdaAnswer<String>(true,(i,r)->r+r))
.when(foo).foo(anyString());
assertEquals("FOOFOO",foo.foo("foo"));

doAnswer(newLambdaAnswer<String>(true,(i,r)->r+i.getArguments()[0]))
.when(foo).foo(anyString());
assertEquals("FOOfoo",foo.foo("foo"));

doAnswer(newLambdaAnswer<String>(false,(i,r)->
""+i.getArguments()[0]+i.getArguments()[0])).when(foo).foo(anyString());
assertEquals("foofoo",foo.foo("foo"));
WhenusingJava7orearlier:

doAnswer(newLambdaAnswer<String>(true,newValueToReturn<String>(){
@OverridepublicStringapply(InvocationOnMocki,Stringr){
returnr+r;
}
})).when(foo).foo(anyString());

3.4.3@RabbitListenerTestandRabbitListenerTestHarness

在你的@Configuration类中使用@RabbitListenerTest(它也会通过@EnableRabbit来启用@RabbitListener探测).注解会导致框架使用子类RabbitListenerTestHarness来代替标准RabbitListenerAnnotationBeanPostProcessor.

RabbitListenerTestHarness通过两种方式来增强监听器-将其包装进MockitoSpy,启用了Mockito存根和验证操作.也可在监听器添加Advice来启用对参数,结果或异常的访问.
你可以控制哪一个(或两个)来在@RabbitListenerTest上启用属性.后者用于访问调用中更为低级数据-它也支持测试线程阻塞,直到异步监听器被调用.

重要

final@RabbitListener不能被发现或通知,同时,只有带id属性的监听器才能发现或通知.

让我们看一些例子.

使用spy:

@Configuration
@RabbitListenerTest
public classConfig{

@Bean
 publicListenerlistener(){
returnnewListener();
}

...

}

public classListener{

@RabbitListener(id="foo",queues="#{queue1.name}")
 publicStringfoo(Stringfoo){
returnfoo.toUpperCase();
}

@RabbitListener(id="bar",queues="#{queue2.name}")
 public voidfoo(@PayloadStringfoo,@Header("amqp_receivedRoutingKey")Stringrk){
...
}

}

public classMyTests{

@Autowired
privateRabbitListenerTestHarnessharness;
@Test
 public voidtestTwoWay()throwsException{
assertEquals("FOO",this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(),"foo"));

Listenerlistener=this.harness.getSpy("foo");
assertNotNull(listener);
verify(listener).foo("foo");
}

@Test
public voidtestOneWay()throwsException{
Listenerlistener=this.harness.getSpy("bar");
assertNotNull(listener);

LatchCountDownAndCallRealMethodAnsweranswer=newLatchCountDownAndCallRealMethodAnswer(2);
doAnswer(answer).when(listener).foo(anyString(),anyString());
  this.rabbitTemplate.convertAndSend(this.queue2.getName(),"bar");
  this.rabbitTemplate.convertAndSend(this.queue2.getName(),"baz");

assertTrue(answer.getLatch().await(10,TimeUnit.SECONDS));
verify(listener).foo("bar",this.queue2.getName());
verify(listener).foo("baz",this.queue2.getName());
}

}

将harness注入进测试用于,这样我们可访问spy.

获取spy引用,这样我们可以验证是否按预期在调用.由于这是一个发送和接收操作,因此不必暂停测试线程,因为RabbitTemplate在等待回复时已经暂停过了.

在这种情况下,我们只使用了发送操作,因为我们需要一个门闩来等待对容器线程中监听器的异步调用.我们使用了Answer<?>一个实现来帮助完成.

配置spy来调用Answer.
使用捕获建议:

@Configuration
@ComponentScan
@RabbitListenerTest(spy=false,capture=true)
public classConfig{

}

@Service
public classListener{

private booleanfailed;

@RabbitListener(id="foo",queues="#{queue1.name}")
 publicStringfoo(Stringfoo){
returnfoo.toUpperCase();
}

@RabbitListener(id="bar",queues="#{queue2.name}")
public voidfoo(@PayloadStringfoo,@Header("amqp_receivedRoutingKey")Stringrk){
if(!failed&&foo.equals("ex")){
failed=true;
thrownewRuntimeException(foo);
}
failed=false;
}

}

public classMyTests{

 @Autowired
  privateRabbitListenerTestHarnessharness;
  @Test
  public voidtestTwoWay()throwsException{
assertEquals("FOO",this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(),"foo"  ));

InvocationDatainvocationData=
this.harness.getNextInvocationDataFor("foo",0,TimeUnit.SECONDS);
assertThat(invocationData.getArguments()[0],equalTo("foo"));
assertThat((String)invocationData.getResult(),equalTo("FOO"));
}

@Test
  public voidtestOneWay()throwsException{
this.rabbitTemplate.convertAndSend(this.queue2.getName(),"bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(),"baz");
this.rabbitTemplate.convertAndSend(this.queue2.getName(),"ex");

InvocationDatainvocationData=
this.harness.getNextInvocationDataFor("bar",10,TimeUnit.SECONDS);
Object[]args=invocationData.getArguments();
assertThat((String)args[0],equalTo("bar"));
assertThat((String)args[1],equalTo(queue2.getName()));

invocationData=this.harness.getNextInvocationDataFor("bar",10,TimeUnit.SECONDS);
args=invocationData.getArguments();
assertThat((String)args[0],equalTo("baz"));

invocationData=this.harness.getNextInvocationDataFor("bar",10,TimeUnit.SECONDS);
args=invocationData.getArguments();
assertThat((String)args[0],equalTo("ex"));
assertEquals("ex",invocationData.getThrowable().getMessage());
}

}

将harness注入进测试用例,以便我们能获取spy的访问.

使用harness.getNextInvocationDataFor()来获取调用数据-在这种情况下,由于它处于request/reply场景,因为没有必要等待,因为测试线程在RabbitTemplate中等待结果的时候,已经暂停过了.

我们可以验证参数和结果是否与预期一致

这次,我们需要时间来等待数据,因为它在容器线程上是异步操作,我们需要暂停测试线程.

当监听器抛出异常时,可用调用数据中的throwable属性
4.Spring整合-参考

这部分参考文档提供了在Spring集成项目中提供AMQP支持的快速介绍.

4.1Spring整合AMQP支持4.1.1介绍

SpringIntegration项目包含了构建于SpringAMQP项目之上的AMQP通道适配器(ChannelAdapters)和网关(Gateways).那些适配器是在Spring集成项目中开发和发布的.在Spring集成中,"通道适配器"是单向的,而网关是双向的(请求-响应).
我们提供了入站通道适配器(inbound-channel-adapter),出站通道适配器(outbound-channel-adapter),入站网关(inbound-gateway),以及出站网关(outbound-gateway).

由于AMQP适配器只是Spring集成版本的一部分,因为文档也只针对Spring集成发行版本部分可用.

作为一个品酒师,我们只快速了解这里的主要特征。

4.1.2入站通道适配器

为了从队列中接收AMQP消息,需要配置一个个<inbound-channel-adapter>

<amqp:inbound-channel-adapter channel="fromAMQP" queue-names="some.queue" connection-factory="rabbitConnectionFactory"/>

4.1.3出站通道适配器
为了向交换器发送AMQP消息,需要配置一个<outbound-channel-adapter>.除了交换名称外,还可选择提供路由键。

<amqp:outbound-channel-adapter channel="toAMQP" exchange-name="some.exchange"    routing-key="foo" amqp-template="rabbitTemplate"/>

4.1.4入站网关

为了从队列中接收AMQP消息,并回复到它的reply-to地址,需要配置一个<inbound-gateway>.

<amqp:inbound-gateway request-channel="fromAMQP" reply-channel="toAMQP"         queue-names="some.queue" connection-factory="rabbitConnectionFactory"/>

4.1.5出站网关

为了向交换器发送AMQP消息并接收来自远程客户端的响应,需要配置一个<outbound-gateway>.

除了交换名称外,还可选择提供路由键。

<amqp:outbound-gateway request-channel="toAMQP" reply-channel="fromAMQP"       exchange-name="some.exchange" routing-key="foo" amqp-template="rabbitTemplate"/>

5.其它资源

除了这份参考文档,还有其它资源可帮助你了解AMQP.

5.1进阶阅读

对于那些不熟悉AMQP的人来说,规范实际上具有相当的可读性.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  AMQP