您的位置:首页 > 其它

rabbitMQ 初识

2017-03-28 10:28 197 查看
需求:要闻推送

什么是rabbitmq?

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

具体的关于exchange以及exchange的三种模式:direct,topic,fanout。网上一搜一大推。

exchange下绑定queue。

spring与rabbitmq整合:(topic模式)

rabbitmq.xml:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xmlns:rabbit="http://www.springframework.org/schema/rabbit"

       xsi:schemaLocation="http://www.springframework.org/schema/rabbit

           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd

           http://www.springframework.org/schema/beans

           http://www.springframework.org/schema/beans/spring-beans.xsd">
<!--连接工厂 -->
<rabbit:connection-factory id="connectionFactory"  host="${rabbit.host}" username="${rabbit.username}" password="${rabbit.password}" port="${rabbit.port}" publisher-confirms="true" />
 
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="${rabbit.exchange}" routing-key="foo.bar" message-converter="jsonMessageConverter" confirm-callback="" />
<rabbit:admin connection-factory="connectionFactory" id="adminId" />
<!-- 配置exchange,不同的exchange会影响消息分发策略 -->
<!-- 消息对象json转换类 -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!-- 申明一个消息队列Queue 
说明: durable:是否持久化  exclusive: 仅创建者可以使用的私有队列,断开后自动删除     auto_delete: 当所有消费客户端连接断开后,是否自动删除队列
-->
<rabbit:queue id="${rabbit.queue.yaowen}" name="${rabbit.queue.yaowen}" durable="true" auto-delete="false" exclusive="false" />

<!-- 交换机定义 
说明:  rabbit:direct-exchange:定义exchange模式为topic。 rabbit:binding:设置消息queue匹配的key
Name
Durability (消息代理重启后,交换机是否还存在)
Auto-delete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它) 
Arguments(依赖代理本身)
-->
<rabbit:topic-exchange  name="${rabbit.exchange}" durable="true" auto-delete="false" id="${rabbit.exchange}">
   <rabbit:bindings>
       <rabbit:binding queue="${rabbit.queue.yaowen}" pattern="${rabbit.queue.yaowen}" />
   </rabbit:bindings>
</rabbit:topic-exchange>

<!-- 监听器 -->
<!-- 
<bean id="queueListenter" class="com.kf.data.service.listener.QueueListenter"/>

<rabbit:listener-container  connection-factory="connectionFactory" >
   <rabbit:listener queues="${rabbit.queue.yaowen}" ref="queueListenter" />
</rabbit:listener-container>
-->

</beans>

applicationContext.xml

<websocket:message-broker
application-destination-prefix="/app">
<websocket:stomp-endpoint path="/kf-websocket"
allowed-origins="*">
<websocket:sockjs />
</websocket:stomp-endpoint>
<websocket:stomp-broker-relay prefix="/topic,/queue,/amq/queue,/exchange"
system-login="${rabbit.username}" system-passcode="${rabbit.password}"
client-login="${rabbit.username}" client-passcode="${rabbit.password}"
relay-host="${rabbit.host}" relay-port="${rabbit.port.stomp}" heartbeat-receive-interval="5000" heartbeat-send-interval="5000"/>
</websocket:message-broker>

websocket与rabbitmq同时使用的。

MQProducerImpl 这个类是用来将message  send到对应的exchange下的queue中。

package com.kf.data.service.mq;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Service;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.kf.data.crawler.def.LogConfig;

@Service

public class MQProducerImpl implements MQProducer {

    @Autowired

    private AmqpTemplate amqpTemplate;

    

    @Value(value = "${rabbit.queue.yaowen}")
private String queuename;

    

    @Value(value="${rabbit.exchange}")

    private String exchangename;

    private static final Logger log = LoggerFactory.getLogger(LogConfig.YX);

    

    @Override

    public void sendDataToQueue(String queueKey, Object object) {

        try { 

        log.info(queueKey+"-----sendDataToQueue------"+object); 

        System.out.println("exchangename:"+exchangename);

        System.out.println("queueKey:"+queueKey);

        System.out.println("object:"+object);

            amqpTemplate.convertAndSend(exchangename,queueKey, object); 

        } catch (Exception e) {

        log.error(e.toString());

        }

    }

    

    

    public void sendDataToTopic(Object object) {

    log.info("-----sendDataToTopic------"+object); 

        try {  

        System.out.println("topic:"+queuename);

        System.out.println("object:"+object);

        amqpTemplate.convertAndSend(queuename, object); 

        } catch (Exception e) {

        log.error(e.toString());

        }

    }

     

    public void sendYaowenToQueue(Object object) {

    log.info("-----sendYaowenToQueue------"+object); 

        try { 

        sendDataToQueue(queuename, object); 

        } catch (Exception e) {

        log.error(e.toString());

        }

    }

    

    public void sendYaowenToExchange(Object object) {

    log.info("-----sendYaowenToExchange------"+object); 

        try { 

        System.out.println("exchangename:"+exchangename);

        System.out.println("queuename:"+queuename);

        System.out.println("object:"+object);

        amqpTemplate.convertAndSend(exchangename,queuename, object); 

        } catch (Exception e) {

        log.error(e.toString());

        } 

    }

    

    /**

     * 点对点

     */

    public void sendDataToCrQueue(Object obj) {

        amqpTemplate.convertAndSend("queue_one_key", obj);

    }

    /**

     * 发送 发布--订阅消息

     */

    public void sendFanoutMsg(Object obj) {

    amqpTemplate.convertAndSend(obj);

    }

    /**

     * 主题

     */

    public void sendTopicMsg(String topic,Object obj) {

    amqpTemplate.convertAndSend(topic,obj);

    }

}

PS:

如果有多个消费者,那么queue会分发。就是其中一个消费者消费完这条message后,会从queue中移除,导致其他的消费者消费不到当条信息。

如果要实现所有的消费者能够消费相同的queue中的message的话,就要指定exchange为:amq.topic(rabbitmq自带的)   

打开rabbitmq的web管理页面在exchange中会看到这个exchange。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  rabbitmq