您的位置:首页 > 编程语言 > Java开发

ActiveQM与spring集成配置

2016-07-15 15:36 330 查看
首先Spring提供了多种connectionFactory,SingleConnectionFactory是其中的一种实现SingleConnectionFactory对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。而CachingConnectionFactory继承了SingleConnectionFactory,它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer。

 

1.1  首先需要在配置文件中注入CachingConnectionFactory

<beanid="connectionFactory"class="org.springframework.jms.connection.CachingConnectionFactory">
   </bean>

 

1.2、Spring提供的ConnectionFactory只是用于管理。真正需要连接到jms服务上的还需要jms的开发方在提供。如何配置ActiveMQ提供的ConnectionFactory?

<!-- ActiveMQ 连接工厂 -->
    <amq:connectionFactoryid="amqConnectionFactory"
       brokerURL="tcp://127.0.0.1:61616"userName="admin"password="admin" 
/>
 
    <!-- SpringCaching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory
--> 
    <beanid="connectionFactory"class="org.springframework.jms.connection.CachingConnectionFactory">
        <!--
目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory
--> 
        <propertyname="targetConnectionFactory"ref="amqConnectionFactory"></property>
        <!-- Session缓存数量 -->
        <propertyname="sessionCacheSize"value="100"
/>
    </bean>

 

1.3、配置生产者

生产者负责产生消息并发送到JMS服务器。在Spring中提供JmsTemplate类来实现。用JmsTemplate进行消息发送的时候需要告诉程序发送的那种类型,点对点还是,发布订阅?配置如下:

<!-- 定义JmsTemplate的Queue类型
-->
    <beanid="jmsQueueTemplate"class="org.springframework.jms.core.JmsTemplate">
   
        <constructor-argref="connectionFactory"
/>
        <!--队列模式 -->
        <propertyname="pubSubDomain"value="false"
/>
    </bean>
 
    <!--
定义JmsTemplate的Topic类型 -->
    <beanid="jmsTopicTemplate"class="org.springframework.jms.core.JmsTemplate">
         <!--
这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象
--> 
        <constructor-argref="connectionFactory"
/>
        <!-- pub/sub模型(发布/订阅) -->
        <propertyname="pubSubDomain"value="true"
/>
</bean>

 

1.4、配置消费者

消费者是对指定目的地的消息进行消费,每个消费者对应每个目的地都需要有对应的MessageListenerContainer。对于消息监听容器而言,除了要知道监听哪个目的地之外,还需要知道到哪里去监听,也就是说它还需要知道去监听哪个JMS服务器,通过配置MessageListenerContainer的时候往里面注入一个ConnectionFactory来实现的。所以我们在配置一个MessageListenerContainer的时候有三个属性必须指定:一个是表示从哪里监听的ConnectionFactory;一个是表示监听什么的Destination;一个是接收到消息以后进行消息处理的MessageListener。

 

<!-- 定义Queue监听器点对点队列 -->
    <jms:listener-containerdestination-type="queue"container-type="default"connection-factory="connectionFactory"acknowledge="auto">
        <jms:listenerdestination="test.queue"ref="queueReceiver1"/>
        <jms:listenerdestination="test.queue"ref="queueReceiver2"/>
</jms:listener-container>

 

<!-- 定义Topic监听器订阅-->
    <jms:listener-containerdestination-type="topic"container-type="default"connection-factory="connectionFactory"acknowledge="auto">
        <jms:listenerdestination="test.topic"ref="topicReceiver1"/>
        <jms:listenerdestination="test.topic"ref="topicReceiver2"/>
    </jms:listener-container>

 

代码实现一对一的生产和消费

生产者:

    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate
jmsTemplate;   
    /**
     * 发送一条消息到指定的队列(目标)
     * @paramqueueName队列名称
     * @param
message消息内容
     */
    public
void
send(String queueName,final String
message){
            jmsTemplate.send(queueName,
new MessageCreator() {
            @Override
            public MessagecreateMessage(Session
session) throwsJMSException {
                return
session.createTextMessage(message);
            }
        });
    }

接受者:

@Component
publicclasQueueReceiver1
implements MessageListener {

public
void
recMessage(Message message){
        try {
            System.out.println("Receiver1接收到消息:"+((TextMessage)message).getText());
        }catch(JMSException
e) {
            e.printStackTrace();
        }
    }

}

 

订阅与发布

发布者:

@Component("topicSender")
public
class
TopicSender {
   
    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate
jmsTemplate;
   
    /**
     * 发送一条消息到指定的队列(目标)
     * @param queueName
队列名称
     * @param message
消息内容
     */
    public
void
send(String topicName,final String
message){
        jmsTemplate.send(topicName,
new MessageCreator() {
            @Override
            public MessagecreateMessage(Session
session) throwsJMSException {
                return
session.createTextMessage(message);
            }
        });
    }
 
}

订阅者:

@Component
public
class
TopicReceiver1 implements MessageListener{
 
 
    @Override
    public
void
onMessage(Message message) {
        try {
            System.out.println("TopicReceiver1接收到消息:"+((TextMessage)message).getText());
        }catch(JMSException
e) {
            e.printStackTrace();
        }
    }
   
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: