您的位置:首页 > 其它

RabbitMQ学习笔记4-使用fanout交换器

2016-10-22 22:56 381 查看
fanout交换器会把发送给它的所有消息发送给绑定在它上面的队列,起到广播一样的效果。

本里使用实际业务中常见的例子,

订单系统:创建订单,然后发送一个事件消息

积分系统:发送订单的积分奖励

短信平台:发送订单的短信

消息生产者SenderWithFanoutExchange

1 package com.yzl.test3;
2
3 import java.util.Date;
4
5 import com.google.gson.Gson;
6 import com.rabbitmq.client.Channel;
7 import com.rabbitmq.client.Connection;
8 import com.rabbitmq.client.ConnectionFactory;
9
10 /**
11  *    使用fanout交换器产生事件,消费者订阅事件做相应的处理
12  * @author: yzl
13  * @date: 2016-10-22
14  */
15 public class SenderWithFanoutExchange {
16     //交换器名称
17     private static final String EXCHANGE_NAME = "myFanoutExchange";
18
19     public static void main(String[] args) throws Exception {
20         //连接到rabbitmq服务器
21         ConnectionFactory factory = new ConnectionFactory();
22         factory.setHost("localhost");
23         Connection connection = factory.newConnection();
24         //创建一个信道
25         final Channel channel = connection.createChannel();
26         //定义一个名字为topicExchange的fanout类型的exchange
27         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
28
29       //创建一个时间的Event对象
30         EventObj createOrderEvent = null;
31         for(int i=1; i<10; i++){
32             createOrderEvent = new EventObj();
33             createOrderEvent.setUserId(Long.valueOf(i));
34             createOrderEvent.setCreateTime(new Date());
35             createOrderEvent.setEventType("create_order");
36             //转成JSON
37             String msg = new Gson().toJson(createOrderEvent);
38
39             System.out.println("send msg:" + msg);
40
41             //使用order_event路由键来发送该事件消息
42             channel.basicPublish(EXCHANGE_NAME, "order_event", null, msg.getBytes());
43
44             Thread.sleep(1000);
45         }
46
47         channel.close();
48         connection.close();
49     }
50 }


消费消费者ReceiverWithFanoutExchange

1 package com.yzl.test3;
2
3 import java.io.IOException;
4
5 import com.rabbitmq.client.Channel;
6 import com.rabbitmq.client.Connection;
7 import com.rabbitmq.client.ConnectionFactory;
8 import com.rabbitmq.client.DefaultConsumer;
9 import com.rabbitmq.client.Envelope;
10 import com.rabbitmq.client.AMQP.BasicProperties;
11
12 /**
13  * 使用fanout交换器接收订单事件消息
14  *
15  * @author: yzl
16  * @date: 2016-10-22
17  */
18 public class ReceiverWithFanoutExchange {
19     // 交换器名称
20     private static final String EXCHANGE_NAME = "myFanoutExchange";
21     //接收订单事件并发放积分的队列
22     private static final String QUEUE_ORDER_REWARD_POINTS = "rewardOrderPoints";
23     //发放订单积分的路由键
24     private static final String ROUTING_KEY_ORDER_POINTS = "reward_order_points";
25     //接收订单事件并发短信的队列
26     private static final String QUEUE_ORDER_SEND_SMS = "sendOrderSms";
27     //发送订单短信的路由键
28     private static final String ROUTING_KEY_ORDER_SMS = "send_order_sms";
29
30     private static Channel channel = null;
31
32     static{
33         try{
34             // 连接到rabbitmq服务器
35             ConnectionFactory factory = new ConnectionFactory();
36             factory.setHost("localhost");
37             Connection connection = factory.newConnection();
38             // 创建一个信道
39             channel = connection.createChannel();
40             // 定义一个名字为myFanoutExchange的fanout类型的exchange
41             channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
42         }catch (Exception e) {
43             // TODO: handle exception
44         }
45     }
46
47     /**
48      * 发放订单的积分奖励
49      */
50     public static void rewardPoints() throws Exception {
51         channel.queueDeclare(QUEUE_ORDER_REWARD_POINTS, false, false, false, null);
52         channel.queueBind(QUEUE_ORDER_REWARD_POINTS, EXCHANGE_NAME, ROUTING_KEY_ORDER_POINTS);
53
54         channel.basicConsume(QUEUE_ORDER_REWARD_POINTS, true, new DefaultConsumer(channel){
55             @Override
56             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
57                     throws IOException {
58                 String msg = new String(body);
59                 System.out.println("积分系统接收到订单创建的事件消息 :" + msg);
60                 System.out.println("准备发放积分.....");
61             }
62         });
63     }
64
65     /**
66      * 发送订单成功的短信
67      */
68     public static void sendSms() throws Exception {
69         channel.queueDeclare(QUEUE_ORDER_SEND_SMS, false, false, false, null);
70         channel.queueBind(QUEUE_ORDER_SEND_SMS, EXCHANGE_NAME, ROUTING_KEY_ORDER_SMS);
71
72         channel.basicConsume(QUEUE_ORDER_REWARD_POINTS, true, new DefaultConsumer(channel){
73             @Override
74             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
75                     throws IOException {
76                 String msg = new String(body);
77                 System.out.println("短信平台接收到订单创建的事件消息 :" + msg);
78                 System.out.println("准备发送短信.....");
79             }
80         });
81     }
82
83     public static void main(String[] args) throws Exception {
84         rewardPoints();
85         sendSms();
86     }
87 }


