您的位置:首页 > 其它

rabbitMQ的简单实例——amqp协议带数据回写机制

2016-01-13 23:16 489 查看
rabbitMQ是一种高性能的消息队列,支持或者说它实现了AMQP协议(advanced message queue protocol高级消息队列协议)。

下面简单讲一讲一个小例子。我们首先要部署好rabbitMQ,然后实现一个生产者—消费者,生产者向rabbit中发布一个消息,消费者去rabbit取这个消息,在正确收到这个消息后,消费者会通过返回队列回写通知生产者自己收到了消息。

windows下部署rabbit非常简单,先安装erlang运行时,然后安装rabbitMQ安装文件即可,都是exe的,很简单。然后找到rabbit的sbin目录里的bat即可启动rabbitMQ。

下面是producer—consumer代码:

package com.hzfi.rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;

public class Producer {
private final static String QUEUE_NAME = "myQueue"; //上送队列

public static void main(String[] args) throws IOException, TimeoutException{
String replyQueueName = null;   //返回队列名

ConnectionFactory connFactory = null;
Connection conn = null;
Channel channel = null;
try{
connFactory = new ConnectionFactory();
connFactory.setHost("localhost");
conn = connFactory.newConnection();
channel = conn.createChannel();
//返回队列
replyQueueName = channel.queueDeclare().getQueue();
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);

String corrId = java.util.UUID.randomUUID().toString(); //用来表示返回队列结果的id,唯一
BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
String msg = "linyang@hzfi.cn";
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, props, msg.getBytes());
System.out.println("producer has published: \"" + msg + "\"");

while(true){
Thread.sleep(1000);
Delivery delivery = consumer.nextDelivery();
System.out.println("from server reply:" + new String(delivery.getBody()));
}
}catch(IOException ioe){
ioe.printStackTrace();
}catch(TimeoutException toe){
toe.printStackTrace();
} catch (ShutdownSignalException e) {
e.printStackTrace();
} catch (ConsumerCancelledException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
if(channel!=null)   channel.close();
if(conn!=null)  conn.close();
}
}
}


package com.hzfi.rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer {
private final static String QUEUE_NAME = "myQueue";
public static void main(String[] args) throws IOException, TimeoutException{
ConnectionFactory connFactory = null;
Connection conn = null;
Channel channel = null;
try{
connFactory = new ConnectionFactory();
connFactory.setHost("localhost");
conn = connFactory.newConnection();
channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("listening for event message...");

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while(true){
Thread.sleep(1000);
Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties reply_props = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
String msg = new String(delivery.getBody(),"utf-8");
System.out.println("receive msg:" + msg);
String retMsg = "ok, give you reply:" + new String(msg.getBytes(),"utf-8");
System.out.println("Consumer中的返回队列名" + props.getReplyTo());
channel.basicPublish( "", props.getReplyTo(), reply_props, retMsg.getBytes());
}
}catch(IOException ioe){
ioe.printStackTrace();
}catch(TimeoutException toe){
toe.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
if(channel!=null)   channel.close();
if(conn!=null)  conn.close();
}
}
}


开启RabbitMQ的后台管理服务(是个web页面)

\sbin>rabbitmq-plugins enable rabbitmq_management

访问地址 http://localhost:15672/ id/psw: guest/guest

可以对队列,用户,权限等进行管理,例如,默认情况下密码是任意,如上代码所示,ConnectionFactory仅仅设置了主机名,并未设置用户名和密码。

我们可以新建或修改一个用户名和密码,如下图:



这样,我们上面的代码也要做相应的调整:

ConnectionFactory connFactory = new ConnectionFactory();
connFactory.setHost("localhost");
connFactory.setUsername("guest");
connFactory.setPassword("123");
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: