Spring AMQP 1.6完整参考指南
2016-12-23 00:00
204 查看
-第一部分
原文:http://docs.spring.io/spring-amqp/docs/1.6.0.RELEASE/reference/html/
1.前言
SpringAMQP项目将其核心Spring概念应用于基于AMQP消息解决方案的开发中.我们提供了一个发送和接收消息的高级抽象模板.同时,我们也提供了消息驱动POJO的支持.这些包有助于AMQP资源的管理,从而提升依赖注入和声明式配置的使用.在所有这些情况中,你会发现与SpringFramework对JMS支持是相似的.对于其它项目相关的信息,可访问SpringAMQP项目主页
2.介绍
本参考文档的第一部分是对SpringAMQP的高层次以及底层的概念的讲述,还有一些可使你尽快上手运行的代码片断.
2.1快速浏览
2.1.1介绍
5分钟开启SpringAMQP的旅行.
前提:
安装和运行RabbitMQbroker(http://www.rabbitmq.com/download.html).然后获取spring-rabbitJAR以及其依赖包-最简单的方式是在你的构建工具中声明依赖,如.对于Maven来说:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.0.RELEASE</version></dependency>
对于gradle:
compile'org.springframework.amqp:spring-rabbit:1.6.0.RELEASE'
兼容性
由于默认的SpringFramework版本依赖是4.2.x,SpringAMQP一般来说可兼容早期版本的SpringFramework.但基于注解的监听器和RabbitMessagingTemplate需要SpringFramework4.1+.
类似地,默认的amqp-client版本是3.6.x,但framework兼容3.4.0+.但依赖于新客户端版本的功能将无法使用.注意,这里指的是javaclientlibrary;一般来说,对于老版本的broker也可以工作.
非常非常快速
使用纯java来发送和接收消息:
ConnectionFactoryconnectionFactory=newCachingConnectionFactory();
AmqpAdminadmin=newRabbitAdmin(connectionFactory);
admin.declareQueue(newQueue("myqueue"));
AmqpTemplatetemplate=newRabbitTemplate(connectionFactory);
template.convertAndSend("myqueue","foo");
Stringfoo=(String)template.receiveAndConvert("myqueue");
注意,在原生的JavaRabbitclient中也有一个ConnectionFactory.在上面的代码中,我们使用的是Spring抽象.我们依赖的是broker中的默认交换器(因为在发送操作中没有指定交换器),以及默认绑定(通过其名称作为路由键将所有队列绑定到默认交换器中).那些行为是由AMQP规范来定义的.
使用XML配置
同上面的例子一样,只不过将外部资源配置到了XML:
ApplicationContextcontext=newGenericXmlApplicationContext("classpath:/rabbit-context.xml");
AmqpTemplatetemplate=context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue","foo");
Stringfoo=(String)template.receiveAndConvert("myqueue");
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsdhttp://www.springframework.org/schema/beans'target='_blank'>http://www.springframework.org/schema/beans/spring-beans.xsd">[/code]<rabbit:connection-factory id="connectionFactory"/>
<rabbit:template id="amqpTemplate"connection-factory="connectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="myqueue"/>
</beans>
<rabbit:admin/>声明会默认自动查找类型为Queue,Exchange和Binding的bean,并宣称他们代表的broker的用户,因此在简单的Javadriver中没有必要明确的使用那个bean.
有大量的选项来配置XMLschema中的组件属性-你可以使用xml编辑的自动完成功能来探索它们并查看它们的相关文档.
使用Java配置
同样的例子可使用java来外部配置:
ApplicationContextcontext=newAnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplatetemplate=context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue","foo");
Stringfoo=(String)template.receiveAndConvert("myqueue");
........
@Configuration
public classRabbitConfiguration{
@Bean
publicConnectionFactoryconnectionFactory(){
returnnewCachingConnectionFactory("localhost");
}
@Bean
publicAmqpAdminamqpAdmin(){
returnnewRabbitAdmin(connectionFactory());
}
@Bean
publicRabbitTemplaterabbitTemplate(){
returnnewRabbitTemplate(connectionFactory());
}
@Bean
publicQueuemyQueue(){
returnnewQueue("myqueue");
}
}
2.2新特性
2.2.1从1.5到1.6的变化
测试支持
提供了一个新的测试支持包.参考Section3.4,“TestingSupport”来了解更多信息.
Builder
现在Builders提供了更方便的API来配置Queue和Exchange对象.参考thesectioncalled“BuilderAPIforQueuesandExchanges”来了解更多信息.
命名空间变化
连接工厂
为连接工厂bean声明增加了thread-factory,例如,以amqp-client包创建的线程命名.参考Section3.1.2,“ConnectionandResourceManagement”来了解更多信息.
当使用CacheMode.CONNECTION时,现在你可以限制允许连接的总数了.参考Section3.1.2,“ConnectionandResourceManagement”来了解更多信息.
Queue定义
现在为匿名队列提供了命名策略;参考thesectioncalled“AnonymousQueue”来了解更多信息.
监听器容器变化
空闲消息监听器检测
当空闲时,现在可以配置监听器容器来发布ApplicationEvent事件了.参考thesectioncalled“DetectingIdleAsynchronousConsumers”来了解更多信息.
不匹配的队列检测
默认情况下,当监听器容器启动时,如果探测到了队列的mismatched属性或参数,容器会记录异常但会继续监听.现在容器有了一个mismatchedQueuesFatal属性,当启动时存在问题的话,这可以阻止容器(以及上下文)启动.如果随后检测到问题,它也会停止容器,如从连接故障中恢复.参考Section3.1.15,“MessageListenerContainerConfiguration”来了解更多信息.
监听器容器日志
现在监听器容器提供了它的beanName给内部SimpleAsyncTaskExecutor作为threadNamePrefix.对于日志分析非常有用.
默认错误处理器
默认错误处理器(ConditionalRejectingErrorHandler)现在认为无可挽救的@RabbitListener异常是致命的.参考Section3.1.13,“ExceptionHandling”来了解更多信息.
AutoDeclare与RabbitAdmins
参考Section3.1.15,“MessageListenerContainerConfiguration”(autoDeclare)来查看在应用程序上下文使用RabbitAdmins的语法变化.
AmqpTemplate:receive与timeout
AmqpTemplate和它的RabbitTemplate实现引入了许多新的带有timeout的receive()方法.参考thesectioncalled“PollingConsumer”来了解更多信息.
AsyncRabbitTemplate
引入了新的AsyncRabbitTemplate.此模块提供了许多发送和接收的方法,其返回值为ListenableFuture,此后可通过它来同步或异步地获取结果.参考thesectioncalled“AsyncRabbitTemplate”来了解更多信息
RabbitTemplate变化
1.4.1在broker支持时,引入了Directreply-to的能力;它比使用临时队列来回应更高效.这个版本允许你覆盖这种默认行为,通常设置useTemporaryReplyQueues属性为true来使用临时队列.参考thesectioncalled“RabbitMQDirectreply-to”来了解更多信息.
RabbitTemplate现在支持user-id-expression(userIdExpression当使用Java配置时).参考SeeValidatedUser-IDRabbitMQdocumentationandthesectioncalled“ValidatedUserId”来了解更多信息.
消息属性
CorrelationId
correlationId消息属性现在可以是字符串了.参考thesectioncalled“MessagePropertiesConverters”来了解更多信息.
长字符串头
以前,DefaultMessagePropertiesConverter会将头转换成(其头长度不超过长字符串限制(默认为1024))成一个DataInputStream(实际上它只是引用LongString的DataInputStream).
在输出时,这个头不会进行转换(除了字符串,例如.在java.io.DataInputStream@1d057a39上调用toString()方法).
在这个版本中,LongLongString默认作为LongStrings;你可以通过其getBytes[],toString(),或getStream()方法来访问它的内容.更大的输入LongString现在也会在输出上正确转换了.
参考thesectioncalled“MessagePropertiesConverters”来了解更多信息.
InboundDeliveryMode
deliveryMode属性不再映射MessageProperties.deliveryMode;这是为了避免意外传播,如果同一个MessageProperties对象用来发送出站消息.
相反,入站deliveryMode头映射为MessageProperties.receivedDeliveryMode.
参考thesectioncalled“MessagePropertiesConverters”来了解更多信息.
当使用注解endpoints时,头将在名为AmqpHeaders.RECEIVED_DELIVERY_MODE的头中提供.
参考thesectioncalled“AnnotatedEndpointMethodSignature”来了解更多信息.
InboundUserID
user_id属性不再映射MessageProperties.userId;这是为了避免意外传播,如果同一个MessageProperties对象用来发送出站消息.相反,入站userId头会映射到MessageProperties.receivedUserId.
参考thesectioncalled“MessagePropertiesConverters”来了解更多信息.
当使用注解endpoints时,头将在名为AmqpHeaders.RECEIVED_USER_ID的头中提供.
参考thesectioncalled“AnnotatedEndpointMethodSignature”来了解更多信息.
RabbitAdmin变化
DeclarationFailures
之间,ignoreDeclarationFailures标志只在channel上发生IOException才会生效(如未匹配参数).现在它对于任何异常都可以生效(如:TimeoutException).
此外,无论何时声明失败,都会发布DeclarationExceptionEvent.RabbitAdmin最后声明事件将作为属性lastDeclarationExceptionEvent也是可用的.
参考Section3.1.10,“Configuringthebroker”来了解更多信息.
@RabbitListenerChanges
MultipleContainersperBean
当使用Java8+,可以添加多个@RabbitListener注解到@Bean类或它们的方法上.当使用Java7-,你可以使用@RabbitListeners容器注解来提供同样的功能.
参考thesectioncalled“@Repeatable@RabbitListener”来了解更多信息.
@SendToSpELExpressions
@SendToforroutingreplieswithnoreplyTopropertycannowbeSpELexpressionsevaluatedagainsttherequest/reply.
参考thesectioncalled“ReplyManagement”来了解更多信息.
@QueueBindingImprovements
现在你可以在@QueueBinding注解中为queues,exchanges和bindings指定参数.
Header交换器可通过@QueueBinding来支持.
参考thesectioncalled“Annotation-drivenListenerEndpoints”来了解更多信息.
延迟消息交换器(DelayedMessageExchange)
SpringAMQP现在有了第一个支持RabbitMQDelayedMessageExchange插件.参考Section3.1.11,“DelayedMessageExchange”来了解更多信息.
交换器内部标志(Exchangeinternalflag)
任何Exchange定义现在都可标记为internal,当声明交换器时,RabbitAdmin会将值传递给broker.
参考Section3.1.10,“Configuringthebroker”来了解更多信息.
CachingConnectionFactory变化
CachingConnectionFactoryCacheStatistics
CachingConnectionFactory现在通过运行时和JMX提供了cache属性.参考thesectioncalled“RuntimeCacheProperties”来了解更多信息.
访问底层RabbitMQ连接工厂
添加了一个新的getter方法来获取底层factory.这是很有用的,例如,可以添加自定义属性.参考Section3.1.3,“AddingCustomClientConnectionProperties”来了解更多信息.
ChannelCache
默认的channel缓存大小已从1增加到了25.参考Section3.1.2,“ConnectionandResourceManagement”来了解更多信息.
此外,SimpleMessageListenerContainer不再设置缓存大小至少要与concurrentConsumers的数目一样大-这是多余的,因为容器消费者channels是不会缓存的.
RabbitConnectionFactoryBean
factorybean现在暴露了一个属性来将client连接属性添加到由工厂产生的连接中.
Java反序列化(Deserialization)
当使用Java反序列化时,现在允许配置类的白名单.如果你从不可信来源接收消息,考虑创建白名单是很重要的.参考thesectioncalled“JavaDeserialization”来了解更多信息.
JSONMessageConverter
改善了JSON消息转换器,现在允许消费消息在消息头中可以没有类型(type)信息.参考thesectioncalled“MessageConversionforAnnotatedMethods”和thesectioncalled“Jackson2JsonMessageConverter”来了解更多信息.
LoggingAppenders
Log4j2
加入了log4j2appender,此appenders现在可配置addresses属性来连接broker集群.
Client连接属性
现在你可以向RabbitMQ连接中加入自定义连接属性了.
参考Section3.2,“LoggingSubsystemAMQPAppenders”来了解更多信息.
2.2.2早期版本
参考SectionA.2,“PreviousReleases”来了解早期版本的变化.
-第二部分
3.参考
这部分参考文档详细描述了组成SringAMQP的各种组件.mainchapter涵盖了开发AMQP应用程序的核心类.这部分也包含了有关示例程序的章节.
3.1使用SpringAMQP
在本章中,我们将探索接口和类,它们是使用SpringAMQP来开发应用程序的必要组件.
3.1.1AMQP抽象
介绍
SpringAMQP由少数几个模块组成,每个都以JAR的形式来表现.这些模块是:spring-amqp和spring-rabbit.spring-amqp模块包含org.springframework.amqp.core包.
在那个包中,你会找到表示AMQP核心模块的类.我们的目的是提供通用的抽象,不依赖于任何特定中间件的实现或AMQP客户端库。
最终用户代码将更具有移植性,以便跨供应商实现,因为它们可以对抽象层开发。这些抽象是使用broker的特定模块实现的,如spring-rabbit。
目前只有一个RabbitMQ实现;而对于抽象的验证,除了RabbitMQ外,也已经在.Net平台上使用ApacheQpid得到了验证。
由于AMQP原则上工作于协议层次,RabbitMQ客户端可以在任何支持相同的协议版本的broker中使用,但目前我们没有测试其它任何broker。
这里的概述假设你已经熟悉了AMQP规范.如果你还没有,那么你可以查看第5章,其它资源中列举的资源.
Message
AMQP 0-8和0-9-1规范没有定义一个消息类或接口.相反,当执行basicPublish()这样的操作的时候,内容是以字节数组参数进行传递的,其它额外属性也是以单独参数进行传递的.SpringAMQP定义了一个Message类来作为AMQP领域模型表示的一部分.Message类的目的是在单个实例中简化封装消息体(body)和属性(header),这样API就可以变得更简单.Message类的定义相当简单.
public classMessage{
private finalMessagePropertiesmessageProperties;
private final byte[]body;
publicMessage(byte[]body,MessagePropertiesmessageProperties){
this.body=body;
this.messageProperties=messageProperties;
}
public byte[]getBody(){
returnthis.body;
}
publicMessagePropertiesgetMessageProperties(){
returnthis.messageProperties;
}
}
MessageProperties接口定义了多个共同属性,如messageId,timestamp,contentType等等.那些属性可以通过调用setHeader(Stringkey,Objectvalue)方法使用用户定义头(user-definedheaders)来扩展.
Exchange
Exchange接口代表的是AMQPExchange,它是生产者发送消息的地方.broker虚拟主机中的交换器名称都是唯一的,同时还有少量的其它属性:
public interfaceExchange{
StringgetName();
StringgetExchangeType();
booleanisDurable();
booleanisAutoDelete();
Map<String,Object>getArguments();
}
正如你所看到的,Exchange还有一个type(它是在ExchangeTypes中定义的常量).基本类型是:Direct,Topic,Fanout,和Headers.
在核心包中,你可以找到每种类型的Exchange接口实现.这些交换器类型会在处理队列绑定时,行为有所不同.
例如,Direct交换器允许队列以固定路由键进行绑定(通常是队列的名称).
Topic交换器支持使用路由正则表达式(*通配符明确匹配一个,而#通配符可匹配0个或多个).
Fanout交换器会把消息发布到所有绑定到它上面的队列而不考虑任何路由键.
关于交换器类型的更多信息,查看Chapter5,OtherResources.
AMQP规范还要求任何broker必须提供一个默认的无名字的(空字符串)Direct交换器.所有声明的队列都可以用它们的名字作为路由键绑定到默认交换器中.在Section3.1.4,“AmqpTemplate”你会了解到更多关于在SringAMQP中使用默认交换器的使用情况.
Queue
Queue代表的是消费者接收消息的组件.像各种各样的Exchange类,我们的实现目标是作为核心AMQP类型的抽象表示.
public classQueue{
private finalStringname;
private volatile booleandurable;
private volatile booleanexclusive;
private volatile booleanautoDelete;
private volatileMap<String,Object>arguments;
/**
*Thequeueisdurable,non-exclusiveandnonauto-delete.
*
*@paramnamethenameofthequeue.
*/
publicQueue(Stringname){
this(name,true,false,false);
}
//GettersandSettersomittedforbrevity
}
注意,构造器需要接受队列名称作为参数.根据实现,admintemplate可能会提供生成独特队列名称的方法.这些队列作为回复地址或用于临时情景是非常有用的.
基于这种原因,自动生成队列的exclusive和autoDelete属性都应该设置为true.
参考Section3.1.10,“Configuringthebroker”来了解关于使用命名空间来声明队列,包括队列参数的详细情况.
Binding
生产者发送消息到Exchange,而消费者将从Queue中获取消息,连接Queues与Exchanges之间的绑定对于通过消息来连接生产者和消费者是非常关键的.
在SpringAMQP中,我们定义了一个Binding类来表示这些连接.让我们重新回顾一下绑定队列和交换器的操作.
你可以使用固定的路由键来绑定Queue到DirectExchange上.
newBinding(someQueue,someDirectExchange,"foo.bar")
你可以使用路由正则表达式来绑定Queue到TopicExchange上.
newBinding(someQueue,someTopicExchange,"foo.*")
你可以不使用路由键来绑定Queue到FanoutExchange上.
newBinding(someQueue,someFanoutExchange)
我们还提供了BindingBuilder来方便操作.
Bindingb=BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
上面展示的BindingBuilder类很清晰,但如果为bind()方法使用静态导入,这种形式将工作得更好.
本身来说,Binding类的实例只能一个connection中持有数据.换句话说,它不是一个活力(active)组件.
但正如在后面Section3.1.10,“Configuringthebroker”看到的,Binding实例可由AmqpAdmin类来触发broker上的绑定操作.
同样,在同一个章节中,你还会看到Binding实例可在@Configuration类中使用Spring @Bean风格来定义.
还有方便的基类来简化生成AMQP相关bean定义和识别队列,交换器,绑定的方法,这样当AMQPbroker运行程序启动时,就可以得到声明.
AmqpTemplate也在核心包中定义.作为涉及AMQP消息的主要组件,会在它自己的章节中进行详细介绍(参考Section3.1.4,“AmqpTemplate”).
3.1.2连接和资源管理
介绍
虽然我们在前面章节中描述的AMQP模型是通用的,适用于所有实现,但当我们说到资源管理时,其细节是针对特定broker实现的.因此,在这个章节中,我们只关注我们的"spring-rabbit"模块,因为到现在为止,RabbitMQ是唯一支持的实现.
RabbitMQbroker中用于管理连接的中心组件是ConnectionFactory接口.ConnectionFactory实现的责任是提供一个org.springframework.amqp.rabbit.connection.Connection的实例,它包装了com.rabbitmq.client.Connection.
我们提供的唯一具体实现提CachingConnectionFactory,默认情况下,会建立应用程序可共享的单个连接代理.连接共享是可行的,因为在AMQP处理消息的工作单位实际是"channel"(在某些方面,这类似于JMS中Connection和Sessionin的关系).
你可以想象,连接实例提供了一个createChannel方法。CachingConnectionFactory实现支持这些channels的缓存,它会基于它们是否是事务的来单独维护其缓存.
当创建CachingConnectionFactory的实例时,hostname可通过构造器来提供,username和password 属性也可以提供.如果你想配置channel缓存的大小(默认是25),你可以调用setChannelCacheSize()方法.
从1.3版本开始,CachingConnectionFactory也可以同channel一样,配置缓存连接.在这种情况下每次调用createConnection()都会创建一个新连接(或者从缓存中获取空闲的连接).
关闭连接会将其返回到缓存中(如果还没有达到缓存大小的话).在这些连接上创建的Channels同样也会被缓存.单独连接的使用在某些环境中是有用的,如从HA集群中消费,连接负载均衡器,连接不同的集群成员.设置cacheMode为CacheMode.CONNECTION.
这不会限制连接的数目,它用于指定允许空闲打开连接的数目.
从1.5.5版本开始,提供了一个新属性connectionLimit.当设置了此属性时,它会限制连接的总数目,当达到限制值时,将channelCheckoutTimeLimit来等待空闲连接.如果时间超时了,将抛出AmqpTimeoutException.
重要
当缓存模式是CONNECTION时,队列的自动声明等等(参考thesectioncalled“AutomaticDeclarationofExchanges,QueuesandBindings”)将不再支持.
此外,在写作的时候,rabbitmq-client包默认为每个连接(5个线程)创建了一个固定的线程池.当需要使用大量连接时,你应该考虑在CachingConnectionFactory定制一个executor.然后,同一个executor会用于所有连接,其线程也是共享的.
executor的线程池是没有界限的或按预期使用率来设置(通常,一个连接至少应该有一个线程).如果在每个连接上创建了多个channels,那么池的大小会影响并发性,因此一个可变的线程池executor应该是最合适的.
理解缓存大小不是限制是很重要的,它仅仅是可以缓存的通道数量.当说缓存大小是10时,在实际使用中,其实可以是任何数目的通道.如果超过10个通道被使用,他们都返回到高速缓存,10个将在高速缓存中,其余的将物理关闭。
从1.6版本开始,默认通道缓存大小从1增加到了25.在高容量,多线程环境中,较小的缓存意味着通道的创建和关闭将以很高的速率运行.加大默认缓存大小可避免这种开销.
你可以监控通道的使用情况(通过RabbitMQAdminUI),如果看到有很多通道在创建和关闭,你可以增大缓存大小.缓存只会增长按需(以适应应用程序的并发性要求),所以这个更改不会影响现有的低容量应用程序。
从1.4.2版本开始,CachingConnectionFactory有一个channelCheckoutTimeout属性.当此属性的值大于0时,channelCacheSize会变成连接上创建通道数目的限制.
如果达到了限制,调用线程将会阻塞,直到某个通道可用或者超时,在后者的情况中,将抛出AmqpTimeoutException异常.
在框架(如.RabbitTemplate)中使用的通道将会可靠地返回到缓存中.如果在框架外创建了通道(如.直接访问connection(s)并调用createChannel()),你必须可靠地返回它们(通过关闭),也许需要在finally块中以防止耗尽通道.
CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connectionconnection=connectionFactory.createConnection();
当使用XML时,配置看起来像下面这样:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username"value="guest"/>
<property name="password" value="guest"/>
</bean>
这里也有一个SingleConnectionFactory实现,它只能用于框架的单元测试代码中.它比CachingConnectionFactory简单,因为它不会缓存通道,由于其缺乏性能和韧性,它不打算用于简单的测试以外的实际使用.
如果基于某些原因,你需要自己来实现ConnectionFactory,AbstractConnectionFactory基类提供了一个非常好的起点.
ConnectionFactory可使用rabbit命名空间来快速方便的建立:
<rabbit:connection-factory id="connectionFactory"/>
在多数情况下,这是很好的,因为框架会为你选择最好的默认值.创建的实例会是CachingConnectionFactory.要记住,默认的缓存大小是25.如果你想缓存更多的通道,你可以设置channelCacheSize属性值.在XML中,它看起来像下面这样:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
在命名空间中,你也可以添加channel-cache-size属性:
<rabbit:connection-factory id="connectionFactory" channel-cache-size="50"/>