
MQ (Spring / producer-consumer): a summary of common usage patterns

2015-03-25 11:39
------------------Summary of ActiveMQ configuration in Spring:

I. A Destination can be configured in three ways (below, "Topic" refers to the Destination and "MQ" refers to the Factory)

a. JNDI injection

<bean id="jmsTgsMessageTopic" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="java:comp/env/jms/tgsMessageTopic"></property>
</bean>

b. org.apache.activemq.command.ActiveMQTopic

<bean id="TgsMessageTopicDest" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="zteits/topic/TgsMessageTopic" />
</bean>

c. ${TOPIC} (a placeholder; the destination name is read from a properties file)
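
The ${TOPIC} form relies on placeholder substitution: a key from a properties file is swapped into the bean definition before the bean is created. The following is a minimal, standard-library-only sketch of that idea. It is not Spring's actual implementation; the class name PlaceholderSketch and its resolve method are hypothetical.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: a simplified model of ${...} substitution.
// Spring's own placeholder support is more elaborate than this.
class PlaceholderSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces every ${key} in text with props.getProperty(key).
    // Unknown keys are left untouched so the problem stays visible.
    static String resolve(String text, Properties props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = props.getProperty(m.group(1));
            String replacement = (value != null) ? value : m.group(0);
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

With a Properties object that maps TOPIC to a destination name, resolve("${TOPIC}", props) returns that name, which is then what a property such as defaultDestinationName would receive.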

II. A Factory can be configured in two ways

a. JNDI

<bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="java:comp/env/jms/ConnectionFactory"></property>
</bean>

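b. org.apache.activemq.pool.PooledConnectionFactory

<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>${MQ}</value>
</property>
</bean>
</property>
<property name="maxConnections" value="5"></property>
</bean>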


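III. jmsTemplate can send or receive MQ messages (it needs a factory and a destination). Note how the destination is injected differently: in (a) it is a value (the destination name), in (b) it is a ref (a Destination bean), and (b) accepts a topic configured in either of the two bean-based ways (JNDI or ActiveMQTopic).

a. (two parameters: factory and destination name)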

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref local="jmsFactory" />
</property>
<property name="defaultDestinationName" value="${TOPIC}" />
<!-- Messaging domain: false = point-to-point (queue), true = publish/subscribe (topic) -->
<property name="pubSubDomain" value="true" />
<!-- Switch for deliveryMode, priority and timeToLive: they only take effect when this is true (default: false) -->
<property name="explicitQosEnabled" value="true" />
<!-- Delivery mode: DeliveryMode.NON_PERSISTENT=1 (non-persistent); DeliveryMode.PERSISTENT=2 (persistent) -->
<property name="deliveryMode" value="1" />
</bean>

b. (two parameters: factory and destination bean)

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactory"></property>
<property name="defaultDestination" ref="jmsBlackAlertMessageTopic"></property>
</bean>

c. (one parameter: factory only; the destination is passed on each call)

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://192.168.3.48:61616</value>
</property>

</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref bean="connectionFactory"/>
</property>
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0">
<value>HelloWorldQueue</value>
</constructor-arg>
</bean>
 
Sending
jmsTemplate.send(destination, new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage("Hello everyone, this is a test!");
    }
});

Receiving
TextMessage msg = (TextMessage) jmsTemplate.receive(destination);
System.out.println("received msg is:" + msg.getText());
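
The pubSubDomain property in configuration (a) above picks between two delivery models. As a rough illustration, here is an in-memory model written with the standard library only. It is not the JMS API, the class and method names are hypothetical, and the round-robin distribution in queue mode is an assumption made for simplicity. On a queue each message goes to exactly one consumer, while on a topic every current subscriber gets its own copy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the two JMS messaging domains.
class DomainSketch {
    // One inbox per consumer or subscriber.
    private final List<List<String>> inboxes = new ArrayList<>();
    private final boolean pubSubDomain;
    private int next = 0; // round-robin cursor used in queue mode

    DomainSketch(boolean pubSubDomain) {
        this.pubSubDomain = pubSubDomain;
    }

    // Registers a consumer and returns the inbox it will read from.
    List<String> addConsumer() {
        List<String> inbox = new ArrayList<>();
        inboxes.add(inbox);
        return inbox;
    }

    void send(String message) {
        if (inboxes.isEmpty()) {
            return; // simplification: a real queue would hold the message
        }
        if (pubSubDomain) {
            // Topic: every subscriber receives a copy.
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        } else {
            // Queue: exactly one consumer receives the message.
            inboxes.get(next).add(message);
            next = (next + 1) % inboxes.size();
        }
    }
}
```

Creating new DomainSketch(true) with two consumers and sending one message leaves a copy in both inboxes; with new DomainSketch(false) the same message lands in only one of them.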


IV. To receive MQ messages, it is best to use the listener container org.springframework.jms.listener.DefaultMessageListenerContainer (it takes a Factory, a Destination, a Listener, the number of concurrent consumers, and so on)

<bean id="TgslistenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers" value="1" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="jmsTgsMessageTopic" />
<property name="messageListener" ref="tgsMsgReceiver" />
</bean>
<bean id="tgsMsgReceiver"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="com.zteits.atms.black.job.logic.TgsPassedCarReceiverImpl">
<!-- Properties injected into this bean -->
<property name="blacklistCompare" ref="BlackListComparerImpl" />
</bean>
</constructor-arg>
<property name="defaultListenerMethod" value="receiveData" />
</bean>
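
The MessageListenerAdapter configuration above names a method (receiveData) on a plain object instead of requiring that object to implement MessageListener. A simplified sketch of that delegation idea using reflection follows. It is an illustration only, not Spring's implementation, and ListenerAdapterSketch and SampleReceiver are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of delegating to a named method on a POJO.
class ListenerAdapterSketch {
    private final Object delegate;
    private final String listenerMethod;

    ListenerAdapterSketch(Object delegate, String listenerMethod) {
        this.delegate = delegate;
        this.listenerMethod = listenerMethod;
    }

    // Looks up a public method with the configured name that takes one
    // String argument and invokes it with the message payload.
    void onMessage(String payload) {
        try {
            Method m = delegate.getClass().getMethod(listenerMethod, String.class);
            m.invoke(delegate, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + listenerMethod, e);
        }
    }
}

// Hypothetical receiver: no messaging interface is implemented.
class SampleReceiver {
    final List<String> received = new ArrayList<>();

    public void receiveData(String data) {
        received.add(data);
    }
}
```

Calling new ListenerAdapterSketch(receiver, "receiveData").onMessage("hello") ends up in receiver.receiveData("hello"), which mirrors what the defaultListenerMethod property expresses in the XML.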

V. Sending an MQ message

jmsTemplate.send(new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
        TextMessage message = session.createTextMessage(MSG);
        logger.error(MSG);
        return message;
    }
});


---------------------------Summary of the plain JMS way of using MQ (MessageProducer, MessageConsumer)

I. Producer

ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageProducer producer;
connectionFactory = new ActiveMQConnectionFactory(
        ActiveMQConnection.DEFAULT_USER,
        ActiveMQConnection.DEFAULT_PASSWORD, "tcp://13.75.9.14:61616");

try {
    connection = connectionFactory.createConnection();
    connection.start();
    // Transacted session: the acknowledge mode argument is ignored and
    // nothing is sent to the broker until session.commit() is called
    session = connection.createSession(Boolean.TRUE,
            Session.AUTO_ACKNOWLEDGE);
    destination = session.createTopic("zteits/topic/GuardMessageTopic");
    producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    String xmlStr = "MQ message ";
    TextMessage message = session.createTextMessage(xmlStr);
    producer.send(message);
    session.commit();

} catch (Exception e) {
    e.printStackTrace();
} finally {
    try {
        // Guard against a failure before the session or connection was created
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
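
Because the producer above creates a transacted session, producer.send only stages the message. It is handed over when session.commit() runs, and a rollback would discard it. A toy model of that behaviour, using only the standard library, is sketched below. It is not the JMS API, and TransactedSendSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of sending inside a transacted session.
class TransactedSendSketch {
    private final List<String> pending = new ArrayList<>();   // sent, not yet committed
    private final List<String> delivered = new ArrayList<>(); // visible to consumers

    void send(String message) {
        pending.add(message);
    }

    // Publishes everything sent since the last commit or rollback.
    void commit() {
        delivered.addAll(pending);
        pending.clear();
    }

    // Discards everything sent since the last commit or rollback.
    void rollback() {
        pending.clear();
    }

    // Returns a copy of the messages that consumers could see.
    List<String> delivered() {
        return new ArrayList<>(delivered);
    }
}
```

This is why forgetting session.commit() in a transacted producer looks like messages silently disappearing: in the model, delivered() stays empty until commit() is called.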

II. Consumer (receives once with a timeout rather than blocking indefinitely; not recommended)

try {
    String url = "tcp://localhost:61616";
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
    // Set the user name and password; they are defined in credentials.properties under the conf directory and can also be configured in activemq.xml
    connectionFactory.setUserName("system");
    connectionFactory.setPassword("manager");
    // Create the connection
    Connection connection = connectionFactory.createConnection();
    connection.start();
    // Create the Session
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // Create the destination; it can be a topic or a queue
    Destination destination = session.createQueue("test");
    // Create the message consumer
    MessageConsumer consumer = session.createConsumer(destination);
    // Receive a message. The argument is the timeout in milliseconds; 0 means never time out.
    // receive returns the next message, or null if the timeout expires or the consumer is closed
    Message message = consumer.receive(1000);
    if (message instanceof TextMessage) {
        TextMessage textMessage = (TextMessage) message;
        String text = textMessage.getText();
        System.out.println("Received: " + text);
    } else {
        // Also reached with message == null when the timeout expires
        System.out.println("Received: " + message);
    }
    consumer.close();
    session.close();
    connection.close();
} catch (Exception e) {
    e.printStackTrace();
}
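
The receive(timeout) contract described in the comment above (wait up to the timeout, return null if nothing arrives, with 0 meaning wait indefinitely) can be mimicked with a BlockingQueue. This is a standard-library analogy rather than JMS code, and ReceiveTimeoutSketch is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a consumer with a timed receive.
class ReceiveTimeoutSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    void send(String message) {
        queue.add(message);
    }

    // Returns the next message, or null if none arrives within timeoutMillis.
    // A timeout of 0 blocks until a message is available.
    String receive(long timeoutMillis) {
        try {
            if (timeoutMillis == 0) {
                return queue.take();
            }
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }
}
```

A caller therefore has to check for null before using the result, which is the case the single-shot consumer above falls into whenever the queue is empty for the whole timeout.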

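III. Consumer (listener): keeps receiving MQ messages. It differs slightly from the consumer in method IV: the class itself does not implement the interface and uses an anonymous MessageListener instead.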

package test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author Mr.hu
 * @version createTime: 2013-9-10 4:14:48 PM
 * Class Explain: Queue (point-to-point) mode, consumer
 */
public class QueueConsumer {

    public static void main(String[] args) throws Exception {
        // ConnectionFactory: the connection factory; JMS uses it to create connections
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ChatConstants.MQ_USER, ChatConstants.MQ_PASSWORD, ChatConstants.MQ_URL);
        // Connection: the connection from the JMS client to the JMS provider
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Session: a single-threaded context for sending or receiving messages
        final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // Destination: where messages are sent to
        Queue destination = session.createQueue("example.A");
        // Consumer: the message receiver
        MessageConsumer consumer = session.createConsumer(destination);
        // The session is transacted, so each received message must be committed
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println(textMessage.getText());
                } catch (JMSException e1) {
                    e1.printStackTrace();
                }
                try {
                    session.commit();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        /* Alternative way to receive:
        while (true) {
            // Receive timeout; set to 100 s here to make testing easier
            TextMessage message = (TextMessage) consumer.receive(100000);
            if (null != message) {
                System.out.println("Received message: " + message.getText());
            } else {
                break;
            }
        }*/

    }

}

IV. Consumer (listener): public class JmsReceiver implements MessageListener {...}, i.e. the class itself implements the MessageListener interface

// Initialization
private void initialize() throws JMSException, Exception {
    // The connection factory is the object used to create connections.
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
    // The connection factory creates a JMS connection
    conn = connectionFactory.createConnection();
    // A session is a single-threaded context for producing and consuming. It is used to create
    // message producers, consumers and messages, and it provides a transactional context.
    session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // not transacted
    // The destination is the object a client uses to specify the target of the messages it produces
    // and the source of the messages it consumes.
    dest = session.createQueue(SUBJECT);
    // dest = session.createTopic(SUBJECT);
    // The session creates a message consumer that receives messages from the destination
    consumer = session.createConsumer(dest);
}

public void receiveMessage() throws JMSException, Exception {
    initialize();
    conn.start();
    consumer.setMessageListener(this);
    // Wait for incoming messages
    while (!stop) {
        Thread.sleep(5000);
    }

}

public void onMessage(Message msg) {
    ......
}
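
The while (!stop) { Thread.sleep(5000); } loop above only exists to keep the calling thread alive while the listener runs. For another thread's update to be seen reliably, the stop field would need to be volatile (an assumption, since its declaration is not shown). One alternative, sketched here with the standard library only, is to block on a CountDownLatch so the thread wakes up as soon as stopping is requested. KeepAliveSketch and its method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical replacement for a sleep-and-poll keep-alive loop.
class KeepAliveSketch {
    private final CountDownLatch stopSignal = new CountDownLatch(1);

    // Called from another thread (for example a shutdown hook) to end the wait.
    void stop() {
        stopSignal.countDown();
    }

    // Blocks until stop() is called or the timeout elapses.
    // Returns true if stop() was called, false on timeout or interruption.
    boolean awaitStop(long timeoutMillis) {
        try {
            return stopSignal.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }
}
```

In a receiver like the one above, receiveMessage() could call awaitStop in place of the sleep loop, and whatever currently sets stop would call stop() instead.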


---------------------------Project configuration summary

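1. mqContext.xml in sy_TgsWebService: ${MQ} and ${TOPIC} only need to be set in config.properties. Note that the jmsTemplate here uses the defaultDestinationName property, which differs from the configuration in 2:

in 1: <property name="defaultDestinationName" value="${TOPIC}" />

in 2: <property name="defaultDestination" ref="jmsBlackAlertMessageTopic"></property>

------------------TOPIC is injected through a placeholder, the Factory's broker URL is set through a placeholder in the configuration, and jmsTemplate can send or receive MQ messages (this configuration is mainly for sending; receiving through it is not recommended)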

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd" default-autowire="byName">

<!-- Configure the connectionFactory -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>${MQ}</value>
</property>
</bean>
</property>
<property name="maxConnections" value="5"></property>
</bean>

<!-- Spring JMS Template -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref local="jmsFactory" />
</property>
<property name="defaultDestinationName" value="${TOPIC}" />
<!-- Messaging domain: false = point-to-point (queue), true = publish/subscribe (topic) -->
<property name="pubSubDomain" value="true" />
<!-- Switch for deliveryMode, priority and timeToLive: they only take effect when this is true (default: false) -->
<property name="explicitQosEnabled" value="true" />
<!-- Delivery mode: DeliveryMode.NON_PERSISTENT=1 (non-persistent); DeliveryMode.PERSISTENT=2 (persistent) -->
<property name="deliveryMode" value="1" />
</bean>
</beans>


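2. msgContext.xml in sy_duty_atms_black: MQ and TOPIC are configured in Tomcat's configuration files

---------------------TOPIC is injected through JNDI, the FACTORY is also injected through JNDI, TgslistenerContainer is the container that receives MQ messages, and jmsTemplate can send MQ messages

The jmsTgsMessageTopic bean can be replaced with the following (the destination ref in TgslistenerContainer then has to point at the new bean id):

<bean id="TgsMessageTopicDest" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="zteits/topic/TgsMessageTopic" />
</bean>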

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="java:comp/env/jms/ConnectionFactory"></property>
</bean>
<bean id="jmsTgsMessageTopic" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="java:comp/env/jms/tgsMessageTopic"></property>
</bean>

<bean id="jmsBlackAlertMessageTopic" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="java:comp/env/jms/blackAlertMessageTopic"></property>
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactory"></property>
<property name="defaultDestination" ref="jmsBlackAlertMessageTopic"></property>
</bean>

<bean id="tgsMsgReceiver"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="com.zteits.atms.black.job.logic.TgsPassedCarReceiverImpl">
<!-- Properties injected into this bean -->
<property name="blacklistCompare" ref="BlackListComparerImpl" />
</bean>
</constructor-arg>
<property name="defaultListenerMethod" value="receiveData" />
</bean>
<!-- Listener container for checkpoint (Tgs) message data: each consumer needs its own DefaultMessageListenerContainer -->
<bean id="TgslistenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers" value="1" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="jmsTgsMessageTopic" />
<property name="messageListener" ref="tgsMsgReceiver" />
</bean>
<bean id="BlackListComparerImpl" class="com.zteits.atms.black.job.logic.BlackListComparerImpl">
<property name="msgContext" ref="MessageContext" />
<property name="jmsSender" ref="jmsTemplate" />
<property name="bklAlertDAO" ref="BklAlertDAO" />
<property name="bklInfoDAOExtend" ref="BklInfoDAOExtend"/>
<property name="illWhitelistDAOExtend" ref="IllWhitelistDAOExtend"/>
<property name="illInfoWhiteDAO" ref="IllInfoWhiteDAO"/>
<property name="bklIpRelationDAOExtend" ref="BklIpRelationDAOExtend"/>
<property name="BklAlarmDAO" ref="BklAlarmDAO"/>
<property name="sysPointDAOExtend" ref="SysPointDAOExtend"/>
</bean>
<bean id="BklInfoDAOExtend" class="com.zteits.atms.black.job.dao.BklInfoDAOExtend" parent="BklInfoDAO" />
<bean id="IllWhitelistDAOExtend" class="com.zteits.atms.black.job.dao.IllWhitelistDAOExtend" parent="IllWhitelistDAO" />
<bean id="BklIpRelationDAOExtend" class="com.zteits.atms.black.job.dao.BklIpRelationDAOExtend" parent="BklIpRelationDAO" />
<bean id="SysPointDAOExtend" class="com.zteits.atms.black.job.dao.SysPointDAOExtend" parent="SysPointDAO" />
</beans>