您的位置:首页 > 编程语言 > Java开发

MetaQ源码阅读及与Spring结合使用

2017-07-29 13:32 471 查看
MetaQ (全称 Metamorphosis )是一个高性能、高可用、可扩展的分布式消息中间件 ,MetaQ 具有消息存储顺序写、吞吐量大和支持本地和XA 事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景, MetaQ在阿里巴巴各个子公司被广泛应用,每天转发 250 亿 + 条消息。主要应用于异步解耦, Mysql 数据复制,收集日志等场景 。。

我是做移动互联网广告系统的,在工作中有很多场景使用到了MetaQ,例如:广告的存储、效果数据的上报,多机房扣费等都需要依赖MetaQ,由于公司已经使用MeatQ作为消息中间件的时间很久了,已经有了模板,所以很多的时候就是直接拿来使用,对里面为什么做这样那样的封装没有去深入的了解,刚好这段时间有空就去看了看源码,给自己总结沉淀一下,做到不仅知道怎么用,还要知道为什么这样做。

一、生产者

发送消息是由生产者MessageProduce触发,MessageProduce从MessageSessionFactory中创建出来具体实现如下:

MetaClientConfig metaClientConfig = new MetaClientConfig();
ZKConfig zkConfig = new ZkUtils.ZKConfig()
zkConfig.zkConnect = "127.0.0.1:2181";
metaClientConfig.setZkConfig(zkConfig)
MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig );
// create producer,强烈建议使用单例
MessageProducer producer = sessionFactory.createProducer();
// publish topic
final String topic = "meta-test";
producer.publish(topic);
SendResult sendResult = producer.sendMessage(new Message(topic, "xxxx".getBytes()));


我们可以看出MessageProduce 是通过工厂创建的 ,MetaMessageSessionFactory需要一个参数就MetaClientConfig这个类,MetaClientConfig里面是什么了?MetaClientConfig中有个工具类ZkUtils,通过名字就知道是和zk交互的类,

和zk交互我们知道创建一个客户端需要几个参数:

1. zkConnect (zk的ip地址)

2. zkSessionTimeoutMs(zk的会话超时时间)

3. zkConnectionTimeoutMs(zk的连接超时时间)

4. zkSyncTimeMs(zk心跳时间)

我们知道Sping IOC容器就是用来创建发现维护类与类之间的关系的,MetaQ团队当然也想到了这个,那他是这么实现的呢?

在com.taobao.metamorphosis.client.extension.spring 中有如下几个类:

1. AbstractMetaqMessageSessionFactory

2. DefaultMessageListener

3. JavaSerializationMessageBodyConverter

4. MessageBodyConverter

5. MessageBuilder

6. MessageListenerContainer

7. MetaqMessage

8. MetaqMessageSessionFactoryBean

9. MetaqTemplate

10. MetaqTopic

11. XAMetaqMessageSessionFactoryBean

先来看看MetaqTemplate这个类,这个类提供发送消息的方法,

public SendResult send(MessageBuilder builder) throws InterruptedException {
Message msg = builder.build(this.messageBodyConverter);
final String topic = msg.getTopic();
MessageProducer producer = this.getOrCreateProducer(topic);
try {
return producer.sendMessage(msg);
}
catch (MetaClientException e) {
return new SendResult(false, null, -1, ExceptionUtils.getFullStackTrace(e));
}
}


我们发现使用send方法的时候还要MessageBodyConverter的类,这个类是用来做什么的呢?:

/**
* Convert a message object to byte array.
*
* @param body
* @return
* @throws MetaClientException
*/
public byte[] toByteArray(T body) throws MetaClientException;

/**
* Convert a byte array to message object.
*
* @param bs
* @return
* @throws MetaClientException
*/
public T fromByteArray(byte[] bs) throws MetaClientException;


可以看到这里定义了两个方法 用来把消息转换为二进制,以及从二进制中恢复消息,我们知道数据在网络上传输都是二进制的方式进行传输的,这个接口很方便我们做扩展,灵活的实现自己的转换规则,比如采用其他序列化协议,如protobufs,hessian等等,当然如果你不想实现自己的消息转换类,这里提供了一个实现类:JavaSerializationMessageBodyConverter

public class JavaSerializationMessageBodyConverter implements MessageBodyConverter<Serializable> {
JavaSerializer serializer = new JavaSerializer();
JavaDeserializer deserializer = new JavaDeserializer();

@Override
public byte[] toByteArray(Serializable body) throws MetaClientException {
try {
return this.serializer.encodeObject(body);
}
catch (IOException e) {
throw new MetaClientException(e);

}
}

@Override
public Serializable fromByteArray(byte[] bs) throws MetaClientException {
try {
return (Serializable) this.deserializer.decodeObject(bs);
}
catch (IOException e) {
throw new MetaClientException(e);

}
}

}


JavaSerializationMessageBodyConverter 实现了MessageBodyConverter ,对消息体进行序列化和反序列化。

send方法中还调用了getOrCreateProducer我们来看看这个方法:

public MessageProducer getOrCreateProducer(final String topic) {
if (!this.shareProducer) {
FutureTask<MessageProducer> task = this.producers.get(topic);
if (task == null) {
task = new FutureTask<MessageProducer>(new Callable<MessageProducer>() {

@Override
public MessageProducer call() throws Exception {
MessageProducer producer = MetaqTemplate.this.messageSessionFactory.createProducer();
producer.publish(topic);
if (!StringUtils.isBlank(MetaqTemplate.this.defaultTopic)) {
producer.setDefaultTopic(MetaqTemplate.this.defaultTopic);
}
return producer;
}

});
FutureTask<MessageProducer> oldTask = this.producers.putIfAbsent(topic, task);
if (oldTask != null) {
task = oldTask;
}
else {
task.run();
}
}

try {
MessageProducer producer = task.get();
return producer;
}
catch (ExecutionException e) {
throw ThreadUtils.launderThrowable(e.getCause());
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
else {
if (this.sharedProducer == null) {
synchronized (this) {
if (this.sharedProducer == null) {
this.sharedProducer = this.messageSessionFactory.createProducer();
if (!StringUtils.isBlank(this.defaultTopic)) {
this.sharedProducer.setDefaultTopic(this.defaultTopic);
}
}
}
}
this.sharedProducer.publish(topic);
return this.sharedProducer;
}
throw new IllegalStateException("Could not create producer for topic '" + topic + "'");
}


看到熟悉的 messageSessionFactory ,创建生产者的时候就需要这个工厂类来创建,我们在回过头来看看MetaqTemplate这个类:

private MessageSessionFactory messageSessionFactory;
private String defaultTopic;
private MessageBodyConverter<?> messageBodyConverter;
private boolean shareProducer = false;
private volatile MessageProducer sharedProducer;


有好几个属性,我们只要传入一个MessageSessionFactory 及MessageBodyConverter对象即可:

到此我们就可以创建MetaqTemplate这个类了:

先来创建MessageSessionFactory ,这里使用MetaqMessageSessionFactoryBean这个实现类:

<!--  message session factory -->
<bean id="sessionFactory" class="com.taobao.metamorphosis.client.extension.spring.MetaqMessageSessionFactoryBean">
<property name="zkConnect" value="127.0.0.1:2181"/>
<property name="zkSessionTimeoutMs" value="30000"/>
<property name="zkConnectionTimeoutMs" value="30000"/>
<property name="zkSyncTimeMs" value="5000"/>
</bean>


这样我们就创建了一个工厂类了,然后我们需要创建一个消息转换类,这里使用默认实现类:

JavaSerializationMessageBodyConverter

<!--  message body converter using java serialization. -->
<bean id="messageBodyConverter"
class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter"/>


我们需要创建MetaqTemplate元素都准备好了,可以创建MetaqTemplate类了:

<!--  template to send messages. -->
<bean id ="metaqTemplate" class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate">
<property name="messageSessionFactory" ref="sessionFactory"/>
<property name="messageBodyConverter" ref="messageBodyConverter"/>
</bean>


可以发送消息了:

final String topic = "date";
final SendResult sendResult =
template.send(MessageBuilder.withTopic(topic).withBody(new Date());


二、消费者

看完生产者我在来看看消费者,接受消息是由消费者MessageConsume触发,MessageConsume从MessageSessionFactory中创建出来具体实现如下:

MetaClientConfig metaClientConfig = new MetaClientConfig();
ZKConfig zkConfig = new ZkUtils.ZKConfig()
zkConfig.zkConnect = "127.0.0.1:2181";
metaClientConfig.setZkConfig(zkConfig)
MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig );
// subscribed topic
final String topic = "meta-test";
// consumer group
final String group = "meta-example";
// create consumer,强烈建议使用单例
MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
// subscribe topic
consumer.subscribe(topic, 1024 * 1024, new MessageListener() {

public void recieveMessages(Message message) {
System.out.println("Receive message " + new String(message.getData()));
}

public Executor getExecutor() {
// Thread pool to process messages,maybe null.
return null;
}
});
// complete subscribe
consumer.completeSubscribe();

}


消费者也是通过MetaMessageSessionFactory 去创建的,然后调用subscribe 实现消息的订阅接受及处理,我们来看看这个类MessageListenerContainer:

@Override
public void afterPropertiesSet() throws Exception {
log.info("Start to initialize message listener container.");
if (this.subscribers != null) {
Set<MessageConsumer> consumers = new HashSet<MessageConsumer>();
for (Map.Entry<MetaqTopic, ? extends DefaultMessageListener<?>> entry : this.subscribers.entrySet()) {
final MetaqTopic topic = entry.getKey();
final DefaultMessageListener<?> listener = entry.getValue();
if (topic == null) {
throw new IllegalArgumentException("Topic is null");
}
if (StringUtils.isBlank(topic.getTopic())) {
throw new IllegalArgumentException("Blank topic");
}
MessageConsumer consumer = this.getMessageConsumer(topic);
if (consumer == null) {
throw new IllegalStateException("Get or create consumer failed");
}
log.info("Subscribe topic=" + topic.getTopic() + " with group=" + topic.getGroup());
if (listener.getMessageBodyConverter() == null) {
listener.setMessageBodyConverter(this.messageBodyConverter);
}
consumer.subscribe(topic.getTopic(), topic.getMaxBufferSize(), listener);
consumers.add(consumer);
}
for (MessageConsumer consumer : consumers) {
consumer.completeSubscribe();
}
}
log.info("Initialize message listener container successfully.");
}


可以看到这个类在初始完成后会创建一个消费者,然后调用消费者的subscribe方法订阅和处理消息,创建这个类需要下面这个几个类:MetaqTopic 、DefaultMessageListener或者其子类,下面我来分别看看这个两个类:

MetaqTopic 主要有如下几个属性:

private ConsumerConfig consumerConfig = new ConsumerConfig();
private String topic;
private int maxBufferSize = 1024 * 1024;


我们知道创建消费者的时候需要指定topic及每次消费的大小,MetaqTopic 这个就是用来指定这些属性值的

再来看看 DefaultMessageListener:

@Override
public void recieveMessages(Message message) throws InterruptedException {
if (this.messageBodyConverter != null) {
try {
T body = (T) this.messageBodyConverter.fromByteArray(message.getData());
this.onReceiveMessages(new MetaqMessage<T>(message, body));
}
catch (Exception e) {
log.error("Convert message body from byte array failed,msg id is " + message.getId() + " and topic is "
+ message.getTopic(), e);
message.setRollbackOnly();
}
}
else {
this.onReceiveMessages(new MetaqMessage<T>(message, null));
}
}

public abstract void onReceiveMessages(MetaqMessage<T> msg);


这个类实现了recieveMessages处理消息的方法,在方法中我们调用了MessageBodyConverter 这个类转换消息,然后调用了onReceiveMessages 这个方法,需要我们自己来实现真正的消息处理,也就是我们需要实现DefaultMessageListener这个类中onReceiveMessages 方法来处理消息就可以了。

这里简单进行一个实现:

import com.taobao.metamorphosis.client.extension.spring.DefaultMessageListener;
import com.taobao.metamorphosis.client.extension.spring.MetaqMessage;
import java.util.Date;

/**
* Process date messages listener.
*
* @author dennis
*
*/
public class DateMessageListener extends DefaultMessageListener<Date> {

@Override
public void onReceiveMessages(MetaqMessage<Date> msg) {
Date date = msg.getBody();
System.out.println("receive date message:" + date);
}

}


这样我们所需的要素就都有了,现在看看怎么用spring来配置:

<!--  topics to be subscribed. -->
<bean id = "dateTopic" class="com.taobao.metamorphosis.client.extension.spring.MetaqTopic">
<!-- consumer group -->
<property name="group" value="testGroup"/>
<!--  topic -->
<property name="topic" value="date"/>
<!--  max buffer size to fetch messages -->
<property name="maxBufferSize" value="16384"/>
</bean>


<!--  message listener -->
<bean id= "messageListener" class="com.taobao.metamorphosis.example.spring.DateMessageListener">
<!--  threads to process these messages. -->
<property name="processThreads" value="10"/>
</bean>


<!--  listener container to subscribe topics -->
<bean id ="listenerContainer" class="com.taobao.metamorphosis.client.extension.spring.MessageListenerContainer">
<property name="messageSessionFactory" ref="sessionFactory"/>
<property name="messageBodyConverter" ref="messageBodyConverter"/>
<property name="subscribers">
<map>
<entry key-ref="dateTopic" value-ref="messageListener"/>
</map>
</property>
</bean>


只要配置好这些后就可以通过我们实现的监听器DateMessageListener 自动处理消息了。

写到这差不多就整理完了,代码比较多,只找了几个关键的地方进行分析,着重点落在了这么结合Spring使用。由于能力有限制,写到不到位的地方多多见谅。。

我是一只小蜗牛,虽然速度慢,但我一直在努力向前爬。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  MetaQ spring