您的位置:首页 > 编程语言 > Java开发

rabbitMQ与Spring集成

2017-08-23 16:30 399 查看
简介:此时使用rabbitMQ是为了解决项目中多数据源造成的连接经常拿不到问题,比如一条数据需要同时向两个不同的数据库插入,那么插入一个数据库之后会同步返回成功的提示,将数据放到消息队列中处理,等待消费者将数据异步插入另外一个数据库,减小连接池的压力,同时减少出错的概率,提高用户体验。

1.maven配置(pom.xml)

<dependency>  

       <groupId>org.springframework.amqp</groupId>  

       <artifactId>spring-rabbit</artifactId>  

       <version>1.3.5.RELEASE</version>  

</dependency>

2.rabbitMQ配置文件(rabbitMQ.properties)

#rabbitMQ服务器配置

mq.host=127.0.0.1

mq.username=admin

mq.password=123321

mq.port=5672

#队列

mq.queueName=queueName1

mq.queueKey=551B3D4A88C47EDAE0530100007FC2EA

3.Spring配置(rabbitMQ.xml)

 <!--配置connection-factory,指定连接rabbit server参数 -->

    <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"/>

    

    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->  

    <rabbit:admin id="connectAdmin" connection-factory="connectionFactory"/>  

    

    <!-- 设置Ack模式为手动 -->    

    <bean id="ackManual"

        class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">

        <property name="staticField"

            value="org.springframework.amqp.core.AcknowledgeMode.MANUAL" />

    </bean>

    

    <!--定义queue -->  

    <!-- durable:是否持久化
exclusive: 仅创建者可以使用的私有队列,断开后自动删除
auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->

    <rabbit:queue name="${mq.queueName}" durable="true" auto-delete="false" exclusive="false"/>  

    

    <!-- 定义direct exchange,绑定queueTest -->  

    <!-- 交换机的四种模式:
direct:转发消息到 routigKey 指定的队列。
topic:按规则转发消息(最灵活)。
headers:(这个还没有接触到)
fanout:转发消息到所有绑定队列 -->

    <rabbit:direct-exchange name="amqpExchange" durable="true" auto-delete="false">  

        <!-- 设置消息queue匹配的key -->

        <rabbit:bindings>  

            <rabbit:binding queue="${mq.queueName}" key="${mq.queueKey}"></rabbit:binding>

        </rabbit:bindings>  

    </rabbit:direct-exchange>  

  

  <!-- 消息对象json转换类 -->
<!-- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
-->

<!--定义rabbit template用于数据的接收和发送 -->  

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"  

                     exchange="amqpExchange"/> 

 

    <!-- 消息接收者 -->  

    <bean id="messageReceiver" class="com.standard.service.impl.MessageConsumerServiceImpl"></bean>  

  

    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->  

    <!-- acknowledge="manual" 设置消息队列手动确认,防止因为断网等其他原因造成的消息丢失 -->

    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">  

        <rabbit:listener queues="${mq.queueName}" ref="messageReceiver"/>  

    </rabbit:listener-container>  

 

4.发送消息(Producer)

/**

 * 发送消息

 * @author hcsu

 * 2017年8月8日 上午10:48:35

 */

public interface MQProducer {

     /**

     * 发送消息到指定队列

     * @param queueKey

     * @param object

     * @throws Exception 

     */

    void sendDataToQueue(String queueKey, Object object) throws Exception;

}

import org.springframework.amqp.core.AmqpTemplate;

@Service("mqProducer")

public class MQProducerImpl implements MQProducer {

private static final Logger logger = LoggerFactory.getLogger(MQProducerImpl.class);

@Autowired

    private AmqpTemplate amqpTemplate;

@Override
public void sendDataToQueue(String queueKey, Object object) throws Exception{
try {

            amqpTemplate.convertAndSend(queueKey, object);

        } catch (Exception e) {

        logger.info(e.getLocalizedMessage());

        throw new Exception("MQ发送消息失败");

        }
}

}



5.异步接收消息(Consumer)

public class MessageConsumer implements ChannelAwareMessageListener{

private static final Logger logger =LoggerFactory.getLogger(MessageConsumer .class);
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
String mqOrderInfoStr=new String(message.getBody(),"utf-8");
logger.info("消息消费者:"+mqOrderInfoStr);
//处理业务逻辑
//消息标示,false只确认当前一个消息确认,true确认所有consumer获得的消//息
channel.basicAck(message.getMessageProperties().getDeliveryTa g(), false);
} catch (Exception e) {
e.printStackTrace();
//ack返回false,并重新回到队列
channel.basicNack(message.getMessageProperties().getDeliveryT ag(), false,true);
}


}

6.JUnit测试

public class Test(){

public static void main(String[] args) {

//MQ发送消息

String queueKey = null;
Properties pro = new Properties();
try {
pro.load(getClass().getResourceAsStream("/rabbitMQ.properties"));
if(pro.containsKey("mq.queueKey"))
queueKey= pro.getProperty(key).trim();

mqProducer.sendDataToQueue(queueKey, "Hello,rabbitMQ!");
} catch (IOException e) {
e.printStackTrace();
}

}

}

7.运行结果

消息消费者:Hello,rabbitMQ!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: