您的位置:首页 > 其它

关于RabbitMQ的一些问题

2017-07-24 21:20 411 查看

一、三种exhange用法有何区别?

1.Direct Exchange

[b]消费者[/b]

配置文件清单comsumer-context.xml,监听directQueue队列消息(简化配置,关于spring-rabbitmq的配置可以参考笔者之前的这篇博客)

<!--配置队列名-->
<rabbit:queue name="directQueue"/>

<!--配置监听-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="fooMessageListener" queue-names="directQueue" />
</rabbit:listener-container>


监听处理程序

package amqp;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

@Component
public class FooMessageListener implements MessageListener {

@Override
public void onMessage(Message message) {
String messageBody = new String(message.getBody());
System.out.println(" [x] Received '" + messageBody + "'");
}
}


启动服务

public static void main(final String... args) throws Exception {
AbstractApplicationContext ctx =
new ClassPathXmlApplicationContext("comsumer-context.xml");
}


[b]生产者[/b]

配置文件清单producer-context.xml

<!--配置RabbitTemplate-->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="directExchange" routing-key="direct-key"/>

<rabbit:queue name="directQueue"/>

<!--配置direct类型exchange-->
<rabbit:direct-exchange name="directExchange">
<rabbit:bindings>
<rabbit:binding queue="directQueue" key="direct-key"/>
</rabbit:bindings>
</rabbit:direct-exchange>


发送消息

public static void main(final String... args) throws Exception {
AbstractApplicationContext ctx =
new ClassPathXmlApplicationContext("producer-context.xml");
RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
template.convertAndSend("Hello, world!");
Thread.sleep(1000);
ctx.destroy();
}


先启动消费者,再运行生产者发送消息,消费者可以接收到消息

[b]小结[/b]

任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue,其实就是点对点的方式

Direct Exchange绑定队列时,如果不指定key,则key默认为队列名

Direct Exchange绑定不是必须的,如下配置,此时可以简单将RouteKey理解为队列名

<!--配置RabbitTemplate-->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" routing-key="directQueue"/>


如果没有匹配的RouteKey,消息将被丢弃

Fanout Exchange

[b]消费者[/b]

配置文件清单comsumer-context.xml,同一个处理程序监听三个队列消息

<rabbit:queue name="fanoutQueue1"/>
<rabbit:queue name="fanoutQueue2"/>
<rabbit:queue name="fanoutQueue3"/>

<!--配置监听-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="fooMessageListener" queue-names="fanoutQueue1" />
<rabbit:listener ref="fooMessageListener" queue-names="fanoutQueue2" />
<rabbit:listener ref="fooMessageListener" queue-names="fanoutQueue3" />
</rabbit:listener-container>


[b]生产者[/b]

配置文件清单producer-context.xml

<!--配置RabbitTemplate-->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange"/>

<rabbit:queue name="fanoutQueue1"/>
<rabbit:queue name="fanoutQueue2"/>
<rabbit:queue name="fanoutQueue3"/>

<!--配置fanout类型exchange-->
<rabbit:fanout-exchange name="fanoutExchange">
<rabbit:bindings>
<rabbit:binding queue="fanoutQueue1"/>
<rabbit:binding queue="fanoutQueue2"/>
<rabbit:binding queue="fanoutQueue3"/>
</rabbit:bindings>
</rabbit:fanout-exchange>


监听程序会接收三条消息

[b]小结[/b]

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定的所有队列上

Fanout Exchange不需要RouteKey,其实就是广播的方式

Fanout Exchange可以绑定一个或多个队列,如果没绑定,发送到Fanout Exchange上的消息将被丢弃

Topic Exchange

[b]消费者[/b]

配置文件清单comsumer-context.xml

<rabbit:queue name="topicQueue"/>

<!--配置监听-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="fooMessageListener" queue-names="topicQueue" />
</rabbit:listener-container>


[b]生产者[/b]

配置文件清单producer-context.xml,Topic Exchange绑定其关心主题的队列

<!--配置RabbitTemplate-->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="topicExchange" routing-key="foo.bar"/>

<rabbit:queue name="topicQueue"/>

<!--配置topic类型exchange-->
<rabbit:topic-exchange name="topicExchange">
<rabbit:bindings>
<rabbit:binding queue="topicQueue" pattern="foo.*"/>
</rabbit:bindings>
</rabbit:topic-exchange>


[b]小结[/b]

任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上

“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

pattern不是正则,原因正则比较复杂,解析需要时间,实际应用也不需要定义多复杂的匹配规则

二、如何发送和接收消息?

消息由生产者往Exchange中发送,然后由Exchange转发到队列上,消费者监听Queue,消息过来便接收处理。

三、Exchange和Queue何时绑定?

问题一中的例子,绑定关系都是在发送端处理的,其实在接收端也是可以的,发送端只管发送消息到Exchange,而不用关心Exchange会将消息转发到哪个队列。哪个队列需要,就将哪个队列与Exchange绑定就行

四、一个队列有多个消费者时,消息会被多个消费者消费?

不会,消息会循环分发给其中一个消费者,要不然分布式环境下根本无法使用RabbitMQ了

五、消费者程序异常如何处理?

RabbitMQ为了保证消费者接收正确、完整的数据,引入消息确认机制。当消息被正确接收了,会向RabbitMQ Server发送ack确认,然后消息才会从队列中删除。以spring-rabbitmq为例,消费者处理异常,如果未对异常进行捕获处理,消费者会发生reject命令,拒绝消息,将消息重新入队,重复消费
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: