您的位置:首页 > 编程语言 > Java开发

(转)RabbitMQ学习之spring整合发送异步消息(注解实现)

2017-07-06 09:57 896 查看
http://blog.csdn.net/zhu_tianwei/article/details/40919249

本例使用的Exchange类型为DirectExchange,routingkey的名称默认为Queue的名称,并通过注解实现异步发送消息。

1.生产者配置ProducerConfiguration.java

[java] view plain copy

print?

package cn.slimsmart.rabbitmq.demo.spring.async;

import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.config.BeanPostProcessor;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.annotation.Scheduled;

import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;

import com.rabbitmq.client.AMQP;

/**
 * Spring configuration for the producing side of the demo: a connection factory, a
 * {@link RabbitTemplate} whose routing key is the queue name, and a scheduled bean that
 * publishes a numbered greeting at a fixed rate.
 */
@Configuration
public class ProducerConfiguration {

    // Queue name. Per the original author's note, the routing key defaults to the queue name
    // and the exchange type in use is DirectExchange.
    protected final String helloWorldQueueName = "spring-queue-async";

    /**
     * Creates the broker connection factory.
     *
     * @return a caching connection factory for host 192.168.36.102 using the AMQP protocol port
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        final CachingConnectionFactory factory = new CachingConnectionFactory("192.168.36.102");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("admin");
        factory.setPassword("admin");
        return factory;
    }

    /**
     * Creates the message template used for sending.
     *
     * @return a template bound to {@link #connectionFactory()} whose routing key is the queue name
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        final RabbitTemplate sender = new RabbitTemplate(connectionFactory());
        sender.setRoutingKey(helloWorldQueueName);
        return sender;
    }

    /**
     * Registers the scheduled sender.
     *
     * @return a new {@link ScheduledProducer}
     */
    @Bean
    public ScheduledProducer scheduledProducer() {
        return new ScheduledProducer();
    }

    /**
     * Registers the post-processor that handles {@link Scheduled} annotations.
     *
     * @return a new {@link ScheduledAnnotationBeanPostProcessor}
     */
    @Bean
    public BeanPostProcessor postProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }

    /** Publishes "Hello World N" messages, where N is an increasing counter. */
    static class ScheduledProducer {

        @Autowired
        private volatile RabbitTemplate rabbitTemplate;

        // Auto-incrementing message number.
        private final AtomicInteger sequence = new AtomicInteger();

        /**
         * Sends one message every 3 seconds.
         *
         * <p>Original author's note: Spring 3 strengthened annotation support, scheduled tasks
         * included. Creating a scheduled task now takes two steps: write a Java class with a
         * no-argument, void method annotated with {@code @Scheduled}; then add the three
         * {@code <task:**** />} nodes to the Spring configuration file.
         *
         * <p>See: http://zywang.iteye.com/blog/949123
         */
        @Scheduled(fixedRate = 3000)
        public void sendMessage() {
            final int number = sequence.incrementAndGet();
            final String payload = "Hello World " + number;
            rabbitTemplate.convertAndSend(payload);
        }
    }
}

2.生产者启动类Producer.java

[java] view plain copy

print?

package cn.slimsmart.rabbitmq.demo.spring.async;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

/** Entry point for the producer side of the demo. */
public class Producer {

    /**
     * Builds an annotation-driven application context from {@link ProducerConfiguration}.
     * The context is not closed here.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Same steps the one-argument constructor performs: create, register, refresh.
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(ProducerConfiguration.class);
        context.refresh();
    }
}

3.接收消息处理类ReceiveMsgHandler.java

[java] view plain copy

print?

package cn.slimsmart.rabbitmq.demo.spring.async;

/** Handler whose {@code handleMessage} method prints each received text to standard output. */
public class ReceiveMsgHandler {

    /**
     * Prints the received text, prefixed with {@code "Received: "}, on its own line.
     *
     * @param text the message body to print
     */
    public void handleMessage(String text) {
        final StringBuilder line = new StringBuilder("Received: ");
        line.append(text);
        System.out.println(line.toString());
    }
}

4.消费者配置ConsumerConfiguration.java

[java] view plain copy

print?

package cn.slimsmart.rabbitmq.demo.spring.async;

import org.springframework.amqp.core.AmqpAdmin;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitAdmin;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import com.rabbitmq.client.AMQP;

/**
 * Spring configuration for the consuming side of the demo: a connection factory, an
 * {@link AmqpAdmin}, a {@link RabbitTemplate}, the queue definition, and a listener container
 * that hands each message to {@link ReceiveMsgHandler}.
 */
@Configuration
public class ConsumerConfiguration {

    // Queue name. Per the original author's note, the routing key defaults to the queue name
    // and the exchange type in use is DirectExchange.
    protected String springQueueDemo = "spring-queue-async";

    /**
     * Creates the broker connection factory.
     *
     * @return a caching connection factory for host 192.168.36.102 using the AMQP protocol port
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                "192.168.36.102");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setPort(AMQP.PROTOCOL.PORT);
        return connectionFactory;
    }

    /**
     * Creates the admin component on top of {@link #connectionFactory()}.
     *
     * @return a {@link RabbitAdmin} bound to the connection factory
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * Creates the message template.
     *
     * @return a template whose routing key and receive queue are both the demo queue name
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        // The routing key is set to the name of the queue by the broker for the
        // default exchange.
        template.setRoutingKey(this.springQueueDemo);
        // Where we will synchronously receive messages from
        template.setQueue(this.springQueueDemo);
        return template;
    }

    /**
     * Defines the demo queue. Every queue is bound to the default direct exchange.
     *
     * <p>This method must be a {@code @Bean}: nothing else in this class calls it, so without
     * the annotation the {@link Queue} never reaches the application context and the
     * {@link RabbitAdmin} from {@link #amqpAdmin()} has nothing to declare on the broker.
     * The listener then depends on the queue having been created by hand.
     *
     * @return the queue definition named by {@code springQueueDemo}
     */
    @Bean
    public Queue helloWorldQueue() {
        return new Queue(this.springQueueDemo);
    }

    /**
     * Creates the listener container that consumes from the demo queue.
     *
     * @return a container that passes messages to a new {@link ReceiveMsgHandler} through a
     *     {@link MessageListenerAdapter}
     */
    @Bean
    public SimpleMessageListenerContainer listenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames(this.springQueueDemo);
        container.setMessageListener(new MessageListenerAdapter(
                new ReceiveMsgHandler()));
        return container;
    }
}

5.消费者启动类Consumer.java

[java] view plain copy

print?

package cn.slimsmart.rabbitmq.demo.spring.async;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

/** Entry point for the consumer side of the demo. */
public class Consumer {

    /**
     * Builds an annotation-driven application context from {@link ConsumerConfiguration}.
     * The context is not closed here.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Same steps the one-argument constructor performs: create, register, refresh.
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(ConsumerConfiguration.class);
        context.refresh();
    }
}

启动接收消息,再发送消息

[sql] view plain copy

print?

Received: Hello World 1

Received: Hello World 2

Received: Hello World 3

Received: Hello World 4

Received: Hello World 5

Received: Hello World 6

Received: Hello World 7

......

若报spring-queue-async消息队列不存在,请在控制台添加。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: