您的位置:首页 > 其它

RabbitMQ 入门

2014-09-28 22:16 253 查看
RabbitMQ 入门

随着业务的增多, 系统中的服务越来越多, 这就面临系统中服务调用紊乱的问题. 尤其在最近需要做一个任务调度的系统, 在三番思考之后, 决定采用RabbitMQ的消息队列实现任务调度问题.以下简称RMQ

1.下载安装RMQ

下载地址:http://www.rabbitmq.com/download.html

笔者使用的是ubuntu系统, 下载并安装完后, 进入官方的使用文档http://www.rabbitmq.com/install-debian.html

下载安装后, RMQ默认是已经启动的, 当然我们也可以手动进行启动关闭操作,具体命令:

rabbitmqctl 默认产生详细输出。通过”-q”标示可选择安静模式。

rabbitmqctl -q status

应用和集群管理

1.停止RabbitMQ应用,关闭节点

# rabbitmqctl stop

2.停止RabbitMQ应用

# rabbitmqctl stop_app

3.启动RabbitMQ应用

# rabbitmqctl start_app

4.显示RabbitMQ中间件各种信息

# rabbitmqctl status

5.重置RabbitMQ节点

# rabbitmqctl reset

# rabbitmqctl force_reset

2.Helloword

下载依赖的jar包:http://www.rabbitmq.com/java-client.html

RabbitMQ要实现的是一个典型的生产者消费者案例. 生产者向消息队列不断发送消息, 众多消费者从消息队列中取消息. RabbitMQ的存在, 简单而言就是解决消息队列中的存取问题.

开始码代码.以下是生产者代码:

123456789101112131415161718192021222324252627282930313233343536373839404142package cn.iyowei.mq.greeting; import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory; /** *生产者,每800ms发送一条消息给消息队列 */public class MQProducer { private static final String QUEUE_NAME = "greeting"; public static void main(String[] argv) throws java.io.IOException, InterruptedException { /** * 创建连接连接到MabbitMQ */ String producerName = "producer-[" + new Random().nextInt(9) + "]"; ConnectionFactory factory = new ConnectionFactory(); // 设置MabbitMQ所在主机ip或者主机名 factory.setHost("localhost"); // 创建一个连接 Connection connection = factory.newConnection(); // 创建一个频道 Channel channel = connection.createChannel(); // 指定一个队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 发送的消息 String message; // 往队列中发300条消息 for (int i = 0; i < 300; i++) { message = "hello world! " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); Thread.sleep(800); // 等待800ms System.out.println(producerName + " Sent '" + message + "'"); } // 关闭频道和连接 channel.close(); connection.close(); }}
生产者主要负责每800ms发送一条消息给消息队列. 以下是消费者代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

packagecn.iyowei.mq.greeting;

importcom.rabbitmq.client.Channel;

importcom.rabbitmq.client.Connection;

importcom.rabbitmq.client.ConnectionFactory;

importcom.rabbitmq.client.QueueingConsumer;

/**

*
消费者,可以启动多个进行消费

*/

publicclassMQConsumer{

//
队列名称

privatestaticfinalStringQUEUE_NAME="greeting";

publicstaticvoidmain(String[]argv)throwsjava.io.IOException,

java.lang.InterruptedException{

StringconsumerName="
[consumer-"+newRandom().nextInt(9)+"]";

//
打开连接和创建频道,与发送端一样

ConnectionFactoryfactory=newConnectionFactory();

factory.setHost("localhost");

Connectionconnection=factory.newConnection();

Channelchannel=connection.createChannel();

//
声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

System.out.println(consumerName+"
Waiting for messages. To exit press CTRL+C");

//
创建队列消费者

QueueingConsumerconsumer=newQueueingConsumer(channel);

//
指定消费队列

channel.basicConsume(QUEUE_NAME,true,consumer);

while(true)

{

//
nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)

QueueingConsumer.Deliverydelivery=consumer.nextDelivery();

Stringmessage=newString(delivery.getBody());

System.out.println(consumerName+"
Received '"+message+"'");

}

}

}

消费者主要负责从队列从拉取消息.

注意到代码中给每个生产者和消费者这使用random命名, 旨在同时启动多个生产者和消费者, 以观察执行效果.

笔者开了一个生产者,三个消费者,生产者的log大致如:

producer-[1] Sent ‘hello world! 0′

producer-[1] Sent ‘hello world! 1′

producer-[1] Sent ‘hello world! 2′

producer-[1] Sent ‘hello world! 3′

producer-[1] Sent ‘hello world! 4′

producer-[1] Sent ‘hello world! 5′

producer-[1] Sent ‘hello world! 6′

producer-[1] Sent ‘hello world! 7′

producer-[1] Sent ‘hello world! 8′

而消费者1的log:

[consumer-1] Waiting for messages. To exit press CTRL+C

[consumer-1] Received ‘hello world! 0′

[consumer-1] Received ‘hello world! 3′

[consumer-1] Received ‘hello world! 6′

消费者2的log:

[consumer-2] Waiting for messages. To exit press CTRL+C

[consumer-2] Received ‘hello world! 1′

[consumer-2] Received ‘hello world! 4′

[consumer-2] Received ‘hello world! 7′

消费者3的log:

[consumer-3] Waiting for messages. To exit press CTRL+C

[consumer-3] Received ‘hello world! 2′

[consumer-3] Received ‘hello world! 5′

[consumer-3] Received ‘hello world! 8′

由上面的结果可以简单的推断, 消息队列向消费者发送消息的时候是从消费者1-消费者n轮流发送的.

下一篇:RabbitMQ (二) 命令初尝

javaRabbitMQ后端服务器消息队列
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息