初学RabbitMQ之HelloWorld
2015-11-05 11:56
239 查看
最近工作任务有点轻,且刚到新公司,接触到了RabbitMQ,觉得该消息代理很神奇,想先入个门,结果还出了一个糗事:因为本人比较菜,又想学RabbitMQ,结果找公司架构问了一个很弱智的问题—让他帮我介绍一个入门级的学习RabbitMQ的blog,结果对方只发来一句:详情请见官网http://www.rabbitmq.com/,后来补充了一句,学新技术,找官网……
好吧,闲话不多说,开始我们的HelloWorld吧!
材料准备:
1.本地安装好Erlang环境
2.本地安装好RabbitMQ服务器
3.本地Maven web 空项目一个
开始:
1.找到pom.xml文件,加入下面依赖
思考: 为什么要加入这段依赖?
回答(不一定准确): RabbitMQ是基于Erlang开发的,对于一些重要的开发语言(Python | Java | Ruby | PHP | C# | Javascript | Go)都有相应的驱动和客户端, 我们使用的是java开发,故先要获取java RabbitMQ客户端,上面那段便是java RabbitMQ的maven依赖配置
2.
由于生产者和消费者都是咱们自己写的应用程序,且和队列有关,而且需要和队列进行通信,故在代码结构上可以抽象出一个抽象类:AbstractApplication用来抽象生产者和消费者一些共同的操作,代码如下:
抽象类:AbstractApplication.java
注: 上面那段代码是不是和SessionFactory有点像呢… -.-
3.开始写我们的生产者程序了,要给队列发消息嘛,不过在生产者生产消息之前,得先加点依赖包进去(Apache Common Lang,方便把可序列化的java对象转换为byte[],加不加没啥关系,可忽略)
生产者: MsgProducer.java
注: 这样我们的生产者就产生了,可以给appName队列发消息啦,可是一个人是不是有点孤单,赶紧加入一个消费者吧 o(∩_∩)o
4.加入消费者之前,先想下是不是可以把消费者作为一个线程呢,一有消息我就给消费掉,不给浪费的机会,有想法就赶紧行动
消费者: MsgConsumer.java
其中Consumer是com.rabbitmq.client包下的一个接口,这是官方解释:
Interface for application callback objects to receive notifications and messages from a queue by subscription.(该接口是给咱们这些application使用的,当队列有消息来时,可以回调该对象的方法),其中最重要的回调方法是:handleDelivery,当队列有消息来时,回调该方法处理消息
5.好了,生产者和消费者都有了,咱们来测测这个Hello World到底能不能Hello了
测试类: Main.java
注: 先搞一个消费者在队列上监听着,一旦生产出来消息了,立即回调MsgConsumer.java中的回调方法处理掉,下面是我电脑上的部分输出哦,因为有多线程,故看到的结果可能不太一样,只要消费掉消息就是ok的
题外话: 这是第一次写这么长的blog,而且刚学RabbitMQ,有不对的恳请各位大侠指出,小弟先谢过啦!
好吧,闲话不多说,开始我们的HelloWorld吧!
材料准备:
1.本地安装好Erlang环境
2.本地安装好RabbitMQ服务器
3.本地Maven web 空项目一个
开始:
1.找到pom.xml文件,加入下面依赖
[code]<!-- RabbitMQ --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.0.4</version> </dependency>
思考: 为什么要加入这段依赖?
回答(不一定准确): RabbitMQ是基于Erlang开发的,对于一些重要的开发语言(Python | Java | Ruby | PHP | C# | Javascript | Go)都有相应的驱动和客户端, 我们使用的是java开发,故先要获取java RabbitMQ客户端,上面那段便是java RabbitMQ的maven依赖配置
2.
由于生产者和消费者都是咱们自己写的应用程序,且和队列有关,而且需要和队列进行通信,故在代码结构上可以抽象出一个抽象类:AbstractApplication用来抽象生产者和消费者一些共同的操作,代码如下:
抽象类:AbstractApplication.java
[code]public abstract class AbstractApplication { protected Channel channel; protected Connection connection; protected String appName;//队列名称 protected String name;//生产者或者消费者名称 public AbstractApplication (String appName ,String name) throws IOException { this.appName = appName; this.name = name; // 创建connection工厂 ConnectionFactory factory = new ConnectionFactory(); // MQ服务器主机名 factory.setHost("localhost"); // System.out.println(factory.getPort()); // 创建connection this.connection = factory.newConnection(); // 创建channel this.channel = connection.createChannel(); // 声明队列 channel.queueDeclare(appName, false, false, false, null); } /** * 关闭channel和connection。并非必须,因为隐含是自动调用的。 * @throws IOException */ public void close() throws IOException{ this.channel.close(); this.connection.close(); } }
注: 上面那段代码是不是和SessionFactory有点像呢… -.-
3.开始写我们的生产者程序了,要给队列发消息嘛,不过在生产者生产消息之前,得先加点依赖包进去(Apache Common Lang,方便把可序列化的java对象转换为byte[],加不加没啥关系,可忽略)
[code]<dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency>
生产者: MsgProducer.java
[code]public class MsgProducer extends AbstractApplication { public MsgProducer(String appName, String name) throws IOException { super(appName, name); System.out.println("*******消息生产者被创建******"); } public void sendMessage (Serializable object) throws IOException { this.channel.basicPublish("", appName,null,SerializationUtils.serialize(object)); } }
注: 这样我们的生产者就产生了,可以给appName队列发消息啦,可是一个人是不是有点孤单,赶紧加入一个消费者吧 o(∩_∩)o
4.加入消费者之前,先想下是不是可以把消费者作为一个线程呢,一有消息我就给消费掉,不给浪费的机会,有想法就赶紧行动
消费者: MsgConsumer.java
[code]public class MsgConsumer extends AbstractApplication implements Runnable, Consumer { public MsgConsumer(String appName, String name) throws IOException { super(appName, name); this.channel.basicQos(1); System.out.println("*******消息消费者被创建******"); } public void handleConsumeOk(String consumerTag) { System.out.println("=====消费者 "+ consumerTag + "已经注册====="); } public void handleCancelOk(String consumerTag) { } public void handleCancel(String consumerTag) throws IOException { } /** * 当有消息可消费时,回调此方法 */ public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException { Map<String, Integer> map = (HashMap<String, Integer>)SerializationUtils.deserialize(arg3); System.out.println("消息个数: " + map.get("MsgNum") + " 已经被接收"); } public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { } public void handleRecoverOk(String consumerTag) { } public void run() { try { // 开始消费消息, 并自动告知已经消费 channel.basicConsume(appName, true, this); } catch (IOException e) { e.printStackTrace(); } } }
其中Consumer是com.rabbitmq.client包下的一个接口,这是官方解释:
Interface for application callback objects to receive notifications and messages from a queue by subscription.(该接口是给咱们这些application使用的,当队列有消息来时,可以回调该对象的方法),其中最重要的回调方法是:handleDelivery,当队列有消息来时,回调该方法处理消息
5.好了,生产者和消费者都有了,咱们来测测这个Hello World到底能不能Hello了
测试类: Main.java
[code]public class Main { public Main () throws IOException { // 启动一个消费者线程 MsgConsumer consumer = new MsgConsumer("F_QUEUE", "consumer"); Thread consumerThread = new Thread(consumer); consumerThread.start(); // 创建一个生产者 MsgProducer producer = new MsgProducer("F_QUEUE", "producer"); for (int index = 0; index < 1000; index++) { HashMap<String, Integer> msgMap = new HashMap<String, Integer>(); msgMap.put("MsgNum", index); producer.sendMessage(msgMap); System.out.println("生产者生产了 :" + index + " 个消息"); } } public static void main(String[] args) throws IOException { new Main(); } }
注: 先搞一个消费者在队列上监听着,一旦生产出来消息了,立即回调MsgConsumer.java中的回调方法处理掉,下面是我电脑上的部分输出哦,因为有多线程,故看到的结果可能不太一样,只要消费掉消息就是ok的
题外话: 这是第一次写这么长的blog,而且刚学RabbitMQ,有不对的恳请各位大侠指出,小弟先谢过啦!
相关文章推荐
- desgn partten : 适配器&门面
- Python字符串格式转换
- 5.2 模型表示
- 5.1 3D幻境
- 面向对象三大特性之多态性
- iOS7时代我们用什么来追踪和识别用户?(iOS唯一标示符引导)
- Spark 共享变量——累加器(accumulator)与广播变量(broadcast variable)
- js 数组去重
- SQLSERVER数据库管理员的专用连接DAC
- Linux 的cp命令
- 磁粉检测技术
- 理解Contact的关键图
- iOS 沙盒路径获取,创建文件
- #Pragma Pack(n)与内存分配
- SpringMVC介绍之约定优于配置
- jsonp详解
- 查找页面元素
- 装xcode7创建window界面出现错误解决方法
- 【click】又是TMD click事件
- cocos2d-JS中场景(scene)之前如何传参。