您的位置:首页 > 其它

ofbiz jms activemq

2015-01-15 12:26 190 查看
最近在研究ofbiz jms,用了activemq玩了一下,ofbiz使用JNDI与其它JMS进行消息接收与发送,总结一下。

准备

ofbiz12.04 版本

activemq 5.5 版本

jar包

在base/lib下引入包

activemq-all-5.5.0.jar

geronimo-j2ee-management_1.1_spec-1.0.1.jar

服务引擎配置

<span style="font-size:14px;"><jms-service name="serviceMessenger" send-mode="all">
<server jndi-server-name="default"
jndi-name="topicConnectionFactory"
topic-queue="OFBTopic"
type="topic"
listen="true"/>
</jms-service></span>


jndi.properties配置

<span style="font-size:14px;">java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://127.0.0.1:61616
topic.OFBTopic=OFBTopic
connectionFactoryNames=connectionFactory, queueConnectionFactory, topicConnectionFactory</span>


只试一个topic消息模式,如果想用queue队列,配置

<span style="font-size:14px;">queue.OFBQueue= OFBQueue</span>


测试

先启动activemq ,再启动ofbiz

ofbiz启动之后看到:

<span style="font-size:14px;">2015-01-15 11:47:56,970 (org.ofbiz.service.jms.JmsListenerFactory@1b1ce9a) [ JmsListenerFactory.java:89 :INFO ] JMS Listener Factory Thread Finished; All listeners connected.</span>
如果报监听失败,自己查查什么问题。

之后再看activemq控制台



消息发送和接收

咱们是用ofbiz给出的测试例子:

services_test.xml

<span style="font-size:14px;"><service name="testJMSTopic" engine="jms" location="serviceMessenger" invoke="testScv">
<description>Test JMS Topic service</description>
<attribute name="message" type="String" mode="IN"/>
</service></span>


运行此服务,看下效果:

<span style="font-size:14px;">2015-01-15 12:09:10,754 (ActiveMQ Session Task-1) [       ModelService.java:469:INFO ] Set default value [999.9999] for parameter [defaultValue]
2015-01-15 12:09:10,755 (ActiveMQ Session Task-1) [     ServiceEcaRule.java:137:INFO ] For Service ECA [testScv] on [invoke] got false for condition: [message][equals][auto][true][String]
---- SVC-CONTEXT: message => 测试TOPIC消息
---- SVC-CONTEXT: locale => zh_CN
---- SVC-CONTEXT: timeZone => sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]
---- SVC-CONTEXT: userLogin => [GenericEntity:UserLogin][createdStamp,2014-07-17 20:36:06.0(java.sql.Timestamp)][createdTxStamp,2014-07-17 20:36:06.0(java.sql.Timestamp)][currentPassword,{SHA}47ca69ebb4bdc9ae0adec130880165d2cc05db1a(java.lang.String)][enabled,Y(java.lang.String)][hasLoggedOut,N(java.lang.String)][lastTimeZone,Asia/Macao(java.lang.String)][lastUpdatedStamp,2014-11-26 16:12:57.0(java.sql.Timestamp)][lastUpdatedTxStamp,2014-11-26 16:12:57.0(java.sql.Timestamp)][partyId,admin(java.lang.String)][successiveFailedLogins,0(java.lang.Long)][userLoginId,admin(java.lang.String)]
---- SVC-CONTEXT: defaultValue => 999.9999
-----SERVICE TEST----- : 测试TOPIC消息
----- SVC: entity-default -----</span>
看下activemq控制台



入列消息一条,接收消息一条。

说明一下为什么会这样

我们运行服务发送了一条广播消息,这就是为什么Messages Enqueued有一条数据,那么为什么Messages Dequeued也有一条消息,因为ofbiz是服务端同时也是客户端,我们设置了监听不是。。

ofbiz作为服务端

我们只要看一下JMS的服务引擎就明白了,不多解释

ofbiz作为客户端

这块是怎么玩呢?

首先我们在服务引擎配置文件设置了监听为true,

