Java客户端API指南
2017-04-26 15:12
453 查看
客户端API在AMQP 0-9-1协议规范上进行了密切的建模,并提供了更多的抽象方便使用。
RabbitMQ Java客户端使用com.rabbitmq.client作为其顶级包。关键类和接口是:
渠道
连接
ConnectionFactory
消费者
通过Channel界面提供协议 操作。连接用于打开通道,注册连接生命周期事件处理程序,并关闭不再需要的连接。 连接通过ConnectionFactory实例化,它是如何配置各种连接设置,如vhost或用户名。
核心API类是连接 和通道,分别表示AMQP
0-9-1连接和通道。它们通常在使用前进口:
以下代码使用给定的参数(主机名,端口号等)连接到AMQP代理:
所有这些参数对于本地运行的RabbitMQ服务器具有明智的默认值。
或者,可以使用URI:
所有这些参数对于在本地运行的库存RabbitMQ服务器具有明智的默认值。
的连接然后接口可以用于打开一个通道:
现在可以使用该通道来发送和接收消息,如后续部分所述。
要断开连接,只需关闭通道和连接:
请注意,关闭频道可能被认为是良好的做法,但这并不是绝对必要的 - 当底层连接关闭时,它将自动完成。
客户端应用程序与交换和队列(AMQP的高级构建块)一起工作。这些必须在被声明之前才能被使用。声明任何类型的对象只是确保其中一个名称存在,如有必要,创建它。
继续前面的例子,以下代码声明一个交换和一个队列,然后将它们绑定在一起。
这将主动声明以下对象,这两个对象可以通过使用其他参数进行定制。这两个都没有任何特殊的论据。
持久,非自动删除的“直接”类型的交换
具有生成名称的非持久,独占,自动删除队列
上述函数调用然后使用给定的路由密钥将队列绑定到交换机。
请注意,当只有一个客户端想要使用它时,这将是一种典型的方式来声明一个队列:它不需要一个众所周知的名称,没有其他客户端可以使用它(排他),并将自动清除(autodelete )。如果几个客户端想要与一个众所周知的名称共享一个队列,这个代码是适当的:
这将积极宣布:
持久,非自动删除的“直接”类型的交换
一个持久,非排他性,非自动删除队列与着名的名字
请注意,所有这些Channel API方法都是重载的。这些方便的简单形式的exchangeDeclare,queueDeclare和queueBind 使用合理的默认值。还有更多的表单具有更多的参数,可以根据需要覆盖这些默认值,并在需要时进行完全控制。
在客户端API使用中使用这种“简短形式,长形式”模式。
要将消息发布到交换机,请按如下所示使用Channel.basicPublish:
为了精细控制,您可以使用重载变量来指定必需标志,或者使用预设消息属性发送消息:
这将发送一个带有传递模式2(持久),优先级1和内容类型“文本/普通”的消息。您可以构建自己的消息属性对象,使用Builder类提及您喜欢的许多属性,例如:
此示例发布带有自定义标头的消息:
此示例发布具有到期的消息:
我们没有在这里展示所有的可能性。
请注意,BasicProperties是自动生成的持有人类AMQP的内部类。
的调用通道#basicPublish最终会阻塞如果一个 资源驱动的报警生效。
线程之间不能共享通道实例。应用程序应该优先使用每个线程的通道,而不是跨多个线程共享相同的通道。虽然通道上的一些操作可以安全地同时调用,但有些操作不会导致线路上错误的帧交错。线程之间的共享通道也会干扰* 发布者确认。
接收消息的最有效的方法是使用Consumer 界面设置订阅。然后,消息将在其到达时自动传递,而不必被明确请求。
在调用与Consumer相关的API方法 时,个人订阅总是由其消费者标签引用,它们可以是客户端或服务器生成的,如AMQP规范文档中所述。不同的消费者对相同的S 通道必须具有不同的消费标签。
实现Consumer的最简单方法是将便利类DefaultConsumer子类化。此子类的对象可以在basicConsume 调用中传递以设置订阅:
在这里,因为我们指定AUTOACK =
假,就必须承认交付给消息消费者,最容易进行handleDelivery 方法,如图所示。
更复杂的消费者将需要覆盖更多的方法。特别是,handleShutdownSignal 当通道和连接关闭被调用,handleConsumeOk传递消费者标签的任何其他回调到之前消费者被调用。
消费者还可以实现 handleCancelOk和handleCancel 方法,以分别通知显式和隐式取消。
你可以明确地取消特定的消费者与 Channel.basicCancel:
传递消费者标签。
对消费者的回调在与由Connection管理的线程分开的线程上 调度。这意味着 Consumer可以安全地在Connection或Channel上调用阻塞方法 ,例如 queueDeclare,txCommit, basicCancel或basicPublish。
每个频道都有自己的发送线程。对于每个 渠道一个消费者最常用的用例,这意味着消费者不会阻止其他消费者。如果你有多个 消费者每人S 通道意识到,一个长期运行的消费者可能会回调派遣容纳其他消费者 S于该频道。
要显式检索邮件,请使用 Channel.basicGet。返回的值是GetResponse的一个实例,从中可以提取头信息(属性)和消息体:
并且由于AUTOACK = 假以上,则必须也呼吁Channel.basicAck确认您已成功收到的消息:
如果发布了一条消息,其中设置了“强制”标志,但不能路由,则代理将返回给发送客户端(通过AMQP.Basic.Return 命令)。
要通知此类返回,客户端可以实现ReturnListener 接口并调用Channel.setReturnListener。如果客户端尚未为特定通道配置返回侦听器,则相关的返回消息将被静默地删除。
将会调用一个返回侦听器,例如,如果客户端发布一条消息,其中“强制”标志被设置为没有绑定到队列的“直接”类型的交换。
AMQP 0-9-1连接和通道具有以下生命周期状态:
打开:对象准备使用
关闭:对象已被明确通知关闭本地,已向任何支持的下层对象发出关闭请求,并正在等待其关闭过程完成
关闭:对象已经从任何下层对象接收到所有关闭完成通知,因此已关闭自身
这些对象总是处于关闭状态,不管导致关闭的原因,如应用程序请求,内部客户端库故障,远程网络请求或网络故障。
AMQP连接和通道对象具有以下与shutdown相关的方法:
addShutdownListener(ShutdownListener
listener)和removeShutdownListener(ShutdownListener listener)来管理任何侦听器,当对象转换到关闭状态时,它将被触发 。请注意,将ShutdownListener添加到已经关闭的对象将立即触发侦听器
getCloseReason(),以便调查对象关闭的原因
isOpen(),用于测试对象是否处于打开状态
close(int closeCode,String closeMessage),显式通知对象关闭
听众的简单使用将如下所示:
该ShutdownSignalException类提供方法来分析关机的原因。通过调用isHardError()方法中,我们得到的信息是否是连接或信道错误,并getReason()返回的信息有关的原因,在形式的AMQP方法-无论是 AMQP.Channel.Close或 AMQP.Connection.Close(或者如果原因是库中的一些异常,例如网络通信失败,在这种情况下可以使用getCause()检索异常),则返回null 。
相反,我们通常应该忽略这种检查,只需尝试所需的操作。如果代码的执行期间的连接的通道被关闭,一个ShutdownSignalException将被抛出,指示对象处于无效状态。我们还应该捕获由SocketException 导致的IOException,当代理程序意外关闭连接时,或者ShutdownSignalException,当代理启动clean时关闭。
无论执行人及的ExecutorService类中的java.util.concurrent包。
当连接被关闭的默认的ExecutorService 将关闭()
,但一个用户提供 的ExecutorService(如ES上文)将 不被关断()
。提供自定义ExecutorService的客户端必须确保最终关闭(通过调用其shutdown()方法),否则池的线程可能会阻止JVM终止。
相同的执行器服务可以在多个连接之间共享,或者在重新连接时重新使用,但在shutdown()之后不能使用 。
只有在有证据表明消费者 回访处理存在严重瓶颈时,才应考虑使用此功能。如果没有执行消费者回调,或者很少,默认分配就足够了。开销最初是最小的,并且分配的线程资源总量是有界的,即使有时也会发生消费者活动的爆发。
将尝试连接到hostname1:portnumber1,并且如果hostname2不成功:portnumber2。返回的连接是数组中第一个成功(不抛出 IOException)。这完全等同于在工厂中重复设置主机和端口,每次调用factory.newConnection(),直到其中一个成功。
如果提供ExecutorService(使用表单factory.newConnection(es,addrArr)),线程池与(第一个)成功连接相关联。
如果您希望更多地控制主机连接,请参阅 对服务发现 的支持。
该AddressResolver界面是这样的:
就像使用主机列表一样,首先尝试返回的第一个地址,然后如果客户端无法连接到第一个地址,则等待第二个。
如果还提供ExecutorService(使用表单factory.newConnection(es,addressResolver)),则线程池与(第一个)成功连接相关联。
该AddressResolver是实现定制服务发现逻辑,这是一个动态的基础设施特别有用的理想场所。结合自动恢复,客户端可以自动连接到首次启动时未平衡的节点。亲和度和负载平衡是其他一些自定义AddressResolver可能有用的场景。
Java客户端附带以下实现(有关详细信息,请参阅javadoc):
DnsRecordIpAddressResolver:给出主机的名称,返回其IP地址(针对平台DNS服务器的分辨率)。这对于简单的基于DNS的负载平衡或故障切换很有用。
DnsSrvRecordAddressResolver:给定服务的名称,返回主机名/端口对。搜索被实现为DNS
SRV请求。当使用像HashiCorp Consul这样的服务注册表时,这可能很有用 。
App Engine的示例。
使用默认阻止IO模式,每个连接使用线程从网络套接字读取。使用NIO模式,您可以控制从/到网络套接字读取和写入的线程数。
如果您的Java进程使用许多连接(数十或数百),请使用NIO模式。您应该使用比默认阻止模式更少的线程。使用适当数量的线程设置,您不应该尝试任何性能下降,特别是如果连接不那么忙。
NIO必须明确启用:
NIO模式可以通过NioParams类进行配置:
NIO模式使用合理的默认值,但您可能需要根据自己的工作负载进行更改。一些设置是:使用的IO线程总数,缓冲区大小,IO循环中使用的服务执行程序,内存中写入队列的参数(写入请求在网络发送之前排入队列)。请阅读Javadoc了解详细信息和默认值。
重新连接
恢复连接侦听器
重新开放频道
恢复频道听众
恢复频道basic.qos设置,发布商确认和交易设置
拓扑恢复包括对每个通道执行的以下操作
重新声明交换(预定义除外)
重新声明队列
恢复所有绑定
恢复所有消费者
从Java客户端4.0.0版开始,默认情况下启用自动恢复(因此也可以进行拓扑恢复)。
要禁用或启用自动连接恢复,请使用factory.setAutomaticRecoveryEnabled(boolean) 方法。以下代码段显示了如何明确启用自动恢复(例如,对于4.0.0以前的Java客户端):
如果由于异常而导致恢复失败(例如,RabbitMQ节点仍无法访问),则会在固定的时间间隔(默认为5秒)后重试。可以配置间隔:
当提供地址列表时,列表被洗牌,并尝试所有地址,一个接着一个:
addRecoveryListener
removeRecoveryListener
请注意,您目前需要将连接和渠道转换为可恢复 才能使用这些方法。
如果需要,可以明确地禁用拓扑恢复:
与连接,通道,恢复和消费者生命周期相关的未处理的异常被委派给异常处理程序。异常处理程序是实现ExceptionHandler接口的任何对象 。默认情况下,使用DefaultExceptionHandler的实例。它将异常详细信息打印到标准输出。
可以使用ConnectionFactory#setExceptionHandler来覆盖处理程序 。它将用于由工厂创建的所有连接:
异常处理程序应用于异常记录。
从版本4.0.0开始,客户端会收集运行时度量(例如发布的消息数)。Metrics集合是可选的,并使用setMetricsCollector(metricsCollector)方法在ConnectionFactory级别 进行设置。此方法需要一个MetricsCollector实例,该实例在客户端代码的几个位置调用。
客户端附带一个MetricsCollector使用实施Dropwizard指标 库。可以通过以下方式启用指标收集:
以下是收集的指标:
打开的连接数( 默认实现中的计数器)
开放通道数( 默认实现中为计数器)
发布的消息数(一米的默认实现)
消费的消息数(一米的默认实现)
确认的消息数(一米的默认实现)
被拒绝的消息数(一米的默认实现)
通过使用Dropwizard Metrics,不仅可以计数,还可以使用通用工具(JMX,Graphite,Ganglia,HTTP)的平均速率,最后五分钟的速率等等。
请注意以下关于指标收集:
如果您使用基于Dropwizard Metrics的默认实现,请勿忘记将适当的JAR文件添加到类路径中(Dropwizard Metrics不会自动与Java客户端一起提取,因为它是可选依赖项)。
Metrics集合是可扩展的,没有任何阻止您根据特定需要实现自己的 MetricsCollector。
所述MetricsCollector设置在ConnectionFactory的 水平,但可以翻过不同实例共享。
指标集不支持交易。例如,如果在事务中发送确认,然后事务被回滚,确认将被计入客户端度量(而不是经纪人)。请注意,确认实际上是发送到代理,然后由事务回滚取消,因此客户端指标在发送确认方面是正确的。作为总结,不要使用客户指标进行敏感的业务检查,因为它们不能保证是完全准确的。
您通常会将MetricsRegistry的一个实例传递 给StandardMetricsCollector。以下是JMX的一个例子:
在Google App Engine(GAE)上使用RabbitMQ Java客户端需要使用自定义线程工厂来实例化线程使用GAE的ThreadManager(见上文)。另外,有必要设置一个低心跳间隔(4-5秒),以避免在GAE 上进入低输入流读取超时:
为了使拓扑恢复成为可能,RabbitMQ Java客户机维护一个声明的队列,交换和绑定的缓存。缓存是每个连接。某些RabbitMQ功能使得客户端不可能观察到一些拓扑变化,例如当由于TTL而删除队列时。RabbitMQ Java客户端尝试在最常见的情况下使缓存条目无效:
队列被删除时
交换被删除时
绑定被删除时。
消费者在自动删除的队列上被取消时。
从自动删除的交换中取消绑定队列或交换机。
但是,客户端无法跟踪单个连接之外的这些拓扑更改。依赖自动删除队列或交换的应用程序,以及队列TTL(注意:不是消息TTL!),并使用自动连接恢复,应明确删除知道未被使用或删除的实体,以清除客户端拓扑缓存。这通过Channel#queueDelete, Channel#exchangeDelete,Channel#queueUnbind和Channel# exchangeUnbind在RabbitMQ
3.3.x中是幂等的(删除什么不会导致异常)。
作为编程方便,Java客户端API提供了一个类RpcClient,它使用临时应答队列通过AMQP 0-9-1 提供简单的RPC风格的通信设施。
该类不会对RPC参数和返回值强加任何特定的格式。它简单地提供了一种用于使用特定路由密钥向给定交换机发送消息并等待响应队列上的响应的机制。
(这个类使用AMQP 0-9-1的实现细节如下:请求消息被发送,并将 basic.correlation_id字段设置为该RpcClient实例唯一的值,并将basic.reply_to设置为回复队列。)
创建此类的实例后,可以使用以下任何方法来发送RPC请求:
该primitiveCall方法传送原始字节数组作为请求和响应机构。方法stringCall是一个围绕primitiveCall的简便的包装器,将消息体视为默认字符编码中的String实例。
该mapCall变种是有点更复杂的:它们编码java.util.Map包含普通的Java值到AMQP
0-9-1二进制表表示,和解码以同样的方式回应。(请注意,在这里可以使用什么值类型有一些限制 - 有关详细信息,请参阅javadoc。)
所有编组/解组的方便方法都使用primitiveCall作为传输机制,并在其顶部提供一个包装层
概述
RabbitMQ Java客户端使用com.rabbitmq.client作为其顶级包。关键类和接口是:渠道
连接
ConnectionFactory
消费者
通过Channel界面提供协议 操作。连接用于打开通道,注册连接生命周期事件处理程序,并关闭不再需要的连接。 连接通过ConnectionFactory实例化,它是如何配置各种连接设置,如vhost或用户名。
连接和通道
核心API类是连接 和通道,分别表示AMQP0-9-1连接和通道。它们通常在使用前进口:
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
连接到经纪人
以下代码使用给定的参数(主机名,端口号等)连接到AMQP代理:ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); 连接conn = factory.newConnection();
所有这些参数对于本地运行的RabbitMQ服务器具有明智的默认值。
或者,可以使用URI:
ConnectionFactory factory = new ConnectionFactory(); factory.setUri(“amqp:// userName:password @ hostName:portNumber / virtualHost”); 连接conn = factory.newConnection();
所有这些参数对于在本地运行的库存RabbitMQ服务器具有明智的默认值。
的连接然后接口可以用于打开一个通道:
Channel channel = conn.createChannel();
现在可以使用该通道来发送和接收消息,如后续部分所述。
要断开连接,只需关闭通道和连接:
channel.close(); conn.close();
请注意,关闭频道可能被认为是良好的做法,但这并不是绝对必要的 - 当底层连接关闭时,它将自动完成。
使用交换和队列
客户端应用程序与交换和队列(AMQP的高级构建块)一起工作。这些必须在被声明之前才能被使用。声明任何类型的对象只是确保其中一个名称存在,如有必要,创建它。继续前面的例子,以下代码声明一个交换和一个队列,然后将它们绑定在一起。
channel.exchangeDeclare(exchangeName,“direct”,true); String queueName = channel.queueDeclare()。getQueue(); channel.queueBind(queueName,exchangeName,routingKey);
这将主动声明以下对象,这两个对象可以通过使用其他参数进行定制。这两个都没有任何特殊的论据。
持久,非自动删除的“直接”类型的交换
具有生成名称的非持久,独占,自动删除队列
上述函数调用然后使用给定的路由密钥将队列绑定到交换机。
请注意,当只有一个客户端想要使用它时,这将是一种典型的方式来声明一个队列:它不需要一个众所周知的名称,没有其他客户端可以使用它(排他),并将自动清除(autodelete )。如果几个客户端想要与一个众所周知的名称共享一个队列,这个代码是适当的:
channel.exchangeDeclare(exchangeName,“direct”,true); channel.queueDeclare(queueName,true,false,false,null); channel.queueBind(queueName,exchangeName,routingKey);
这将积极宣布:
持久,非自动删除的“直接”类型的交换
一个持久,非排他性,非自动删除队列与着名的名字
请注意,所有这些Channel API方法都是重载的。这些方便的简单形式的exchangeDeclare,queueDeclare和queueBind 使用合理的默认值。还有更多的表单具有更多的参数,可以根据需要覆盖这些默认值,并在需要时进行完全控制。
在客户端API使用中使用这种“简短形式,长形式”模式。
发布消息
要将消息发布到交换机,请按如下所示使用Channel.basicPublish:byte [] messageBodyBytes = “你好,世界!” .getBytes(); channel.basicPublish(exchangeName,routingKey,null,messageBodyBytes);
为了精细控制,您可以使用重载变量来指定必需标志,或者使用预设消息属性发送消息:
channel.basicPublish(exchangeName,routingKey,mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
这将发送一个带有传递模式2(持久),优先级1和内容类型“文本/普通”的消息。您可以构建自己的消息属性对象,使用Builder类提及您喜欢的许多属性,例如:
channel.basicPublish(exchangeName,routingKey, new AMQP.BasicProperties.Builder() .contentType(“text / plain”) .deliveryMode(2) 优先(1) .userId(“bob”) 。建立()), messageBodyBytes);
此示例发布带有自定义标头的消息:
Map <String,Object> headers = new HashMap <String,Object>(); headers.put(“latitude”, 51.5252949); headers.put(“longitude”, - 0.0905493); channel.basicPublish(exchangeName,routingKey, 新的 AMQP.BasicProperties.Builder() 头(头) 。建立()), messageBodyBytes);
此示例发布具有到期的消息:
channel.basicPublish(exchangeName,routingKey, new AMQP.BasicProperties.Builder() 过期(“60000”) 。建立()), messageBodyBytes);
我们没有在这里展示所有的可能性。
请注意,BasicProperties是自动生成的持有人类AMQP的内部类。
的调用通道#basicPublish最终会阻塞如果一个 资源驱动的报警生效。
通道和并发注意事项(线程安全)
线程之间不能共享通道实例。应用程序应该优先使用每个线程的通道,而不是跨多个线程共享相同的通道。虽然通道上的一些操作可以安全地同时调用,但有些操作不会导致线路上错误的帧交错。线程之间的共享通道也会干扰* 发布者确认。
通过订阅接收消息
import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;
接收消息的最有效的方法是使用Consumer 界面设置订阅。然后,消息将在其到达时自动传递,而不必被明确请求。
在调用与Consumer相关的API方法 时,个人订阅总是由其消费者标签引用,它们可以是客户端或服务器生成的,如AMQP规范文档中所述。不同的消费者对相同的S 通道必须具有不同的消费标签。
实现Consumer的最简单方法是将便利类DefaultConsumer子类化。此子类的对象可以在basicConsume 调用中传递以设置订阅:
boolean autoAck = false ; channel.basicConsume(queueName,autoAck,“myConsumerTag”, new DefaultConsumer(channel){@ Override public void handleDelivery (String consumerTag, 信封信封, AMQP.BasicProperties属性, byte [] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); //(处理消息组件在这里...) channel.basicAck(deliveryTag,false); } });
在这里,因为我们指定AUTOACK =
假,就必须承认交付给消息消费者,最容易进行handleDelivery 方法,如图所示。
更复杂的消费者将需要覆盖更多的方法。特别是,handleShutdownSignal 当通道和连接关闭被调用,handleConsumeOk传递消费者标签的任何其他回调到之前消费者被调用。
消费者还可以实现 handleCancelOk和handleCancel 方法,以分别通知显式和隐式取消。
你可以明确地取消特定的消费者与 Channel.basicCancel:
channel.basicCancel(consumerTag);
传递消费者标签。
对消费者的回调在与由Connection管理的线程分开的线程上 调度。这意味着 Consumer可以安全地在Connection或Channel上调用阻塞方法 ,例如 queueDeclare,txCommit, basicCancel或basicPublish。
每个频道都有自己的发送线程。对于每个 渠道一个消费者最常用的用例,这意味着消费者不会阻止其他消费者。如果你有多个 消费者每人S 通道意识到,一个长期运行的消费者可能会回调派遣容纳其他消费者 S于该频道。
检索单个消息
要显式检索邮件,请使用 Channel.basicGet。返回的值是GetResponse的一个实例,从中可以提取头信息(属性)和消息体:boolean autoAck = false ; getResponse response = channel.basicGet(queueName,autoAck); if(response == null){ //没有消息检索。 } else { AMQP.BasicProperties props = response.getProps(); byte [] body = response.getBody(); long deliveryTag = response.getEnvelope()。getDeliveryTag(); ...
并且由于AUTOACK = 假以上,则必须也呼吁Channel.basicAck确认您已成功收到的消息:
... channel.basicAck(method.deliveryTag,false); //确认收到消息 }
处理不可路由的消息
如果发布了一条消息,其中设置了“强制”标志,但不能路由,则代理将返回给发送客户端(通过AMQP.Basic.Return 命令)。要通知此类返回,客户端可以实现ReturnListener 接口并调用Channel.setReturnListener。如果客户端尚未为特定通道配置返回侦听器,则相关的返回消息将被静默地删除。
channel.setReturnListener(new ReturnListener(){ public void handleBasicReturn (int replyCode, 字符串replyText, 字符串交换, String routingKey, AMQP.BasicProperties属性, byte [] body) throws IOException{ ... } });
将会调用一个返回侦听器,例如,如果客户端发布一条消息,其中“强制”标志被设置为没有绑定到队列的“直接”类型的交换。
关机协议
AMQP客户端关机概述
AMQP 0-9-1连接和通道共享与管理网络故障,内部故障和显式本地关闭相同的通用方法。AMQP 0-9-1连接和通道具有以下生命周期状态:
打开:对象准备使用
关闭:对象已被明确通知关闭本地,已向任何支持的下层对象发出关闭请求,并正在等待其关闭过程完成
关闭:对象已经从任何下层对象接收到所有关闭完成通知,因此已关闭自身
这些对象总是处于关闭状态,不管导致关闭的原因,如应用程序请求,内部客户端库故障,远程网络请求或网络故障。
AMQP连接和通道对象具有以下与shutdown相关的方法:
addShutdownListener(ShutdownListener
listener)和removeShutdownListener(ShutdownListener listener)来管理任何侦听器,当对象转换到关闭状态时,它将被触发 。请注意,将ShutdownListener添加到已经关闭的对象将立即触发侦听器
getCloseReason(),以便调查对象关闭的原因
isOpen(),用于测试对象是否处于打开状态
close(int closeCode,String closeMessage),显式通知对象关闭
听众的简单使用将如下所示:
import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.ShutdownListener; connection.addShutdownListener(new ShutdownListener(){ public void shutdownCompleted (ShutdownSignalException cause) { ... } });
关于关闭情况的信息
人们可以检索 ShutdownSignalException或者通过显式调用,它包含了所有可用的关于关闭原因的信息,getCloseReason() 方法或使用事业在参数服务(ShutdownSignalException原因) 的方法ShutdownListener类。该ShutdownSignalException类提供方法来分析关机的原因。通过调用isHardError()方法中,我们得到的信息是否是连接或信道错误,并getReason()返回的信息有关的原因,在形式的AMQP方法-无论是 AMQP.Channel.Close或 AMQP.Connection.Close(或者如果原因是库中的一些异常,例如网络通信失败,在这种情况下可以使用getCause()检索异常),则返回null 。
public void shutdownCompleted (ShutdownSignalException cause) { if(cause.isHardError()) { 连接conn =(Connection)cause.getReference(); if(!cause.isInitiatedByApplication()) { 方法原因= cause.getReason(); ... } ... } else { channel ch =(Channel)cause.getReference(); ... } }
原子性和使用isOpen()方法
对于生产代码,不推荐使用通道和连接对象 的isOpen()方法,因为方法返回的值取决于是否存在关闭原因。以下代码说明了竞赛条件的可能性:public void brokenMethod (Channel channel) { if(channel.isOpen()) { //以下代码取决于通道处于打开状态。 //但是有可能 在isOpen()和basicQos(1)调用之间改变通道状态// ... channel.basicQos(1); } }
相反,我们通常应该忽略这种检查,只需尝试所需的操作。如果代码的执行期间的连接的通道被关闭,一个ShutdownSignalException将被抛出,指示对象处于无效状态。我们还应该捕获由SocketException 导致的IOException,当代理程序意外关闭连接时,或者ShutdownSignalException,当代理启动clean时关闭。
public void validMethod (Channel channel) { try { ... channel.basicQos(1); } catch(ShutdownSignalException sse){ //可能检查通道是否关闭 //当我们开始动作和 //关闭它的原因 ... } catch(IOException ioe){ //检查连接是否关闭 ... } }
高级连接选项
消费线程池
消费者线程(见接收下文)在一个新的自动分配ExecutorService的默认线程池。如果需要更大的控制提供一个ExecutorService的上 newConnection()方法,使得该池中的线程来代替。以下是一个示例,其中提供了比正常分配的更大的线程池:ExecutorService es = Executors.newFixedThreadPool(20); 连接conn = factory.newConnection(es);
无论执行人及的ExecutorService类中的java.util.concurrent包。
当连接被关闭的默认的ExecutorService 将关闭()
,但一个用户提供 的ExecutorService(如ES上文)将 不被关断()
。提供自定义ExecutorService的客户端必须确保最终关闭(通过调用其shutdown()方法),否则池的线程可能会阻止JVM终止。
相同的执行器服务可以在多个连接之间共享,或者在重新连接时重新使用,但在shutdown()之后不能使用 。
只有在有证据表明消费者 回访处理存在严重瓶颈时,才应考虑使用此功能。如果没有执行消费者回调,或者很少,默认分配就足够了。开销最初是最小的,并且分配的线程资源总量是有界的,即使有时也会发生消费者活动的爆发。
使用主机列表
可以将Address数组传递给newConnection()。一个地址只是一个方便类,在com.rabbitmq.client包中有主机 和端口组件。例如:地址[] addrArr = new Address [] { new Address(hostname1,portnumber1) ,new Address(hostname2,portnumber2)}; 连接conn = factory.newConnection(addrArr);
将尝试连接到hostname1:portnumber1,并且如果hostname2不成功:portnumber2。返回的连接是数组中第一个成功(不抛出 IOException)。这完全等同于在工厂中重复设置主机和端口,每次调用factory.newConnection(),直到其中一个成功。
如果提供ExecutorService(使用表单factory.newConnection(es,addrArr)),线程池与(第一个)成功连接相关联。
如果您希望更多地控制主机连接,请参阅 对服务发现 的支持。
使用AddressResolver接口进行服务发现
随着3.6.6版本,有可能让一个实施 AddressResolver选择在哪里建立连接时连接:连接conn = factory.newConnection(addressResolver);
该AddressResolver界面是这样的:
公共 接口 AddressResolver { 列表<Address> getAddresses () throws IOException ; }
就像使用主机列表一样,首先尝试返回的第一个地址,然后如果客户端无法连接到第一个地址,则等待第二个。
如果还提供ExecutorService(使用表单factory.newConnection(es,addressResolver)),则线程池与(第一个)成功连接相关联。
该AddressResolver是实现定制服务发现逻辑,这是一个动态的基础设施特别有用的理想场所。结合自动恢复,客户端可以自动连接到首次启动时未平衡的节点。亲和度和负载平衡是其他一些自定义AddressResolver可能有用的场景。
Java客户端附带以下实现(有关详细信息,请参阅javadoc):
DnsRecordIpAddressResolver:给出主机的名称,返回其IP地址(针对平台DNS服务器的分辨率)。这对于简单的基于DNS的负载平衡或故障切换很有用。
DnsSrvRecordAddressResolver:给定服务的名称,返回主机名/端口对。搜索被实现为DNS
SRV请求。当使用像HashiCorp Consul这样的服务注册表时,这可能很有用 。
心跳超时
有关心跳的详细信息以及如何在Java客户机中进行配置,请参阅心跳指南。定制线程工厂
Google App Engine(GAE)等环境可以限制直接线程实例化。要在这样的环境中使用RabbitMQ Java客户端,需要配置一个使用适当方法实例化线程的自定义ThreadFactory,例如GAE的ThreadManager。以下是GoogleApp Engine的示例。
导入 com.google.appengine.api.ThreadManager; ConnectionFactory cf = new ConnectionFactory(); cf.setThreadFactory(ThreadManager.backgroundThreadFactory());
支持Java非阻塞IO
Java客户端版本4.0 为Java非阻塞IO(又名Java NIO)提供实验支持。NIO不应该比阻止IO更快,它只是允许更容易地控制资源(在这种情况下,线程)。使用默认阻止IO模式,每个连接使用线程从网络套接字读取。使用NIO模式,您可以控制从/到网络套接字读取和写入的线程数。
如果您的Java进程使用许多连接(数十或数百),请使用NIO模式。您应该使用比默认阻止模式更少的线程。使用适当数量的线程设置,您不应该尝试任何性能下降,特别是如果连接不那么忙。
NIO必须明确启用:
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.useNio();
NIO模式可以通过NioParams类进行配置:
connectionFactory.setNioParams(new NioParams()。setNbIoThreads(4));
NIO模式使用合理的默认值,但您可能需要根据自己的工作负载进行更改。一些设置是:使用的IO线程总数,缓冲区大小,IO循环中使用的服务执行程序,内存中写入队列的参数(写入请求在网络发送之前排入队列)。请阅读Javadoc了解详细信息和默认值。
网络故障自动恢复
连接恢复
客户端和RabbitMQ节点之间的网络连接可能会失败。RabbitMQ Java客户端支持自动恢复连接和拓扑(队列,交换,绑定和消费者)。许多应用程序的自动恢复过程遵循以下步骤:重新连接
恢复连接侦听器
重新开放频道
恢复频道听众
恢复频道basic.qos设置,发布商确认和交易设置
拓扑恢复包括对每个通道执行的以下操作
重新声明交换(预定义除外)
重新声明队列
恢复所有绑定
恢复所有消费者
从Java客户端4.0.0版开始,默认情况下启用自动恢复(因此也可以进行拓扑恢复)。
要禁用或启用自动连接恢复,请使用factory.setAutomaticRecoveryEnabled(boolean) 方法。以下代码段显示了如何明确启用自动恢复(例如,对于4.0.0以前的Java客户端):
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(true); 连接将自动恢复连接conn = factory.newConnection();
如果由于异常而导致恢复失败(例如,RabbitMQ节点仍无法访问),则会在固定的时间间隔(默认为5秒)后重试。可以配置间隔:
ConnectionFactory factory = new ConnectionFactory(); //尝试每10秒恢复一次 factory.setNetworkRecoveryInterval(10000);
当提供地址列表时,列表被洗牌,并尝试所有地址,一个接着一个:
ConnectionFactory factory = new ConnectionFactory(); 地址[] addresses = { new Address(“192.168.1.4”),new Address(“192.168.1.5”)}; factory.newConnection(addresses);
恢复侦听器
可以在可恢复的连接和通道上注册一个或多个恢复侦听器。当启用连接恢复时,ConnectionFactory #newConnection和Connection# createChannel返回的 连接 实现com.rabbitmq.client.Recoverable,提供两个具有相当描述性名称的方法:addRecoveryListener
removeRecoveryListener
请注意,您目前需要将连接和渠道转换为可恢复 才能使用这些方法。
对出版的影响
当连接关闭时 使用Channel.basicPublish发布的邮件将丢失。连接恢复后,客户端不会将其排入队列。为确保发布的消息到达RabbitMQ应用程序需要使用Publisher确认 并记录连接失败。拓扑恢复
拓扑恢复涉及交换,队列,绑定和消费者的恢复。默认情况下启用自动恢复启用。因此,默认情况下,从Java客户端4.0.0启用拓扑恢复。如果需要,可以明确地禁用拓扑恢复:
ConnectionFactory factory = new ConnectionFactory(); 连接conn = factory.newConnection(); //启用自动恢复(例如Java 4.0 以前的版本) factory.setAutomaticRecoveryEnabled(true); //禁用拓扑恢复 factory.setTopologyRecoveryEnabled(false);
手动鸣叫和自动恢复
当使用手动确认时,可能在消息传递和确认之间到RabbitMQ节点的网络连接失败。连接恢复后,RabbitMQ将重置所有通道上的传输标签。这意味着 使用旧的传递标签的basic.ack,basic.nack和basic.reject会导致通道异常。为了避免这种情况,RabbitMQ Java客户端跟踪和更新传递标签,使其在恢复之间单调增长。 Channel.basicAck, Channel.basicNack和 Channel.basicReject然后将调整后的传递标签转换为RabbitMQ使用的标签。 致谢与陈旧的交货标签将不会发送。使用手动确认和自动恢复的应用程序必须能够处理重新投递。
未处理的例外
与连接,通道,恢复和消费者生命周期相关的未处理的异常被委派给异常处理程序。异常处理程序是实现ExceptionHandler接口的任何对象 。默认情况下,使用DefaultExceptionHandler的实例。它将异常详细信息打印到标准输出。可以使用ConnectionFactory#setExceptionHandler来覆盖处理程序 。它将用于由工厂创建的所有连接:
ConnectionFactory factory = new ConnectionFactory(); cf.setExceptionHandler(customHandler);
异常处理程序应用于异常记录。
指标和监测
从版本4.0.0开始,客户端会收集运行时度量(例如发布的消息数)。Metrics集合是可选的,并使用setMetricsCollector(metricsCollector)方法在ConnectionFactory级别 进行设置。此方法需要一个MetricsCollector实例,该实例在客户端代码的几个位置调用。 客户端附带一个MetricsCollector使用实施Dropwizard指标 库。可以通过以下方式启用指标收集:
ConnectionFactory connectionFactory = new ConnectionFactory(); StandardMetricsCollector metrics = new StandardMetricsCollector(); connectionFactory.setMetricsCollector(metrics); ... metrics.getPublishedMessages(); //获取Metrics的Meter对象
以下是收集的指标:
打开的连接数( 默认实现中的计数器)
开放通道数( 默认实现中为计数器)
发布的消息数(一米的默认实现)
消费的消息数(一米的默认实现)
确认的消息数(一米的默认实现)
被拒绝的消息数(一米的默认实现)
通过使用Dropwizard Metrics,不仅可以计数,还可以使用通用工具(JMX,Graphite,Ganglia,HTTP)的平均速率,最后五分钟的速率等等。
请注意以下关于指标收集:
如果您使用基于Dropwizard Metrics的默认实现,请勿忘记将适当的JAR文件添加到类路径中(Dropwizard Metrics不会自动与Java客户端一起提取,因为它是可选依赖项)。
Metrics集合是可扩展的,没有任何阻止您根据特定需要实现自己的 MetricsCollector。
所述MetricsCollector设置在ConnectionFactory的 水平,但可以翻过不同实例共享。
指标集不支持交易。例如,如果在事务中发送确认,然后事务被回滚,确认将被计入客户端度量(而不是经纪人)。请注意,确认实际上是发送到代理,然后由事务回滚取消,因此客户端指标在发送确认方面是正确的。作为总结,不要使用客户指标进行敏感的业务检查,因为它们不能保证是完全准确的。
指标报告
如果您使用StandardMetricsCollector基于Dropwizard指标,您可以发送的指标来 几个报告后端:控制台,JMX,HTTP,石墨,神经节等。您通常会将MetricsRegistry的一个实例传递 给StandardMetricsCollector。以下是JMX的一个例子:
MetricRegistry registry = new MetricRegistry(); StandardMetricsCollector metrics = new StandardMetricsCollector(registry); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setMetricsCollector(metrics); JmxReporter记者= JmxReporter .forRegistry(注册表) .inDomain(“com.rabbitmq.client.jmx”) 。建立(); reporter.start();
Google App Engine上的RabbitMQ Java Client
在Google App Engine(GAE)上使用RabbitMQ Java客户端需要使用自定义线程工厂来实例化线程使用GAE的ThreadManager(见上文)。另外,有必要设置一个低心跳间隔(4-5秒),以避免在GAE 上进入低输入流读取超时:ConnectionFactory factory = new ConnectionFactory(); cf.setRequestedHeartbeat(5);
注意事项和限制
为了使拓扑恢复成为可能,RabbitMQ Java客户机维护一个声明的队列,交换和绑定的缓存。缓存是每个连接。某些RabbitMQ功能使得客户端不可能观察到一些拓扑变化,例如当由于TTL而删除队列时。RabbitMQ Java客户端尝试在最常见的情况下使缓存条目无效:队列被删除时
交换被删除时
绑定被删除时。
消费者在自动删除的队列上被取消时。
从自动删除的交换中取消绑定队列或交换机。
但是,客户端无法跟踪单个连接之外的这些拓扑更改。依赖自动删除队列或交换的应用程序,以及队列TTL(注意:不是消息TTL!),并使用自动连接恢复,应明确删除知道未被使用或删除的实体,以清除客户端拓扑缓存。这通过Channel#queueDelete, Channel#exchangeDelete,Channel#queueUnbind和Channel# exchangeUnbind在RabbitMQ
3.3.x中是幂等的(删除什么不会导致异常)。
RPC(请求/回复)模式
作为编程方便,Java客户端API提供了一个类RpcClient,它使用临时应答队列通过AMQP 0-9-1 提供简单的RPC风格的通信设施。该类不会对RPC参数和返回值强加任何特定的格式。它简单地提供了一种用于使用特定路由密钥向给定交换机发送消息并等待响应队列上的响应的机制。
import com.rabbitmq.client.RpcClient; RpcClient rpc = new RpcClient(channel,exchangeName,routingKey);
(这个类使用AMQP 0-9-1的实现细节如下:请求消息被发送,并将 basic.correlation_id字段设置为该RpcClient实例唯一的值,并将basic.reply_to设置为回复队列。)
创建此类的实例后,可以使用以下任何方法来发送RPC请求:
byte [] primitiveCall(byte [] message); String stringCall (String message) Map mapCall (Map message) Map mapCall (Object [] keyValuePairs)
该primitiveCall方法传送原始字节数组作为请求和响应机构。方法stringCall是一个围绕primitiveCall的简便的包装器,将消息体视为默认字符编码中的String实例。
该mapCall变种是有点更复杂的:它们编码java.util.Map包含普通的Java值到AMQP
0-9-1二进制表表示,和解码以同样的方式回应。(请注意,在这里可以使用什么值类型有一些限制 - 有关详细信息,请参阅javadoc。)
所有编组/解组的方便方法都使用primitiveCall作为传输机制,并在其顶部提供一个包装层
相关文章推荐
- RabbitMQ-Java客户端API指南-上
- RabbitMQ-Java客户端API指南-下
- RabbitMQ的Java客户端API指南
- RabbitMQ-Java客户端API指南-上
- Sqoop2 Java客户端API指南
- RabbitMQ-Java客户端API指南-上
- RabbitMQ-Java客户端API指南-下
- RabbitMQ Java客户端API指南
- RabbitMQ-Java客户端API指南-下
- Enterprise JavaBeans 组件和 CORBA 客户端:开发者指南
- Java8 日期/时间(Date Time)API指南
- JAVA客户端API调用memcached两种方式
- RDD JAVA API 用法指南
- zookeeper java客户端简单API
- Java API 设计指南
- RabbitMQ的java客户端API使用文档中文版
- Java8 日期/时间(Date Time)API指南
- Java 8 日期/时间(Date Time)API指南
- 【华为】Redis客户端API使用(java)
- Java读写文件API的用法指南,性能分析与对比。