五.使用场景案例
2017-04-16 03:50
176 查看
RockerMQ—console的使用
⦁ 首页
⦁ Producer消息生产的topic查询:stat统计、route通道
之前我机器使用的是主机名:apple,没用ip
统计
通道
可通过RockerMQ—console管理界面查看broker配置信息
1、通过new DefaultMQPushConsumer("QuickStartConsumer");的实例查看消费记录
2、实例
消费者如何接受特定的消息:以下案例是描述MQ消费者,如果根据Tag在MQ完成消息过滤。Tag消息的标签,消息类型,用来区分某个MQ的topic消息分类,MQ允许消费者按照Tag进行消息过滤,确保用户消费者最终只消费他关心、特定的消息类型。
案例一、接收订单下所有的信息
案例二、只关注订单信息
案例三、只接收订单、购物信息
案例四、(错误示例)同一个消费者多次订阅某Topic下的不同Tag,后者会覆盖前者
说明描述:consumer. subscribe ("TopicModel", "Pay"
⦁ 一个生产者向消息队列中发送一条消息,有多个消息者,如何保证此消息只能被消费一次:消费者A提取了信息,消费者B则不能获取信息。
集群:在客户端设置consumer. setMessageModel (MessageModel.CLUSTERING)
测试使用“推送”消息DefaultMQPushConsumer,消息是被平均消费的,每个消费在启动时已经分配好读取的队列了,所以不会产生重复消费的,不存在a消费了,b还能消费,有多少个用户订阅某个topci的信息,消息生产者就是推送多少条!每个用户一条,平均分配到不同的队列,一旦被消费就没有了。
测试1:模拟单个:消息生产者发送一条消息,代码则修改消息生产为发送一条
消费者A
描述:B直接消费不了,获取不到队列
测试2:模拟多个:消息生产者发送4条消息
输出结果msg.getMsgId ()不一样,每个用户只消费一条,不可被重复消费。
广播:consumer. setMessageModel (MessageModel.BROADCASTING)
暂不推荐…
测试3: 获取对象的信息,并推送json格式的消息
⦁ 首页
⦁ Producer消息生产的topic查询:stat统计、route通道
之前我机器使用的是主机名:apple,没用ip
统计
通道
可通过RockerMQ—console管理界面查看broker配置信息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer"); consumer. setNamesrvAddr ("127.0.0.1:9876"); consumer. setInstanceName("QuickStartConsumer"); consumer. subscribe ("TopicModel", "*");
1、通过new DefaultMQPushConsumer("QuickStartConsumer");的实例查看消费记录
2、实例
消费者如何接受特定的消息:以下案例是描述MQ消费者,如果根据Tag在MQ完成消息过滤。Tag消息的标签,消息类型,用来区分某个MQ的topic消息分类,MQ允许消费者按照Tag进行消息过滤,确保用户消费者最终只消费他关心、特定的消息类型。
案例一、接收订单下所有的信息
/**
* 消费方式-1 消费者如需订阅某Topic下所有类型的消息,Tag用 * 符号表示:
* @param args
* @throws InterruptedException
* @throws MQClientException
*/
public static void main1(String [] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer"); consumer. setNamesrvAddr ("127.0.0.1:9876"); consumer. setInstanceName("QuickStartConsumer"); consumer. subscribe ("TopicModel", "*");
consumer. registerMessageListener (new MessageListenerConcurrently () {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " 接收消息: " + msgs.size());
MessageExt msg = msgs.get (0);
/** Consumer对象在使用之前必须要调用start初始化,初始化一次即可。*/
consumer. start ();
System.out.println("Consumer Started");
}
案例二、只关注订单信息
/** * 消费者如需订阅某Topic下特定类型的消息! * @param args * @throws InterruptedException * @throws MQClientException */ public static void main3(String [] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer"); consumer. setNamesrvAddr ("127.0.0.1:9876"); consumer. setInstanceName("QuickStartConsumer"); // 请明确标明Tag:只关注自己需要的! consumer. subscribe ("TopicModel", "Pay"); consumer. registerMessageListener (new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " 接收消息: " + msgs.size()); MessageExt msg = msgs.get (0); /*** 对topic tag验证:只关注特定Pay*/ if (msg. getTopic (). equals("TopicModel") && msg. getTags (). equals("Pay")) { System.out.print("特定类型:" + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer. start (); System.out.println("Consumer Started"); }
案例三、只接收订单、购物信息
/** * 消费者如需订阅某Topic下多种"标签"类型的消息! * @param args * @throws InterruptedException * @throws MQClientException */ public static void main (String [] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer"); consumer. setNamesrvAddr ("127.0.0.1:9876"); consumer. setInstanceName("QuickStartConsumer"); // 请明确标明Tag:多个Tag之间用 || 分隔 consumer. subscribe ("TopicModel", "Shoppong||Pay"); consumer. registerMessageListener (new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " 接收消息: " + msgs.size()); MessageExt msg = msgs.get (0); /** 对topic tag验证,只关注自己的标签*/ System.out.println(new String (msg. getBody ())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer. start (); System.out.println("Consumer Started"); }
案例四、(错误示例)同一个消费者多次订阅某Topic下的不同Tag,后者会覆盖前者
/** * 消费者如需订阅某Topic下多种"标签"类型的消息! * @param args * @throws InterruptedException * @throws MQClientException */ public static void main (String [] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer"); consumer. setNamesrvAddr ("127.0.0.1:9876"); consumer. setInstanceName("QuickStartConsumer"); // 请明确标明Tag:多个Tag之间用 || 分隔
说明描述:consumer. subscribe ("TopicModel", "Pay"
); 错误:consumer. subscribe ("TopicModel", "Shoppong"); 说明描述:consumer只能接收TopicModel到Shoppong购物信息,不能接收到Pay订单信息,后者会覆盖前者。 consumer. registerMessageListener (new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " 接收消息: " + msgs.size()); MessageExt msg = msgs.get (0); System.out.println(new String (msg. getBody ())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer. start (); System.out.println("Consumer Started"); }
⦁ 一个生产者向消息队列中发送一条消息,有多个消息者,如何保证此消息只能被消费一次:消费者A提取了信息,消费者B则不能获取信息。
集群:在客户端设置consumer. setMessageModel (MessageModel.CLUSTERING)
测试使用“推送”消息DefaultMQPushConsumer,消息是被平均消费的,每个消费在启动时已经分配好读取的队列了,所以不会产生重复消费的,不存在a消费了,b还能消费,有多少个用户订阅某个topci的信息,消息生产者就是推送多少条!每个用户一条,平均分配到不同的队列,一旦被消费就没有了。
测试1:模拟单个:消息生产者发送一条消息,代码则修改消息生产为发送一条
消费者A
描述:B直接消费不了,获取不到队列
测试2:模拟多个:消息生产者发送4条消息
import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; /** * 消息产生者Producer * * @author admin * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { /** * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br> * 注意:ProducerGroupName需要由应用来保证唯一<br> * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键, * 因为服务器会回查这个Group下的任意一个Producer */ DefaultMQProducer producer = new DefaultMQProducer("QuickStartProducer"); /** * 1、负责查找NameServer,多个NameServer地址用分号隔开 */ producer.setNamesrvAddr("127.0.0.1:9876"); /** * 2、客户端实例名称(这个实例包含网络连接、线程资源等) */ producer.setInstanceName("QuickStartProducert"); /** * 3、Producer对象在使用之前必须要调用start初始化,初始化一次即可<br> * 注意:切记不可以在每次发送消息时,都调用start方法 */ producer.start(); /** * 这里模拟了 4 次消息发送,一个Producer对象可以发送多个topic,多个tag的消息。 * 发生消息时必须声明区分Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes()); */ for (int I = 1; I <=4; I++) { try { Message msg = new Message("TopicModel", "TagB", "OrderID002", ("Message..." + I).getBytes() ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } /** * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己 * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法 */ producer.shutdown(); } }
输出结果msg.getMsgId ()不一样,每个用户只消费一条,不可被重复消费。
广播:consumer. setMessageModel (MessageModel.BROADCASTING)
暂不推荐…
测试3: 获取对象的信息,并推送json格式的消息
public static void main (String [] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer. setNamesrvAddr ("127.0.0.1:9876"); producer. setInstanceName("Producer"); stuinfo o = new stuinfo("小明","15074937147"); //1、获取对象集合 List<stuinfo> list = new ArrayList<stuinfo>(); list.add(o); //2、使用序列化 JSONArray obj = (JSONArray) JSONArray.toJSON(list); JSONObject jsonObject = new JSONObject (); //3、转换json jsonObject.put ("key", obj); producer. start (); try { Message msg = new Message("JSONtopic”, (jsonObject.getString("key")).getBytes()); SendResult sendResult = producer. send(msg); System.out.println(sendResult); } catch (RemotingException | MQBrokerException e) { e. printStackTrace (); } TimeUnit.MILLISECONDS. sleep (1000); /** * 调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己 */ producer. shutdown (); System.out.println("\nproducer========>运行...\n"); }
相关文章推荐
- 6.HBase In Action 第一章-HBase简介(1.2 HBase的使用场景和成功案例)
- HBase使用场景和成功案例
- NoSQL初探之人人都爱Redis:(3)使用Redis作为消息队列服务场景应用案例
- 深入MySQL用户自定义变量:使用详解及其使用场景案例
- HBase 使用场景和成功案例
- HBase使用场景和成功案例 (转)
- Linux实战案例(2)实例讲解使用软连接的场景和过程
- NoSQL初探之人人都爱Redis:(3)使用Redis作为消息队列服务场景应用案例
- 第4周 Redis使用场景与案例分析
- linux dd命令详解及使用案例场景
- HBase使用场景和成功案例
- HBase 使用场景和成功案例
- HBase使用场景和成功案例 (转)
- NoSQL初探之人人都爱Redis:(3)使用Redis作为消息队列服务场景应用案例
- IBM朱辉:大数据分析的5个高复制使用场景及案例分享(含PPT)
- HBase使用场景和成功案例
- NoSQL初探之人人都爱Redis:(3)使用Redis作为消息队列服务场景应用案例
- HBase使用场景和成功案例
- HBase使用场景和成功案例 (转)
- 使用Redis作为消息队列服务场景应用案例