其次,我们看下在加载服务引擎之后,在根据服务引擎配置文件加载JmsListenerFactory.java去加载JMS的监听。

最后,贴出来一段代码

JmsTopicListener.java

<span style="font-size:14px;"> public synchronized void load() throws GenericServiceException {
try {
InitialContext jndi = JNDIContextFactory.getInitialContext(jndiServer);
TopicConnectionFactory factory = (TopicConnectionFactory) jndi.lookup(jndiName);

if (factory != null) {
con = factory.createTopicConnection(userName, password);
con.setExceptionListener(this);
session = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = (Topic) jndi.lookup(topicName);
if (topic != null) {
TopicSubscriber subscriber = session.createSubscriber(topic);
subscriber.setMessageListener(this);
con.start();
this.setConnected(true);
if (Debug.infoOn()) Debug.logInfo("Listening to topic [" + topicName + "] on [" + jndiServer + "]...", module);
} else {
throw new GenericServiceException("Topic lookup failed.");
}
} else {
throw new GenericServiceException("Factory (broker) lookup failed.");
}
} catch (NamingException ne) {
throw new GenericServiceException("JNDI lookup problems; listener not running.", ne);
} catch (JMSException je) {
throw new GenericServiceException("JMS internal error; listener not running.", je);
} catch (GeneralException ge) {
throw new GenericServiceException("Problems with InitialContext; listener not running.", ge);
}
}</span>


当时,还有一个疑问,那有监听之后ofbiz做了什么事情去处理监听到消息事件之后的事情?

同样,贴出一段代码,大家就都明白了:

AbstractJmsListener.java

<span style="font-size:14px;">/**
* Receives the MapMessage and processes the service.
* @see javax.jms.MessageListener#onMessage(Message)
*/
public void onMessage(Message message) {
MapMessage mapMessage = null;

if (Debug.verboseOn()) Debug.logVerbose("JMS Message Received --> " + message, module);

if (message instanceof MapMessage) {
mapMessage = (MapMessage) message;
} else {
Debug.logError("Received message is not a MapMessage!", module);
return;
}
runService(mapMessage);
}</span>


<span style="font-size:14px;">/**
* Runs the service defined in the MapMessage
* @param message
* @return Map
*/
protected Map<String, Object> runService(MapMessage message) {
Map<String, ? extends Object> context = null;
String serviceName = null;
String xmlContext = null;

try {
serviceName = message.getString("serviceName");
xmlContext = message.getString("serviceContext");
if (serviceName == null || xmlContext == null) {
Debug.logError("Message received is not an OFB service message. Ignored!", module);
return null;
}

Object o = XmlSerializer.deserialize(xmlContext, dispatcher.getDelegator());

if (Debug.verboseOn()) Debug.logVerbose("De-Serialized Context --> " + o, module);
if (ObjectType.instanceOf(o, "java.util.Map"))
context = UtilGenerics.checkMap(o);
} catch (JMSException je) {
Debug.logError(je, "Problems reading message.", module);
} catch (Exception e) {
Debug.logError(e, "Problems deserializing the service context.", module);
}

try {
ModelService model = dispatcher.getDispatchContext().getModelService(serviceName);
if (!model.export) {
Debug.logWarning("Attempt to invoke a non-exported service: " + serviceName, module);
return null;
}
} catch (GenericServiceException e) {
Debug.logError(e, "Unable to get ModelService for service : " + serviceName, module);
}

if (Debug.verboseOn()) Debug.logVerbose("Running service: " + serviceName, module);

Map<String, Object> result = null;
if (context != null) {
try {
result = dispatcher.runSync(serviceName, context);
} catch (GenericServiceException gse) {
Debug.logError(gse, "Problems with service invocation.", module);
}
}
return result;
}</span>


注意

可以同时配置队列和广播两种模式消息,配置同上并参考ACTIVE MQ官网对JNDI支持

参考

ofbiz jms:https://cwiki.apache.org/confluence/display/OFBIZ/Distributed+Entity+Cache+Clear+(DCC)+Mechanism

activemq jndi:http://activemq.apache.org/jndi-support.html


                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: