您的位置:首页 > 其它

RabbitMQ(消息队列技术,它是分布式应用间交换信息的一种技术)

2016-12-22 16:28 746 查看
一.JMS和MQ介绍

1.JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。

2.MQ全称为Message
Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求

3.JMS和MQ的关系:

  JMS是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构和交互流程。而MQ则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提供者;MQ的实现可以基于JMS,也可以基于其他规范或标准。

4.MQ应用场景

消息队列的主要作用不是通讯,主要用于解除子系统间的耦合,所以异构系统间的通讯实际并不是activeMQ发挥作用的场景,那反而是RPC发挥作用的时候。

消息队列更适用于需要大流量和并发的大型系统场景,可以将一个消息队列视作一个可靠的通道,主交易过程在处理时,遇到需要时间较多同时又已经确定了条件的处理就丢到消息队列中进行后续的处理,这样可以将主交易过程划分为一个一个可以异步处理的更小的处理过程,减少了主交易流程的处理时间,可以提供更快的响应速度和并发速度。例如,像淘宝这样的处理逻辑非常多的系统,在处理付款时,就可以把通知买家和卖家、记日志甚至记账流程都放到消息队列中处理,整个主流程能够更快的处理完成,继续处理下一个买家的付款请求。

二.MQ技术

关于消息队列的一些理解,不明白我们为什么要在自己的系统加入消息队列?

对于一个业务比较小的系统,以前的做法是直接通过一个项目集成了所有业务,比如是一个商城系统,登陆,注册,购物车,支付功能等等,一个系统把所有的事情做完了,但是随着业务的不断扩张,单纯的单一系统,已经不能再适应这个商城系统,我们很多时候回对整个系统进行解耦,进行分布式的开发,把一些功能细化出去,形成各种各样的子系统,例如登陆的模块分离出去,变成了独立的授权验证服务器子系统,支付功能形成一个独立的子系统,那么问题来了,由于两个不同的服务器之间,两者是如何进行消息的同步的呢。,那么我们首先可能会想到诸如resetful api或者socket来实现,其实答案是可以的,不过假如是socket实现的,那么对于我们开发者来说,需要我们解决的问题有很多:

1、我们需要具备网络的底层知识,以及使用相应的客户端语言来实现收发信息,而且还需要定制一套相应的通信协议。2、我们如何做到消息传递的准确性以及数据已经完整的接收.3、解决网络中断消息传递的处理问题。4、在多个系统之间,哪个子系统最先会受到信息.5、对于异构系统间的数据交换,如何解决.6、如何处理应用程序阻塞.7、负载均衡的问题8、子系统通信之间的地址管理其实,我们要解决的问题远远不止上面这些,但是我们使用了消息队列的中间件之后,可以轻松的把问题解决掉。总的来说,使用消息队列,可以解决我们开发者在解决分布式架构之间的传递问题。对于我们开发来者来说,我们一般会采用一些开源的MQ协议来接入我们的系统,下面我简单介绍一些消息队列中间件。目前比较热门的三种消息队列
ActiveMQ
ActiveMQ 由Apache维护的比较优秀的开源协议。它是可以支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,是支持Apache2.0协议,对于spring项目,我们可以很轻易的嵌入进去。支持的客户端语言:
Java, C, C++, C#, Ruby, Perl, PythonPHP。支持的应用协议:OpenWire、Stomp、XMPP、AMQP、MQTT等等。对于XMPP,我在前一些文章介绍过,它是可以用来我们的即时通讯的协议,以XML格式传递。而MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议。该协议支持所有平台,目前可以用于物联网的通讯中,前景非常可观.官方网站:http://activemq.apache.org/

RabbitMQ

是用Erlang编写的一款基于AMQP协议的消息队列中间件,在开源项目关注度比较高。

支持的客户端语言:

Python、Ruby、.NET、Java、Node.js、C、PHP、Go、Erlang。

支持的协议XMPP、STOMP等

官方网站:
http://www.rabbitmq.com/devtools.html
RocketMQ

它是由阿里维护的一个开源消息队列,它在阿里各个业务线得到了广泛的应用,业务场景如下:

MySQL binlog 、同步订单类应用、流计算IM
等实时消息领域、Cache 同步、削峰填谷,目前阿里云的消息中间件也是由它演变而来的

支持的客户端语言:

Java 、C++

支持的协议:

JMS,MQTT

官方网站:
https://github.com/alibaba/rocketmq?spm=5176.doc29532.2.1.I7qrLz
当然消息的中间件,不止上面这些,比如有关注比较高的Jafka/Kafka、open mq,还有一些商业的消息中间件。这里就不一一介绍了。

大概介绍了MQ的一些知识,我们如何在我们的Spring mvc项目中加入消息队列?

上面的三种消息队列中间件,大家可以根据自己的业务需要,使用一个适合自己的中间件,我这里主要讲解一些如何在spring MVC加入RabbitMQ。

三.Win7系统下的安装与配置

     RabbitMQ是基于Erlang的,所以首先必须配置Erlang环境。下载Erlang和RabbitMQ,安装完成后,在RabbitMQ的安装目录的sbin会有:rabbitmq-server.bat,在cmd下(以管理员方式运行):进入sbin目录,运行rabbitmq-server
start,启动rabbit服务,浏览器访问localhost:15672  默认账号:guest  密码:guest,如下图。



四.spring跟RabbitMQ集成

1.添加maven依赖

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.0.RELEASE</version>
</dependency>

<!-- rabbit -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.4</version>
</dependency>

<!-- Jackson是Java中用来处理json的类库 -->
<dependency>
  <groupId>org.codehaus.jackson</groupId>
   <artifactId>jackson-mapper-asl</artifactId>
   <version>1.9.13</version>
</dependency>

下载好后,我们需要了解连接rabbit的一些步骤,

1. 客户端连接到消息队列服务器,打开一个channel。
2. 客户端声明一个exchange,并设置相关属性。
3. 客户端声明一个queue,并设置相关属性。
4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。
5. 客户端投递消息到exchange。
6. exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里

2.添加spring-amqp.xml文件,我这里只是以topic为例子,添加RabbitMQ配置信息如下:

<beans
xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 消息接受者 -->
<bean id="receiveMessageListener" class="com.yinjf.cxf.controller.ReceiveMessageListener"></bean>

<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory"

        host="127.0.0.1" username="guest" password="guest" virtual-host="/" />

<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin connection-factory="connectionFactory" id="amqpAdmin"/>

<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" channel-transacted="true"
message-converter="jsonMessageConverter"/>

<!--json消息转换-->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper"></bean>
</property>
</bean>

<!--定义queue(消息队列),id和name可以是名字一致的-->
<rabbit:queue id="com.yinjf.cxf.queue" name="com.yinjf.cxf.queue">
<rabbit:queue-arguments><entry key="x-ha-policy" value="all"/></rabbit:queue-arguments>
</rabbit:queue>

<!--

 声明Exchange的类型为topic并设定Exchange的名称,绑定com.yinjf.cxf.queue队列

Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。
Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:
Direct
直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue
fanout
广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。
topic
主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为
* .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)
headers
消息体的header匹配(ignore)
Binding
所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。 

 -->
<rabbit:topic-exchange name="yinjf.cxf">
<rabbit:bindings>
<!-- 这里的queue是<rabbit:queue 里的ID -->
<rabbit:binding pattern="com.yinjf.cxf" queue="com.yinjf.cxf.queue"/>
</rabbit:bindings>
</rabbit:topic-exchange>

<!--

消息的监听,如果有多个子系统的话,可以在子系统中定义,我这里只是同时让他把消息发送和接收放在同一个系统而已。

观察监听模式
当有消息到达时会通知监听在对应的队列上的监听对象

-->
<rabbit:listener-container connection-factory="connectionFactory" message-converter="jsonMessageConverter"
channel-transacted="true" concurrency="10" auto-startup="true">
<!-- queues是我们上面的消息的队列,这句话的意思就是监听上面名字的队列 -->
<rabbit:listener queues="com.yinjf.cxf.queue" ref="receiveMessageListener" method="handleMessage"/>
</rabbit:listener-container>

</beans>

3.编写发送和接收发送器代码

1.编写发送器代码,代码如下:

@Controller

public class MessageQueeController {

private static final String EXCHANGE = "yinjf.cxf";

@Autowired
private AmqpTemplate amqpTemplate;

@RequestMapping(value = "/sendmsg", method = RequestMethod.GET)
@ResponseBody
public String showInfo(String msg) {
System.out.println("message=" + msg);
amqpTemplate.convertAndSend(EXCHANGE, "com.yinjf.cxf", msg);
return msg;
}

}

2.编写接收器代码,代码如下:

@Component

public class ReceiveMessageListener {

public void handleMessage(String result) {
if (result == null) {
System.out.println("oh,no");
return;
}

System.out.println("receive
msg=" + result);
}

}

4.运行

我们在浏览器输入我们的项目地址
http://localhost:8083/rabbit/sendmsg?msg=test,可以发现控制台会打印出如下信息,说明我们的消息监听已经成功了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: