rabbitMQ 初识
2017-03-28 10:28
197 查看
需求:要闻推送
什么是rabbitmq?
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
具体的关于exchange以及exchange的三种模式:direct,topic,fanout。网上一搜一大推。
exchange下绑定queue。
spring与rabbitmq整合:(topic模式)
rabbitmq.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!--连接工厂 -->
<rabbit:connection-factory id="connectionFactory" host="${rabbit.host}" username="${rabbit.username}" password="${rabbit.password}" port="${rabbit.port}" publisher-confirms="true" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="${rabbit.exchange}" routing-key="foo.bar" message-converter="jsonMessageConverter" confirm-callback="" />
<rabbit:admin connection-factory="connectionFactory" id="adminId" />
<!-- 配置exchange,不同的exchange会影响消息分发策略 -->
<!-- 消息对象json转换类 -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!-- 申明一个消息队列Queue
说明: durable:是否持久化 exclusive: 仅创建者可以使用的私有队列,断开后自动删除 auto_delete: 当所有消费客户端连接断开后,是否自动删除队列
-->
<rabbit:queue id="${rabbit.queue.yaowen}" name="${rabbit.queue.yaowen}" durable="true" auto-delete="false" exclusive="false" />
<!-- 交换机定义
说明: rabbit:direct-exchange:定义exchange模式为topic。 rabbit:binding:设置消息queue匹配的key
Name
Durability (消息代理重启后,交换机是否还存在)
Auto-delete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它)
Arguments(依赖代理本身)
-->
<rabbit:topic-exchange name="${rabbit.exchange}" durable="true" auto-delete="false" id="${rabbit.exchange}">
<rabbit:bindings>
<rabbit:binding queue="${rabbit.queue.yaowen}" pattern="${rabbit.queue.yaowen}" />
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 监听器 -->
<!--
<bean id="queueListenter" class="com.kf.data.service.listener.QueueListenter"/>
<rabbit:listener-container connection-factory="connectionFactory" >
<rabbit:listener queues="${rabbit.queue.yaowen}" ref="queueListenter" />
</rabbit:listener-container>
-->
</beans>
applicationContext.xml
<websocket:message-broker
application-destination-prefix="/app">
<websocket:stomp-endpoint path="/kf-websocket"
allowed-origins="*">
<websocket:sockjs />
</websocket:stomp-endpoint>
<websocket:stomp-broker-relay prefix="/topic,/queue,/amq/queue,/exchange"
system-login="${rabbit.username}" system-passcode="${rabbit.password}"
client-login="${rabbit.username}" client-passcode="${rabbit.password}"
relay-host="${rabbit.host}" relay-port="${rabbit.port.stomp}" heartbeat-receive-interval="5000" heartbeat-send-interval="5000"/>
</websocket:message-broker>
websocket与rabbitmq同时使用的。
MQProducerImpl 这个类是用来将message send到对应的exchange下的queue中。
package com.kf.data.service.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.kf.data.crawler.def.LogConfig;
@Service
public class MQProducerImpl implements MQProducer {
@Autowired
private AmqpTemplate amqpTemplate;
@Value(value = "${rabbit.queue.yaowen}")
private String queuename;
@Value(value="${rabbit.exchange}")
private String exchangename;
private static final Logger log = LoggerFactory.getLogger(LogConfig.YX);
@Override
public void sendDataToQueue(String queueKey, Object object) {
try {
log.info(queueKey+"-----sendDataToQueue------"+object);
System.out.println("exchangename:"+exchangename);
System.out.println("queueKey:"+queueKey);
System.out.println("object:"+object);
amqpTemplate.convertAndSend(exchangename,queueKey, object);
} catch (Exception e) {
log.error(e.toString());
}
}
public void sendDataToTopic(Object object) {
log.info("-----sendDataToTopic------"+object);
try {
System.out.println("topic:"+queuename);
System.out.println("object:"+object);
amqpTemplate.convertAndSend(queuename, object);
} catch (Exception e) {
log.error(e.toString());
}
}
public void sendYaowenToQueue(Object object) {
log.info("-----sendYaowenToQueue------"+object);
try {
sendDataToQueue(queuename, object);
} catch (Exception e) {
log.error(e.toString());
}
}
public void sendYaowenToExchange(Object object) {
log.info("-----sendYaowenToExchange------"+object);
try {
System.out.println("exchangename:"+exchangename);
System.out.println("queuename:"+queuename);
System.out.println("object:"+object);
amqpTemplate.convertAndSend(exchangename,queuename, object);
} catch (Exception e) {
log.error(e.toString());
}
}
/**
* 点对点
*/
public void sendDataToCrQueue(Object obj) {
amqpTemplate.convertAndSend("queue_one_key", obj);
}
/**
* 发送 发布--订阅消息
*/
public void sendFanoutMsg(Object obj) {
amqpTemplate.convertAndSend(obj);
}
/**
* 主题
*/
public void sendTopicMsg(String topic,Object obj) {
amqpTemplate.convertAndSend(topic,obj);
}
}
PS:
如果有多个消费者,那么queue会分发。就是其中一个消费者消费完这条message后,会从queue中移除,导致其他的消费者消费不到当条信息。
如果要实现所有的消费者能够消费相同的queue中的message的话,就要指定exchange为:amq.topic(rabbitmq自带的)
打开rabbitmq的web管理页面在exchange中会看到这个exchange。
什么是rabbitmq?
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
具体的关于exchange以及exchange的三种模式:direct,topic,fanout。网上一搜一大推。
exchange下绑定queue。
spring与rabbitmq整合:(topic模式)
rabbitmq.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!--连接工厂 -->
<rabbit:connection-factory id="connectionFactory" host="${rabbit.host}" username="${rabbit.username}" password="${rabbit.password}" port="${rabbit.port}" publisher-confirms="true" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="${rabbit.exchange}" routing-key="foo.bar" message-converter="jsonMessageConverter" confirm-callback="" />
<rabbit:admin connection-factory="connectionFactory" id="adminId" />
<!-- 配置exchange,不同的exchange会影响消息分发策略 -->
<!-- 消息对象json转换类 -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!-- 申明一个消息队列Queue
说明: durable:是否持久化 exclusive: 仅创建者可以使用的私有队列,断开后自动删除 auto_delete: 当所有消费客户端连接断开后,是否自动删除队列
-->
<rabbit:queue id="${rabbit.queue.yaowen}" name="${rabbit.queue.yaowen}" durable="true" auto-delete="false" exclusive="false" />
<!-- 交换机定义
说明: rabbit:direct-exchange:定义exchange模式为topic。 rabbit:binding:设置消息queue匹配的key
Name
Durability (消息代理重启后,交换机是否还存在)
Auto-delete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它)
Arguments(依赖代理本身)
-->
<rabbit:topic-exchange name="${rabbit.exchange}" durable="true" auto-delete="false" id="${rabbit.exchange}">
<rabbit:bindings>
<rabbit:binding queue="${rabbit.queue.yaowen}" pattern="${rabbit.queue.yaowen}" />
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 监听器 -->
<!--
<bean id="queueListenter" class="com.kf.data.service.listener.QueueListenter"/>
<rabbit:listener-container connection-factory="connectionFactory" >
<rabbit:listener queues="${rabbit.queue.yaowen}" ref="queueListenter" />
</rabbit:listener-container>
-->
</beans>
applicationContext.xml
<websocket:message-broker
application-destination-prefix="/app">
<websocket:stomp-endpoint path="/kf-websocket"
allowed-origins="*">
<websocket:sockjs />
</websocket:stomp-endpoint>
<websocket:stomp-broker-relay prefix="/topic,/queue,/amq/queue,/exchange"
system-login="${rabbit.username}" system-passcode="${rabbit.password}"
client-login="${rabbit.username}" client-passcode="${rabbit.password}"
relay-host="${rabbit.host}" relay-port="${rabbit.port.stomp}" heartbeat-receive-interval="5000" heartbeat-send-interval="5000"/>
</websocket:message-broker>
websocket与rabbitmq同时使用的。
MQProducerImpl 这个类是用来将message send到对应的exchange下的queue中。
package com.kf.data.service.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.kf.data.crawler.def.LogConfig;
@Service
public class MQProducerImpl implements MQProducer {
@Autowired
private AmqpTemplate amqpTemplate;
@Value(value = "${rabbit.queue.yaowen}")
private String queuename;
@Value(value="${rabbit.exchange}")
private String exchangename;
private static final Logger log = LoggerFactory.getLogger(LogConfig.YX);
@Override
public void sendDataToQueue(String queueKey, Object object) {
try {
log.info(queueKey+"-----sendDataToQueue------"+object);
System.out.println("exchangename:"+exchangename);
System.out.println("queueKey:"+queueKey);
System.out.println("object:"+object);
amqpTemplate.convertAndSend(exchangename,queueKey, object);
} catch (Exception e) {
log.error(e.toString());
}
}
public void sendDataToTopic(Object object) {
log.info("-----sendDataToTopic------"+object);
try {
System.out.println("topic:"+queuename);
System.out.println("object:"+object);
amqpTemplate.convertAndSend(queuename, object);
} catch (Exception e) {
log.error(e.toString());
}
}
public void sendYaowenToQueue(Object object) {
log.info("-----sendYaowenToQueue------"+object);
try {
sendDataToQueue(queuename, object);
} catch (Exception e) {
log.error(e.toString());
}
}
public void sendYaowenToExchange(Object object) {
log.info("-----sendYaowenToExchange------"+object);
try {
System.out.println("exchangename:"+exchangename);
System.out.println("queuename:"+queuename);
System.out.println("object:"+object);
amqpTemplate.convertAndSend(exchangename,queuename, object);
} catch (Exception e) {
log.error(e.toString());
}
}
/**
* 点对点
*/
public void sendDataToCrQueue(Object obj) {
amqpTemplate.convertAndSend("queue_one_key", obj);
}
/**
* 发送 发布--订阅消息
*/
public void sendFanoutMsg(Object obj) {
amqpTemplate.convertAndSend(obj);
}
/**
* 主题
*/
public void sendTopicMsg(String topic,Object obj) {
amqpTemplate.convertAndSend(topic,obj);
}
}
PS:
如果有多个消费者,那么queue会分发。就是其中一个消费者消费完这条message后,会从queue中移除,导致其他的消费者消费不到当条信息。
如果要实现所有的消费者能够消费相同的queue中的message的话,就要指定exchange为:amq.topic(rabbitmq自带的)
打开rabbitmq的web管理页面在exchange中会看到这个exchange。
相关文章推荐
- 工作心得(三)——初识RabbitMQ,RabbitMQ的安装和集群配置
- RabbitMQ学习之:(一)初识、概念及心得
- .Net下RabbitMQ的使用(1) -- 初识RabbitMQ
- 初识RabbitMQ系列之二:下载安装
- RabbitMQ学习之:(一)初识、概念及心得
- RabbitMQ学习之:(一)初识、概念及心得
- 初识RabbitMQ-安装Win Rabbit遇到的问题
- 初识rabbitmq
- RabbitMQ学习之:(一)初识、概念及心得
- RabbitMQ学习之:(一)初识、概念及心得
- 初识RabbitMQ系列之三:.net 如何使用RabbitMQ
- RabbitMQ学习之:(一)初识、概念及心得
- spring boot实战(第十一篇)初识RabbitMQ
- RabbitMQ 初识
- .Net下RabbitMQ的使用(1) -- 初识RabbitMQ
- 初识RabbitMQ,附RabbitMQ+PHP演示实例
- 初识RabbitMQ,附RabbitMQ+PHP演示实例
- RabbitMQ系列一:初识RabbitMQ
- spring boot实战(第十一篇)初识RabbitMQ
- RabbitMQ第一篇——初识RabbitMQ,简单的消息发送和接收