您的位置:首页 > 其它

五.使用场景案例

2017-04-16 03:50 176 查看
RockerMQ—console的使用

⦁ 首页





⦁ Producer消息生产的topic查询:stat统计、route通道

之前我机器使用的是主机名:apple,没用ip





统计





通道





可通过RockerMQ—console管理界面查看broker配置信息






DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer");
consumer. setNamesrvAddr ("127.0.0.1:9876");
consumer. setInstanceName("QuickStartConsumer");
consumer. subscribe ("TopicModel", "*");






1、通过new DefaultMQPushConsumer("QuickStartConsumer");的实例查看消费记录








2、实例

消费者如何接受特定的消息:以下案例是描述MQ消费者,如果根据Tag在MQ完成消息过滤。Tag消息的标签,消息类型,用来区分某个MQ的topic消息分类,MQ允许消费者按照Tag进行消息过滤,确保用户消费者最终只消费他关心、特定的消息类型。





案例一、接收订单下所有的信息

    /**
* 消费方式-1 消费者如需订阅某Topic下所有类型的消息,Tag用 * 符号表示:
* @param args
* @throws InterruptedException
* @throws MQClientException
*/
public static void main1(String [] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer"); consumer. setNamesrvAddr ("127.0.0.1:9876"); consumer. setInstanceName("QuickStartConsumer"); consumer. subscribe ("TopicModel", "*");
consumer. registerMessageListener (new MessageListenerConcurrently () {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " 接收消息: " + msgs.size());
MessageExt msg = msgs.get (0);
/** Consumer对象在使用之前必须要调用start初始化,初始化一次即可。*/
consumer. start ();
System.out.println("Consumer Started");
}


案例二、只关注订单信息

/**
* 消费者如需订阅某Topic下特定类型的消息!
* @param args
* @throws InterruptedException
* @throws MQClientException
*/
public static void main3(String [] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer");
consumer. setNamesrvAddr ("127.0.0.1:9876");
consumer. setInstanceName("QuickStartConsumer");
// 请明确标明Tag:只关注自己需要的!
consumer. subscribe ("TopicModel", "Pay");
consumer. registerMessageListener (new MessageListenerConcurrently () {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " 接收消息: " + msgs.size());
MessageExt msg = msgs.get (0);
/*** 对topic tag验证:只关注特定Pay*/
if (msg. getTopic (). equals("TopicModel") && msg. getTags (). equals("Pay")) {
System.out.print("特定类型:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer. start ();
System.out.println("Consumer Started");
}


案例三、只接收订单、购物信息

/**
* 消费者如需订阅某Topic下多种"标签"类型的消息!
* @param args
* @throws InterruptedException
* @throws MQClientException
*/
public static void main (String [] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer");
consumer. setNamesrvAddr ("127.0.0.1:9876");
consumer. setInstanceName("QuickStartConsumer");
// 请明确标明Tag:多个Tag之间用 || 分隔
consumer. subscribe ("TopicModel", "Shoppong||Pay");
consumer. registerMessageListener (new MessageListenerConcurrently () {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " 接收消息: " + msgs.size());
MessageExt msg = msgs.get (0);
/** 对topic tag验证,只关注自己的标签*/
System.out.println(new String (msg. getBody ()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer. start ();
System.out.println("Consumer Started");
}


案例四、(错误示例)同一个消费者多次订阅某Topic下的不同Tag,后者会覆盖前者

/**
* 消费者如需订阅某Topic下多种"标签"类型的消息!
* @param args
* @throws InterruptedException
* @throws MQClientException
*/
public static void main (String [] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer");
consumer. setNamesrvAddr ("127.0.0.1:9876");
consumer. setInstanceName("QuickStartConsumer");
// 请明确标明Tag:多个Tag之间用 || 分隔

说明描述:consumer. subscribe ("TopicModel", "Pay"
);
错误:consumer. subscribe ("TopicModel", "Shoppong");
说明描述:consumer只能接收TopicModel到Shoppong购物信息,不能接收到Pay订单信息,后者会覆盖前者。
consumer. registerMessageListener (new MessageListenerConcurrently () {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " 接收消息: " + msgs.size());
MessageExt msg = msgs.get (0);
System.out.println(new String (msg. getBody ()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer. start ();
System.out.println("Consumer Started");
}


⦁ 一个生产者向消息队列中发送一条消息,有多个消息者,如何保证此消息只能被消费一次:消费者A提取了信息,消费者B则不能获取信息。

集群:在客户端设置consumer. setMessageModel (MessageModel.CLUSTERING)
测试使用“推送”消息DefaultMQPushConsumer,消息是被平均消费的,每个消费在启动时已经分配好读取的队列了,所以不会产生重复消费的,不存在a消费了,b还能消费,有多少个用户订阅某个topci的信息,消息生产者就是推送多少条!每个用户一条,平均分配到不同的队列,一旦被消费就没有了。
测试1:模拟单个:消息生产者发送一条消息,代码则修改消息生产为发送一条

消费者A









描述:B直接消费不了,获取不到队列

测试2:模拟多个:消息生产者发送4条消息

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;

/**
* 消息产生者Producer
*
* @author admin
*
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
/**
* 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ProducerGroupName需要由应用来保证唯一<br>
* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
* 因为服务器会回查这个Group下的任意一个Producer
*/
DefaultMQProducer producer = new DefaultMQProducer("QuickStartProducer");
/**
* 1、负责查找NameServer,多个NameServer地址用分号隔开
*/
producer.setNamesrvAddr("127.0.0.1:9876");
/**
* 2、客户端实例名称(这个实例包含网络连接、线程资源等)
*/
producer.setInstanceName("QuickStartProducert");
/**
* 3、Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
/**
* 这里模拟了 4 次消息发送,一个Producer对象可以发送多个topic,多个tag的消息。
* 发生消息时必须声明区分Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());
*/
for (int I = 1; I <=4; I++) {
try {
Message msg = new Message("TopicModel",
"TagB",
"OrderID002",
("Message..." + I).getBytes()
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
/**
* 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
*/
producer.shutdown();
}
}






输出结果msg.getMsgId ()不一样,每个用户只消费一条,不可被重复消费。
广播:consumer. setMessageModel (MessageModel.BROADCASTING)
暂不推荐…

测试3: 获取对象的信息,并推送json格式的消息

public static void main (String [] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer. setNamesrvAddr ("127.0.0.1:9876");
producer. setInstanceName("Producer");
stuinfo o = new stuinfo("小明","15074937147");
//1、获取对象集合
List<stuinfo> list = new ArrayList<stuinfo>();
list.add(o);
//2、使用序列化
JSONArray obj = (JSONArray) JSONArray.toJSON(list);
JSONObject jsonObject = new JSONObject ();
//3、转换json
jsonObject.put ("key", obj);
producer. start ();
try {
Message msg = new Message("JSONtopic”, (jsonObject.getString("key")).getBytes());
SendResult sendResult = producer. send(msg);
System.out.println(sendResult);
} catch (RemotingException | MQBrokerException e) {
e. printStackTrace ();
}
TimeUnit.MILLISECONDS. sleep (1000);
/**
* 调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
*/
producer. shutdown ();
System.out.println("\nproducer========>运行...\n");
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: