您的位置:首页 > 编程语言 > Java开发

Java 操作 RabbitMQ 发送、接收消息

2017-04-26 15:18 417 查看
例子1

Producer.java

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Minimal RabbitMQ producer: declares a queue and publishes one text message to it.
 */
public class Producer {
    /** Name of the queue the message is published to. */
    public final static String QUEUE_NAME="rabbitMQ_test2";

    /**
     * Connects to the broker, declares {@link #QUEUE_NAME}, publishes a single
     * UTF-8 encoded message to it and disconnects.
     *
     * @param args unused
     * @throws IOException if declaring the queue, publishing or closing fails
     * @throws TimeoutException if connecting to the broker or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();

        // Broker location and credentials.
        // NOTE(review): host and credentials are hard-coded for this demo; externalize them for real use.
        factory.setHost("100.51.15.10");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);

        // Open a new connection
        Connection connection = factory.newConnection();

        boolean finished = false;
        try {
            // Open a channel on the connection
            Channel channel = connection.createChannel();

            // Declare the queue (durable=false, exclusive=false, autoDelete=false, no extra arguments)
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Publish the message with the queue name as routing key
            String message = "Hello RabbitMQ";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("Producer Send +'" + message + "'");

            // Close the channel; the connection is closed below
            channel.close();
            finished = true;
        } finally {
            if (finished) {
                connection.close();
            } else {
                // Something above threw: release the connection (and any channel still
                // open on it) without letting a secondary close error mask the original
                // exception. Otherwise the connection's threads keep the JVM alive.
                connection.abort();
            }
        }
    }
}

Customer.java

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;

public class Customer {
private final static String QUEUE_NAME = "rabbitMQ_test2";

public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();

//设置RabbitMQ地址
factory.setHost("100.51.15.10");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);

//创建一个新的连接
Connection connection = factory.newConnection();

//创建一个通道
Channel channel = connection.createChannel();

//声明要关注的队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Customer Waiting Received messages");

//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Customer Received '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

执行

Producer.java

Producer Send +'Hello RabbitMQ'
Producer Send +'Hello RabbitMQ'
Customer.java

Customer Received 'Hello RabbitMQ'
Customer Received 'Hello RabbitMQ'
回到顶部
例子2

首先写一个类,将生产者和消费者统一为 EndPoint 类型的队列。不管是生产者还是消费者,连接队列的代码都是一样的,这样可以通用一些。

EndPoint.java

//package co.syntx.examples.rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Represents a connection with a queue.
 *
 * <p>Opens a connection and a channel to the broker and declares the named queue;
 * subclasses publish to or consume from that queue through {@link #channel}.
 *
 * @author syntx
 */
public abstract class EndPoint{

    /** Channel opened on {@link #connection}; used by subclasses for all queue operations. */
    protected Channel channel;
    /** Connection to the broker. */
    protected Connection connection;
    /** Name of the queue this endpoint is bound to. */
    protected String endPointName;

    /**
     * Connects to the broker and declares the queue.
     *
     * @param endpointName name of the queue to declare and use
     * @throws IOException if the connection, channel or queue cannot be set up;
     *         a connection timeout is reported as an {@code IOException} whose
     *         cause is the original {@code TimeoutException}
     */
    public EndPoint(String endpointName) throws IOException{
        this.endPointName = endpointName;

        //Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();

        //hostname of your rabbitmq server
        // NOTE(review): host and credentials are hard-coded for this demo; externalize them for real use.
        factory.setHost("100.51.15.10");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);

        //getting a connection
        try{
            connection = factory.newConnection();
        }catch (TimeoutException ex) {
            // Fail with the real cause instead of continuing with a null connection,
            // which would only surface as a NullPointerException on the next line.
            throw new IOException("Timed out connecting to RabbitMQ", ex);
        }

        boolean ready = false;
        try {
            //creating a channel
            channel = connection.createChannel();

            //declaring a queue for this channel. If queue does not exist,
            //it will be created on the server.
            channel.queueDeclare(endpointName, false, false, false, null);
            ready = true;
        } finally {
            if (!ready) {
                // Construction failed: nobody will get a reference to call close(),
                // so release the connection here without masking the original error.
                connection.abort();
            }
        }
    }

    /**
     * Closes the channel and then the connection. Nothing in this class calls it
     * automatically; callers should invoke it once they are done with the endpoint.
     * The connection is closed even if closing the channel fails.
     *
     * @throws IOException if closing the channel or the connection fails
     */
    public void close() throws IOException{
        try{
            this.channel.close();
        } catch (TimeoutException ex){
            System.out.println("ex" + ex);
        } finally {
            this.connection.close();
        }
    }
}

Producer2.java

import java.io.IOException;
import java.io.Serializable;

import org.apache.commons.lang.SerializationUtils;

/**
 * Endpoint that publishes serialized objects to its queue.
 */
public class Producer2 extends EndPoint{

    /**
     * Creates a producer bound to the given queue.
     *
     * @param endPointName name of the queue to publish to
     * @throws IOException if the underlying endpoint cannot be set up
     */
    public Producer2(String endPointName) throws IOException{
        super(endPointName);
    }

    /**
     * Serializes the given object and publishes the resulting bytes, using the
     * endpoint name as routing key and no message properties.
     *
     * @param object the object to send
     * @throws IOException if publishing fails
     */
    public void sendMessage(Serializable object) throws IOException {
        final byte[] payload = SerializationUtils.serialize(object);
        channel.basicPublish("", endPointName, null, payload);
    }
}

QueueConsumer.java

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.SerializationUtils;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

public class QueueConsumer extends EndPoint implements Runnable, Consumer{

public QueueConsumer(String endPointName) throws IOException{
super(endPointName);
}

public void run() {
try {
//start consuming messages. Auto acknowledge messages.
channel.basicConsume(endPointName, true,this);
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* Called when consumer is registered.
*/
public void handleConsumeOk(String consumerTag) {
System.out.println("Consumer "+consumerTag +" registered");
}

/**
* Called when new message is available.
*/
public void handleDelivery(String consumerTag, Envelope env,
BasicProperties props, byte[] body) throws IOException {
Map map = (HashMap)SerializationUtils.deserialize(body);
System.out.println("Message Number "+ map.get("message number") + " received.");

}

public void handleCancel(String consumerTag) {}
public void handleCancelOk(String consumerTag) {}
public void handleRecoverOk(String consumerTag) {}
public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
}

Main.java

import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;

/**
 * Demo driver: starts a {@link QueueConsumer} on a background thread and then
 * publishes five messages to the same queue with a {@link Producer2}.
 */
public class Main {
    /**
     * Runs the demo. The consumer is left open after this constructor returns so
     * it can keep receiving; the producer is closed once all messages are sent.
     *
     * @throws Exception if either endpoint cannot be set up, or sending or closing fails
     */
    public Main() throws Exception{

        QueueConsumer consumer = new QueueConsumer("queue");
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();

        Producer2 producer = new Producer2("queue");
        try {
            for (int i = 0; i < 5; i++) {
                HashMap<String, Integer> message = new HashMap<String, Integer>();
                message.put("message number", i);
                producer.sendMessage(message);
                System.out.println("Message Number "+ i +" sent.");
            }
        } finally {
            // The producer has nothing more to send: release its channel and connection.
            producer.close();
        }
    }

    /**
     * Entry point: runs the demo and prints an end marker once all messages are sent.
     *
     * @param args unused
     * @throws Exception if the demo fails
     */
    public static void main(String[] args) throws Exception{
        new Main();
        System.out.println("##############end...");
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java