您的位置:首页 > 其它

RabbitMQ安装与使用

2016-06-12 17:15 381 查看
一、说明

   AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

二、软件安装

1、Python安装

  访问官网https://www.python.org/downloads/下载适合的版本。依次执行解压(并进入软件解压目录),./configure,make,make install。在命令行输入python命令,如果正常出现版本信息就表示成功。

   问题1:输入python命令可能展示的版本信息并不是你安装的版本

   解决:1:出现这种问题通常是你们操作信息已经默认安装python版本(版本通常是2.*)。要使用新安装的版本,使用ln -s 命令重新建立指向安装python的软连接就可以了。

   问题2:Python升级后yum命令执行异常:File "/usr/bin/yum", line 30

   解决2:这个问题是由于Python高版本对低版本兼容性不太好导致的。具体的解决方案,请查看参考文档2。

2、simplejson安装

   访问https://pypi.python.org/pypi/simplejson#downloads下载适合的版本。依次执行解压(并进入软件解压目录),python setup.py build,python setup.py install。

3、Erlang安装

   访问官网http://www.erlang.org/downloads下载合适的版本。依次执行解压(并进入软件解压目录),./configure,make,make install。在命令行输入erl命令,如果正常出现版本信息就表示成功。

   问题1:执行./configure命令时出现错误:error: No curses library functions found

   解决1:出现上面的问题是因为没有安装ncurses。具体解决方案请查看参考文档4。

4、RabbitMQ-Server安装

   访问官网http://www.rabbitmq.com/download.html下载合适的版本(建议下载Binary版本,如果下载Fedor/RHEL等安装版本,对操作系统和依赖软件等有要求,过程比较麻烦)。启动RabbitMQ-Server命令为rabbitmq-server –detached,查看状态命令为rabbitmqctl status,停止命令为rabbitmqctl
stop。

5、RabbitMQ管理插件安装

   在RabbitMQ-Server启动状态下,执行命令rabbitmq-plugins enable rabbitmq_management打开管理插件,并重启RabbitMQ-Server(该过程可以查看参考文档5)。因为通常访问监控页面都不是在本机上(本机上有默认的guest/guest账号密码可以访问),所以我们需要添加一个访问账号,增加一个管理员权限账号和密码为admin/123456的过程为#rabbitmqctl add_user admin 123456 ,#rabbitmqctl set_user_tags
admin administrator(更多信息可以参看参考文档6)。重新启动RabbitMQ,输入http://server-name:15672 就能够进入到监控页面。

补充1:连接Rabbitmq服务器时可能报“org.springframework.amqp.AmqpTimeoutException: java.util.concurrent.TimeoutException”错误。该错误是由于安装Rabbitmq的机器使用别名,软件默认用改别名作为本地域名导致通信超时引起的。将机器别名解析成本机IP就可以解决该问题。

三、RabbitMQ示例

1、消息生产者

<span style="font-family:FangSong_GB2312;font-size:24px;">package com.test;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
* RibbitMQ发送端
* @author Horace
*
*/
public class MessageProducer {

private static final String TASK_QUEUE_NAME = "task_queue";

@SuppressWarnings("resource")
public static void main(String[] argv) throws IOException, TimeoutException {

String host = "172.18.8.46";
String username = "admin";
String password = "123456";

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setUsername(username);
factory.setPassword(password);

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 指定队列持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

String msg = "";
boolean run = true;
do {
System.out.println("请输入发送消息(quit时退出):");
Scanner sc = new Scanner(System.in);
msg = sc.next();
// 指定消息持久化
if ("quit".equals(msg)) {
run = false;
} else {
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
System.out.println("[RabbitMQ] Send '" + msg + "'");
}
} while (run);

channel.close();
connection.close();
}

}
</span>


2、消息消费者

<span style="font-family:FangSong_GB2312;font-size:24px;">package com.test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
* RibbitMQ接收端
* @author Horace
*
*/
public class MessageConsumer {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException {

String host = "172.18.8.46";
String username = "admin";
String password = "123456";

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setUsername(username);
factory.setPassword(password);

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 指定队列持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

// 指定该消费者同时只接收一条消息
channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);

//取消 autoAck
boolean autoAck = false ;
// 打开消息应答机制
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery()
4000
;
String message = new String(delivery.getBody());

System.out.println(" [RibbitMQ] Received '" + message + "'");
doWork(message);
System.out.println(" [RibbitMQ] Done");

//返回接收到消息的确认信息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}

private static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == '.'){
Thread.sleep(1000);
}
}
}
}
</span>


参考文档:

1、http://blog.csdn.net/column/details/rabbitmq.html RabbitMQ从入门到精通

2、http://blog.csdn.net/educast/article/details/35992127 Python升级带来的yum异常

3、http://blog.csdn.net/historyasamirror/article/details/6827870 RabbitMQ的安装,配置,监控

4、http://www.tuicool.com/articles/iIruqyA Centos6下安装erlang手记

5、http://blog.csdn.net/yasi_xi/article/details/8952078 RabbitMQ管理插件的安装

6、http://blog.csdn.net/zyz511919766/article/details/42292655 RabbitMQ用户角色及权限控制

7、http://www.rabbitmq.com/getstarted.html RabbitMQ官方示例
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  rabbitmq erlang