您的位置:首页 > 运维架构 > Apache

Apache ActiveMQ 高级应用 - 自定义分发策略

2014-01-23 15:30 417 查看
在某些业务场景下,可能会用到按需分发消息。

对于AMQ他内置了很多的分发策略可供我们选择(DispatchPolicy的实现类),如:PriorityDispatchPolicy, PriorityNetworkDispatchPolicy, RoundRobinDispatchPolicy, SimpleDispatchPolicy, StrictOrderDispatchPolicy。

那我们也可以自己去实现DispatchPolicy接口,做一个适合特定业务场景的分发策略。

首先我们要去http://svn.apache.org/repos/asf/activemq这里下载对应版本的源码;

本例中为方便起见,我们拷贝了一段SimpleDispatchPolicy类的代码(路径:org.apache.activemq.broker.region.policy),当做我们的自定义类的内容,如:

public class TestDispatchPolicy implements DispatchPolicy {

public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {

ActiveMQDestination _destination = node.getMessage().getDestination();

// 取得Topic name 和前缀,如:topic://Topic.foo
System.out.println("-------->_destination.getQualifiedName:"+ _destination.getQualifiedName());
// 取得Topic name,如:Topic.foo
System.out.println("-------->_destination.getPhysicalName:"+ _destination.getPhysicalName());
synchronized (consumers) {
int count = 0;
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription)iter.next();
// 取得消费者的clientId,如:connection.setClientID("YourClientID");
System.out.println("-------->sub.getContext().getClientId:"+ sub.getContext().getClientId());
// Only dispatch to interested subscriptions
if (!sub.matches(node, msgContext)) {
sub.unmatched(node);
continue;
}

sub.add(node);
count++;
}
return count > 0;
}
}

}


ok,接着我们做如下几个步骤:

1、将这个类放到activemq project/activemq-broker的相应路径下,在activemq-project/目录下执行mvn package;

2、待maven执行完成后,将activemq-broker-version.jar和activemq-spring-version.jar放入到apache-activemq-version-bin/lib/目录下,替换对应的文件;

3、修改activemq.xml,加入元素dispatchPolicy,完成。

Apache ActiveMQ单点基本配置 activemq.xml为基础,修改的内容如下:

<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="false" memoryLimit="10mb">

<dispatchPolicy>
<! -- 新增的分发策略 -->
<testDispatchPolicy/>
</dispatchPolicy>
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>

</policyEntries>
</policyMap>
</destinationPolicy>


至此我们就实现了Activemq的自定义分发策略功能,启动activemq,查看一下控制台。

参考资源:

http://activemq.apache.org/dispatch-policies.html

http://activemq.apache.org/maven/5.9.0/apidocs/index.html?deprecated-list.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