您的位置:首页 > 编程语言 > Java开发

Rabbitmq Java Client Api详解

2014-11-13 13:27 579 查看


AMQP

AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。


基础概念快速入门

每个rabbitmq-server叫做一个Broker,等着tcp连接进入。

在rabbitmq-server进程内有Exchange,定义了这个消息的发送类型。(一对多、直连、多对多等等)

Queue是进程内的逻辑队列,有多个,有名字。

Binding联系Exchane与Queue。

Routing key由生产者指定。Binding key由消费者指定。二者联合决定一条消息的来去。


java client api


连接

1
2
3
4

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();

以上是得到一个rabbitmq连接最最基础的代码,当然了,还可以设置一些诸如用户名密码的事情。

最后这个channel就可以用来收和发消息了。


消息者线程池

1
2

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

消费者时使用,上述自动开了一20个线程的池来搞。


地址数组

1
23

Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
, new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);

上述代码如果连hostname1失败了就去hostname2。

factory.newConnection()会触发这个检测。


声明exchange与queue

1
23

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

channel.exchangeDeclare 参数有 交换机名字 类型 是否持久化 不使用时是否自动删除 是否是内部的(不能被客户端使用) 其他参数

channel.queueDeclare 参数有 queue名字 是否持久化 独占的queue(仅供此连接) 不使用时是否自动删除 其他参数

channel.queueBind 参数有 queue名字 交换机名字 此次绑定使用的路由关键字 其他参数


发出消息

1
2

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

channel.basicPublish 参数有 要发出的交换机名字 路由关键字 是否强制(设置为true时,找不到收的人时可以通过returnListener返回) 是否立即(其实rabbitmq不支持) 其他属性 消息主体


线程安全

Channel是线程好全的,但是最好是每个线程里用自己的Channel,因为在单个Channel里排队是有可能慢一些的。


最简单的办法消费消息

1
2
3
45
6
7
8
9
10
11
12
13
14
15
16
17

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.contentType;
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});

一个Channel一个Consumer。

channel.basicAck 回发ACK 参数 tag 是否多个。


零碎

channel.basicQos 指定服务质量设置 参数 最大的投送字节数 最大的投送消息数量 设置是否要应用到整个channel(而不是一个消费者)。

factory.setAutomaticRecoveryEnabled(true) 网络有问题时,好后可自动恢复设置。

cf.setRequestedHeartbeat(5) 设置心跳时间。

exchange type可用的值:direct topic headers fanout。

exchange的类型有一个default,basicPublish没有指定时使用,而且,如果routingKey在指定绑定的时候,会去到绑定的exchange。

channel.queueDeclare().getQueue() 得到的是一个随机queue,断开连接后即删除。

当exchange为direct的时候routingKey与bindingKey必须完全一致才能消费消息。


AMQP模型




Producer

Producer即生产者,是一个向Exchange发布消息的客户端应用程序。

Producer创建Message,并将Message发布到Broker Server。Message由两部分组成:Payload和Label。Payload是被传输的数据,Label(包含Exchange name, Topic tag等等)是对Payload的描述,用来路由消息。


Consumer

Consumer即消费者,一个从Message Queue中请求消息的客户端应用程序

Consumer连接到Broker Server上,并订阅Broker Server上的Queue。Consumer接收到的Message只包含Payload,也就是说消息路由完成之后,消息中不再包含Label。


Channel

Channel即通道,多路复用连接中的一条独立的双向数据流通道,为会话提供物理传输介质,在客户端的每个连接里,可建立多个通道。

例如,Consumer要连接到Broker Server上,就需要创建一个TCP连接,当这个TCP连接打开后,Consumer会创建一个通道,通道是TCP连接中的虚拟连接。一个TCP连接中可以创建多个通道。


Queue

Queue即消息队列,是消息的容器,用来保存消息直到被Consumer接收,每个消息都会被投入到一个或多个队列中。


Exchange和Binding

Exchange即交换器,用来接收Producer发布的消息,并通过设定的路由规则将消息路由给服务器中的Queue。

Binding即绑定器,可以理解为路由规则。它的作用就是把Exchange和Queue按照路由规则绑定起来。

Binding Key即绑定关键字,Exchange视自身类型来决定Binding的路由行为。

Routing Key即消息的路由关键字,Exchange根据这个关键字决定如何路由某条消息。

Exchange共有四种类型:

Direct: 将消息中的Routing Key与该Exchange关联的所有Binding中的Binding Key进行比较,如果相等,则将消息发送到该Binding对应的Queue中。

Fanout: 这是一种广播行为,将消息发送给所有与该Exchange定义过Binding的所有Queue中。

Topic: 将Routing Key与Binding Key进行匹配,Routing Key和Binding Key由多个单词组成,这些单词使用
.
分隔,使用
*
#
进行模糊匹配,
*
匹配一个单词,
#
匹配0个或多个单词。如果匹配成功,将消息发送到对应的Queue中。如
sys.error
biz.error
都和
*.error
匹配。

Headers: 不依赖于Routing Key与Binding Key的匹配规则来路由消息,而是根据消息内容中的headers属性进行匹配。在绑定Exchange和Queue时指定一组键值对,当消息发送到Exchange时,会取得消息的headers,也是键值对的形式,将两组键值对进行匹配,如果完全匹配,则将消息发送到对应的Queue中。


Broker

Broker是消息队列服务器实体,交换器Exchange和消息队列Queue都位于服务器端,即为Broker。


Vhost

Vhost即virtual host虚拟主机,指一批相关的Exchange、Queue和Binding。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。一个Broker里可以开设多个vhost,用作不同用户的权限分离
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: