
Integrating the RabbitMQ Message Queue with Spring

2017-03-17 17:41

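1. RabbitMQ overview

RabbitMQ is a popular open-source message broker written in Erlang. It implements AMQP (Advanced Message Queuing Protocol), specifically AMQP 0-9-1.

Official site: http://www.rabbitmq.com/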

2. Maven configuration

<!-- rabbit -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.5.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.6.RELEASE</version>
</dependency>


3. Configuration file

rabbitmq.properties

mq.host=172.17.22.187
mq.username=remote_user
mq.password=123456
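
The Spring configuration in the next section refers to these keys through ${...} placeholders. As a rough illustration of what that substitution amounts to, the sketch below loads the same three entries with java.util.Properties and expands placeholders in a string. The class PlaceholderSketch and its load and resolve helpers are made up for this example and are not Spring APIs; Spring's own placeholder support handles more cases than this.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spring: shows the idea behind ${key} substitution.
class PlaceholderSketch {

    // Matches ${some.key} and captures the key name.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Parse text written in properties-file syntax (same syntax as rabbitmq.properties).
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Could not parse properties", e);
        }
        return props;
    }

    // Replace every ${key} with its value; fail loudly on an unknown key.
    static String resolve(String template, Properties props) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = props.getProperty(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("No value for placeholder: " + m.group(1));
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = load(
                "mq.host=172.17.22.187\nmq.username=remote_user\nmq.password=123456\n");
        // prints: host=172.17.22.187 username=remote_user
        System.out.println(resolve("host=${mq.host} username=${mq.username}", props));
    }
}
```

Failing on a missing key, rather than leaving the placeholder in place, makes a typo in the properties file show up at startup instead of as a connection error later.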


4. Spring configuration

application-mq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">

    <context:property-placeholder location="classpath:rabbitmq.properties" />

    <!-- Connection factory: connection parameters for the RabbitMQ server -->
    <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" />

    <!-- With this admin declared, the exchanges and queues defined in this context are created on the RabbitMQ server automatically -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Define the queue -->
    <rabbit:queue id="com.mj.test" name="com.mj.test" durable="true" auto-delete="false" exclusive="false"/>

    <!-- Define a direct exchange and bind the com.mj.test queue to it with routing key "hello" -->
    <rabbit:direct-exchange name="myChange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="com.mj.test" key="hello"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Define the RabbitTemplate used to send and receive messages; it publishes to myChange by default -->
    <rabbit:template id="myAmqpTemplate" connection-factory="connectionFactory" exchange="myChange" />

    <!-- Receiving: register the listener bean on the com.mj.test queue -->
    <bean id="messageReceiver" class="com.ucs.mq.QueueListenter"></bean>
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="com.mj.test" ref="messageReceiver"/>
    </rabbit:listener-container>

</beans>
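
The binding in this configuration ties the queue com.mj.test to the direct exchange myChange with the key "hello". A direct exchange delivers a message only to queues whose binding key equals the message's routing key exactly. The sketch below is a toy in-memory model of that rule, written only to make the routing behaviour concrete; DirectExchangeSketch and its methods are invented for this example and are not the RabbitMQ or Spring AMQP API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of a direct exchange; not the RabbitMQ or Spring AMQP API.
class DirectExchangeSketch {

    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages waiting in that queue
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Bind a queue to this exchange under the given key, creating the queue if needed.
    void bind(String queueName, String routingKey) {
        queues.computeIfAbsent(queueName, k -> new ArrayDeque<>());
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Deliver to every queue whose binding key equals the routing key exactly.
    // Returns the number of queues that received the message.
    int publish(String routingKey, String body) {
        List<String> targets = bindings.get(routingKey);
        if (targets == null) {
            return 0; // no matching binding: this model simply discards the message
        }
        for (String queueName : targets) {
            queues.get(queueName).add(body);
        }
        return targets.size();
    }

    // Take the next message from a queue, or null if the queue is empty or unknown.
    String poll(String queueName) {
        Queue<String> q = queues.get(queueName);
        return q == null ? null : q.poll();
    }

    public static void main(String[] args) {
        DirectExchangeSketch myChange = new DirectExchangeSketch();
        myChange.bind("com.mj.test", "hello");

        System.out.println(myChange.publish("hello", "first"));  // prints 1
        System.out.println(myChange.publish("Hello", "second")); // prints 0 (keys are case-sensitive)
        System.out.println(myChange.poll("com.mj.test"));        // prints first
    }
}
```

In this model, as in the configuration above, a producer that sends with any key other than "hello" reaches no queue.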


5. Sending messages (Producer)

package com.ucs.mq;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import net.sf.json.JSONSerializer;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

// Integration test: loads the Spring context from application-mq.xml
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:application-mq.xml"})
public class TestMQ {

    // Injected by type; resolves to the myAmqpTemplate bean
    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void send() throws Exception {
        List<String> submobileList = new ArrayList<String>();
        submobileList.add("1");
        submobileList.add("2");
        submobileList.add("3");

        Map<String, Object> bodyMap = new HashMap<String, Object>();
        bodyMap.put("batchNo", "递四方速递");
        bodyMap.put("item", submobileList);

        // Serialize the map to a JSON string with json-lib
        String jsonStr = JSONSerializer.toJSON(bodyMap).toString();

        // Publish to the template's default exchange (myChange) with routing key "hello"
        amqpTemplate.convertAndSend("hello", jsonStr);
    }
}


6. Receiving messages asynchronously (Consumer)

package com.ucs.mq;

import java.nio.charset.StandardCharsets;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class QueueListenter implements MessageListener {

    @Override
    public void onMessage(Message msg) {
        try {
            // Decode the raw body once, as UTF-8 text
            String str = new String(msg.getBody(), StandardCharsets.UTF_8);
            System.out.println(str);

            JSONObject json = JSONObject.fromObject(str);
            System.out.println(json.getString("batchNo"));

            JSONArray items = json.getJSONArray("item");
            System.out.println(items.toString());
        } catch (Exception e) {
            // Malformed JSON or a missing key ends up here
            e.printStackTrace();
        }
    }
}
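
The listener decodes the message body as UTF-8 explicitly, which matters here because the payload contains Chinese text. The small sketch below, using only the JDK, shows the difference between decoding the same bytes with the right and the wrong charset. BodyDecodingSketch is a hypothetical class for illustration and is not part of the consumer.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: why the listener names the charset when decoding the body.
class BodyDecodingSketch {

    // Decode a message body the way the listener does: as UTF-8 text.
    static String decodeUtf8(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "{\"batchNo\":\"递四方速递\"}";
        byte[] body = original.getBytes(StandardCharsets.UTF_8);

        String decoded = decodeUtf8(body);
        // Same bytes read with a single-byte charset: each byte becomes its own character.
        String garbled = new String(body, StandardCharsets.ISO_8859_1);

        System.out.println(decoded.equals(original)); // prints true
        System.out.println(garbled.equals(original)); // prints false
    }
}
```

Relying on the platform default charset instead would make the result depend on the machine the consumer runs on.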


7. Running

{"batchNo":"递四方速递","item":["1","2","3"]}
递四方速递
["1","2","3"]
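
Because the listener is invoked on a separate thread from the sending test, this output only appears if the consumer gets to run before the test JVM exits. One common way to make a test wait for an asynchronous callback is a CountDownLatch. The sketch below uses a plain thread as a stand-in for the listener container; AwaitListenerSketch and sendAndAwait are hypothetical names, and nothing here talks to RabbitMQ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: waiting for an asynchronous listener with a latch.
class AwaitListenerSketch {

    // Hands the body to a stand-in listener thread and waits up to timeoutMillis
    // for it to report back. Returns the received body, or null on timeout.
    static String sendAndAwait(String body, long timeoutMillis) {
        CountDownLatch received = new CountDownLatch(1);
        AtomicReference<String> seen = new AtomicReference<>();

        // Stand-in for the listener container calling onMessage on its own thread.
        Thread listener = new Thread(() -> {
            seen.set(body);       // what onMessage would record
            received.countDown(); // signal the waiting test
        });
        listener.start();

        try {
            boolean arrived = received.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return arrived ? seen.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return null;
        }
    }

    public static void main(String[] args) {
        // prints: {"batchNo":"demo"}
        System.out.println(sendAndAwait("{\"batchNo\":\"demo\"}", 2000));
    }
}
```

In a real test the latch would be counted down inside the listener's onMessage, and the test method would await it after calling convertAndSend.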