运行结果输出:

1 send msg:{"userId":1,"createTime":"Oct 22, 2016 10:54:04 PM","eventType":"create_order"}
2 send msg:{"userId":2,"createTime":"Oct 22, 2016 10:54:05 PM","eventType":"create_order"}
3 send msg:{"userId":3,"createTime":"Oct 22, 2016 10:54:06 PM","eventType":"create_order"}
4 send msg:{"userId":4,"createTime":"Oct 22, 2016 10:54:07 PM","eventType":"create_order"}
5 send msg:{"userId":5,"createTime":"Oct 22, 2016 10:54:08 PM","eventType":"create_order"}
6 send msg:{"userId":6,"createTime":"Oct 22, 2016 10:54:09 PM","eventType":"create_order"}
7 send msg:{"userId":7,"createTime":"Oct 22, 2016 10:54:10 PM","eventType":"create_order"}
8 send msg:{"userId":8,"createTime":"Oct 22, 2016 10:54:11 PM","eventType":"create_order"}
9 send msg:{"userId":9,"createTime":"Oct 22, 2016 10:54:12 PM","eventType":"create_order"}


1 积分系统接收到订单创建的事件消息 :{"userId":1,"createTime":"Oct 22, 2016 10:54:04 PM","eventType":"create_order"}
2 准备发放积分.....
3 短信平台接收到订单创建的事件消息 :{"userId":2,"createTime":"Oct 22, 2016 10:54:05 PM","eventType":"create_order"}
4 准备发送短信.....
5 积分系统接收到订单创建的事件消息 :{"userId":3,"createTime":"Oct 22, 2016 10:54:06 PM","eventType":"create_order"}
6 准备发放积分.....
7 短信平台接收到订单创建的事件消息 :{"userId":4,"createTime":"Oct 22, 2016 10:54:07 PM","eventType":"create_order"}
8 准备发送短信.....
9 积分系统接收到订单创建的事件消息 :{"userId":5,"createTime":"Oct 22, 2016 10:54:08 PM","eventType":"create_order"}
10 准备发放积分.....
11 短信平台接收到订单创建的事件消息 :{"userId":6,"createTime":"Oct 22, 2016 10:54:09 PM","eventType":"create_order"}
12 准备发送短信.....
13 积分系统接收到订单创建的事件消息 :{"userId":7,"createTime":"Oct 22, 2016 10:54:10 PM","eventType":"create_order"}
14 准备发放积分.....
15 短信平台接收到订单创建的事件消息 :{"userId":8,"createTime":"Oct 22, 2016 10:54:11 PM","eventType":"create_order"}
16 准备发送短信.....
17 积分系统接收到订单创建的事件消息 :{"userId":9,"createTime":"Oct 22, 2016 10:54:12 PM","eventType":"create_order"}
18 准备发放积分.....
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: