您的位置:首页 > 编程语言 > Java开发

RabbitMQ收发——spring 集成

2018-01-29 16:59 387 查看
上一篇讲了通过原生的AMQP协议去读写Rabbit的方式,这次我们整合到spring中测试一下

首先依旧是连接对象:

protected CachingConnectionFactory getConnectionFactory() {
//创建连接工厂
CachingConnectionFactory factory = new CachingConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost(host);
factory.setUsername(account);
factory.setPassword(password);
factory.setPort(port);
factory.setPublisherConfirms(true);
return factory;
}


//申明两个全局参数
private RabbitTemplate rabbitTemplate;
private RabbitAdmin rabbitAdmin;
public RabbitTemplate getRabbitTemplate() {
if (rabbitTemplate == null) {
rabbitTemplate = new RabbitTemplate(getConnectionFactory());
}
return rabbitTemplate;
}
public RabbitAdmin getRabbitAdmin() {
if (rabbitAdmin == null) {
rabbitAdmin = new RabbitAdmin(getConnectionFactory());
}
return rabbitAdmin;
}


初始化这两个成员

public void connect() throws IOException {
getRabbitTemplate();
getRabbitAdmin();
}


发送消息:

//type是我业务中的字段,这里我用来模拟添加参数
public boolean sendData(String topicStr, byte[] data, int type) {
//申明队列,如果队列不存在会进行创建
getRabbitAdmin().declareQueue(new Queue(topicStr));
//设置发送参数,这里我们模拟了一个头信息type,如果不需要添加参数,则可以将new MessagePostProcessor内容去除
//getRabbitTemplate().convertAndSend(topicStr, data);
getRabbitTemplate().convertAndSend(topicStr, data, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader(MessageType.HEAD_FRONT_MESSAGE_TYPE, type);
return message;
}
});
return true;
}


监听队列

public void getMessage(String topicStr) {
System.out.println("开始监听");
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(getConnectionFactory());
container.setQueueNames(topicStr);
//监听回调
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("receive msg : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
messageService.receiveFormMiddleware(body);
}
});
container.start();
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spring AMQP