您的位置:首页 > 其它

amq 源码分析之demo分析1

2013-03-23 10:16 375 查看

1:构建activemq源码

这里采用的是maven构建的。这里不作为重点了。

2:以marketdata publisher为例分析

我们这里就以market data publisher 作为整个程序的入口开始进行分析。

整个连接主要是讲请求发送到PortfolioPublishServlet 这个servlet来进行处理。该servlet还是比较简单的。就是通过webclient来发送消息。

protected
void
sendMessage(WebClient client, String[] stocks) throwsJMSException {

Session session = client.getSession();

int idx= 0;

while (true) {

idx = (int)Math.round(stocks.length *Math.random());

if (idx< stocks.length) {

break;

}

}

Stringstock = stocks[idx];

Destination destination = session.createTopic("STOCKS." +stock);

StringstockText = createStockText(stock);

log("Sending:" + stockText +
" on destination: " +destination);

Message message = session.createTextMessage(stockText);

client.send(destination, message);

}

在这里我们可以看到,主要是调用的webclient的send方法来进行消息的发送。看来,在这个应用上,webclient应该是核心的类了。

3:webclient的初始化

那么,我们就进入webclient,看看他是如何进行初始化的。

ProtfolioPublishServlet是继承自MessageServletSupport的。Webclient正是在这个servlet中进行初始化的。

WebClient.initContext(getServletContext())中的initConnectionFactory 方法从字面上理解就是初始化连接工厂啦。

String brokerURL = servletContext.getInitParameter(BROKER_URL_INIT_PARAM);

这里得到的brokerURL是vm://amq-broker。Broker就是JMS的中心啦。VM的意思就是虚拟机内通讯。AMQ支持多种通讯协议。我们最常用的其实是TCP协议。采用VM是如果多个应用如果运行在一个JVM上,那么就没有必要采用网络通讯了。直接在VM内部通讯就可以。这里算是一种优化吧。关于TCP后面会分析。这里就先看VM。

ActiveMQConnectionFactory amqfactory = new ActiveMQConnectionFactory(brokerURL);

链接工厂类,他实现了QueueConnectionFactory 接口和TopicConnectionFactory 。所以,他可以构建 QueueConnections 和 TopicConnections.

在JMS规范中,有两种消息模型,一种是点对点模型,也就是ptp。还有一种是发布订阅模型。

点对点模型:



发布订阅模型:



这两种模型我们会在后面也会详细的进行分析。

生成链接工厂之后,将连接工厂放置到context中,一遍后续的使用。

4:对于发送请求的处理

前面说过,对于请求的处理都是通过webclient来处理的。

public
static
WebClientgetWebClient(HttpServletRequest request) {

HttpSession session = request.getSession(true);

WebClient client = getWebClient(session);

if(client ==
null
|| client.isClosed()) {

client = WebClient.createWebClient(request);

session.setAttribute(WEB_CLIENT_ATTRIBUTE,client);

}

return client;

}

这里可以看到,webclient生成之后是放到session中的。

WebClient client = newWebClient();

Stringauth = request.getHeader("Authorization");

if(auth !=
null
) {

String[] tokens = auth.split(" ");

if(tokens.length == 2) {

String encoded = tokens[1].trim();

String credentials = new String(javax.xml.bind.DatatypeConverter.parseBase64Binary(encoded));

String[] creds = credentials.split(":");

if (creds.length ==2) {

client.setUsername(creds[0]);

client.setPassword(creds[1]);

}

}

}

这里我们可以看到,JMS是支持验证的。知道支持JAAS验证,至于其他的验证方式是否支持还不清楚。后续会慢慢研究。

5:消息通过什么发送的

protected
void
sendMessage(WebClient client,
String[] stocks) throwsJMSException {

Session session = client.getSession();

int idx= 0;

while (true) {

idx = (int)Math.round(stocks.length *Math.random());

if (idx< stocks.length) {

break;

}

}

String stock =stocks[idx];

Destination destination = session.createTopic("STOCKS." +stock);

String stockText =createStockText(stock);

log("Sending:" + stockText +
" on destination: " +destination);

Message message =session.createTextMessage(stockText);

client.send(destination, message);

}

这里有一个session的概念。这里的session和j2ee中的session和hibernate中的session有些像。是JMS规范中的一部分。

他主要有一下作用:

• 它是MessageProducer和MessageConsumer的工厂。

• 它是TemporaryTopic和TemporaryQueue的工厂。

• 它为需要动态操纵提供商专有目的地名字的客户端提供了一种创建Queue或Topic对象的途径。

• 它提供了提供优化后的消息工厂。

• 它支持事务串,这些事务将跨会话生产者和消费者的工作组合成原子单元。

• 它为它消费的消息和它生产的消息定义了一个连续的顺序。

• 它保留它消费的消息直到这些消息被确认。

• 它序列化注册到它的MessageListener的执行。

•它是QueueBrowser的工厂。

既然说到了session,就不能不说connection了。

Connection用作几个目的:

• 它封装了一个与JMS提供商的连接。他通常代表一个在提供商服务域和客户端间的打开的TCP/IP Socket。

• 在客户端授权时创建它。

• 它可以指定一个唯一的客户端标识。

• 它创建Session对象。

• 它提供ConnectionMetaData。

•它支持可选的ExceptionListener。

Connection作为重量级的对象,一般在客户端只建立一个对象。当然连接到不同的broker的时候,需要建立多个connection了。

消息发送就先简单介绍到这里,这里其实更多的是通过这个小demo,熟悉JMS规范。消息的发送和接收将重点在网络部分进行介绍。后续会分成几个部分开始博文:网络,存储,安全,集群等几个方面吧。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: