MQ (Spring\生产者消费者)多种常用方式总结
2015-03-25 11:39
375 查看
------------------总结一下Spring中ActiveMq的配置:
一、Destination可以有3中配法(下面的Topic就是指Destination,MQ代指Factory)
a、JNDI注入
b、org.apache.activemq.command.ActiveMQTopic
c、${TOPIC}
二、Factory可以有2中配法
a、JNDI
b、org.apache.activemq.pool.PooledConnectionFactory
三、jmsTemplate 可以发送或接受mq信息,(jmsTemplate需要factory和destination),注意一下destination注入的区别(a中是value,b中是ref,b可以传2种配置方式的topic)
a、(两个参数)
b、(两个参数)
c、(一个参数)
四、如果接受mq信息最好使用org.springframework.jms.listener.DefaultMessageListenerContainer监听容器(需要传入Factory,Destination,Listener‘Consumers等参数)
五、mq消息发送
---------------------------总结一下mq普通的写法(MessageProducer、MessageConsumer)
一、生产者
二、消费者(接受单次,不阻塞,不建议使用)
三、消费者(监听):可以持续接受mq信息多次,和方法四中的消费者有一点区别,不实现接口
四、消费者(监听):public class JmsReceiver implements MessageListener {...}实现了MessageListener接口
项目配置总结-------------------------------------------------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1、sy_TgsWebService 中 mqContext.xml文件:${MQ}、${TOPIC}在config.properties 中配置好就行(注意jmsTemplate配置中的defaultDestinationName参数,与2中配置的有区别 1中 <property name="defaultDestinationName" value="${TOPIC}"
/>
2中<property name="defaultDestination" ref="jmsBlackAlertMessageTopic"></property>)
------------------TOPIC是占位符注入,Factory是在配置中配置占位符,jmsTemplate可以发送或者接受mq信息(此配置主要用于发送,不建议接受mq)
2、sy_duty_atms_black 中 msgContext.xml文件:MQ和TOPIC是在TOMCAT的配置文件中配置的
---------------------TOPIC通过JNDI注入,FACTORY也是通过JDNI注入,TgslistenerContainer是接受mq信息的容器,jmsTemplate可以发送mq信息
jmsTgsMessageTopic的bean可以替换为 --- <bean id="TgsMessageTopicDest" class="org.apache.activemq.command.ActiveMQTopic">
--- <constructor-arg value="zteits/topic/TgsMessageTopic" />
--- </bean>
一、Destination可以有3中配法(下面的Topic就是指Destination,MQ代指Factory)
a、JNDI注入
<bean id="jmsTgsMessageTopic" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="java:comp/env/jms/tgsMessageTopic"></property> </bean>
b、org.apache.activemq.command.ActiveMQTopic
<bean id="TgsMessageTopicDest" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="zteits/topic/TgsMessageTopic" /> </bean>
c、${TOPIC}
二、Factory可以有2中配法
a、JNDI
<bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="java:comp/env/jms/ConnectionFactory"></property> </bean>
b、org.apache.activemq.pool.PooledConnectionFactory
<bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="java:comp/env/jms/ConnectionFactory"></property> </bean>
三、jmsTemplate 可以发送或接受mq信息,(jmsTemplate需要factory和destination),注意一下destination注入的区别(a中是value,b中是ref,b可以传2种配置方式的topic)
a、(两个参数)
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory"> <ref local="jmsFactory" /> </property> <span style="color:#ff0000;"><property name="defaultDestinationName" value="${TOPIC}" /> </span> <!-- 区别它采用的模式为false是p2p为true是订阅 --> <property name="pubSubDomain" value="true" /> <!-- deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false--> <property name="explicitQosEnabled" value="true" /> <!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久--> <property name="deliveryMode" value="1" /> </bean>
b、(两个参数)
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsConnectionFactory"></property> <span style="color:#ff0000;"><property name="defaultDestination" ref="jmsBlackAlertMessageTopic"></property> </span> </bean>
c、(一个参数)
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://192.168.3.48:61616</value> </property> </bean> <span style="color:#cc0000;"> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory"> <ref bean="connectionFactory"/> </property> </bean> </span> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0"> <value>HelloWorldQueue</value> </constructor-arg> </bean>
<span style="background-color: rgb(255, 255, 102);">发送</span>
<span style="color:#cc0000;background-color: rgb(255, 255, 102);">jmsTemplate.send (destination, new MessageCreator(){</span> public Message createMessage(Session session) throws JMSException { return session.createTextMessage("大家好 这个是测试!"); } });
<span style="background-color: rgb(255, 255, 102);">接受</span>
TextMessage msg = (TextMessage) <span style="color:#ff0000;background-color: rgb(255, 255, 102);">jmsTemplate.receive(destination); </span> System.out.println("reviced msg is:" + msg.getText());
四、如果接受mq信息最好使用org.springframework.jms.listener.DefaultMessageListenerContainer监听容器(需要传入Factory,Destination,Listener‘Consumers等参数)
<bean id="TgslistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="concurrentConsumers" value="1" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="destination" ref="jmsTgsMessageTopic" /> <property name="messageListener" ref="tgsMsgReceiver" /> </bean>
<bean id="tgsMsgReceiver" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg> <bean class="com.zteits.atms.black.job.logic.TgsPassedCarReceiverImpl"> <!-- 当前bean的注入参数 --> <property name="blacklistCompare" ref="BlackListComparerImpl" /> </bean> </constructor-arg> <property name="defaultListenerMethod" value="receiveData" /> </bean>
五、mq消息发送
JMSTemplate.send(new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage message = session.createTextMessage(MSG); logger.error(MSG); return message; } });
---------------------------总结一下mq普通的写法(MessageProducer、MessageConsumer)
一、生产者
ConnectionFactory connectionFactory; Connection connection = null; Session session = null; <span style="color:#ff0000;">Destination destination;</span> <span style="color:#ff0000;"><span style="background-color: rgb(255, 255, 102);">MessageProducer </span>producer;</span> connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, <span style="color:#ff0000;">"tcp://13.75.9.14:61616");</span> try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); <span style="color:#ff0000;">destination = session.createTopic("zteits/topic/GuardMessageTopic"); producer = session.createProducer(destination); </span> producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); String xmlStr = "mq信息 "; TextMessage message = session.createTextMessage(xmlStr); producer.send(message); session.commit(); } catch (Exception e) { e.printStackTrace(); }finally{ try { session.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } }
二、消费者(接受单次,不阻塞,不建议使用)
<span class="crayon-st">try</span><span class="crayon-sy">{</span>
<span style="color:#cc0000;">String url = "tcp://localhost:61616";</span> ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(<span style="color:#cc0000;">url</span>); // 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置 connectionFactory.setUserName("system"); connectionFactory.setPassword("manager"); // 创建连接 Connection connection = connectionFactory.createConnection(); connection.start(); // 创建Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建目标,就创建主题也可以创建队列 Destination destination = session.createQueue("test"); // 创建消息消费者 <span style="color:#ff0000;">MessageConsumer consumer </span>= session.createConsumer(destination); // 接收消息,参数:接收消息的超时时间,为0的话则不超时,receive返回下一个消息,但是超时了或者消费者被关闭,返回null <span style="color:#cc0000;">Message message = consumer.receive(1000); </span> if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("Received: " + text); } else { System.out.println("Received: " + message); } consumer.close(); session.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); }
三、消费者(监听):可以持续接受mq信息多次,和方法四中的消费者有一点区别,不实现接口
package test; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author Mr.hu * @version crateTime:2013-9-10 下午4:14:48 * Class Explain: Queue(点对点)方式 消费者 Consumer */ public class QueueConsumer { public static void main(String[] args) throws Exception{ // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ChatConstants.MQ_USER,ChatConstants.MQ_PASSWORD,ChatConstants.MQ_URL); // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一个发送或接收消息的线程 final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. Queue destination=session.createQueue("example.A"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); <span style="color:#ff0000;background-color: rgb(255, 255, 102);">consumer.setMessageListener(new MessageListener(){//</span>有事务限制 @Override public void onMessage(Message message) { try { TextMessage textMessage=(TextMessage)message; System.out.println(textMessage.getText()); } catch (JMSException e1) { e1.printStackTrace(); } try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); /* 另外一种接受方式 * while (true) { //设置接收者接收消息的时间,为了便于测试,这里谁定为100s TextMessage message = (TextMessage) consumer.receive(100000); if (null != message) { System.out.println("收到消息" + message.getText()); } else { break; } }*/ } }
四、消费者(监听):public class JmsReceiver implements MessageListener {...}实现了MessageListener接口
// 初始化 private void initialize() throws JMSException, Exception { // 连接工厂是用户创建连接的对象. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL); // 连接工厂创建一个jms connection conn = connectionFactory.createConnection(); // 是生产和消费的一个单线程上下文。会话用于创建消息的生产者,消费者和消息。会话提供了一个事务性的上下文。 session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 不支持事务 // 目的地是客户用来指定他生产消息的目标还有他消费消息的来源的对象. dest = session.createQueue(SUBJECT); // dest = session.createTopic(SUBJECT); // 会话创建消息的生产者将消息发送到目的地 consumer = session.createConsumer(dest); } public void <span style="color:#cc0000;background-color: rgb(255, 255, 102);">receiveMessage</span>() throws JMSException, Exception { initialize(); conn.start(); consumer.setMessageListener(this); // 等待接收消息 while (!stop) { Thread.sleep(5000); } } public void onMessage(Message msg) {
......
}
项目配置总结-------------------------------------------------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1、sy_TgsWebService 中 mqContext.xml文件:${MQ}、${TOPIC}在config.properties 中配置好就行(注意jmsTemplate配置中的defaultDestinationName参数,与2中配置的有区别 1中 <property name="defaultDestinationName" value="${TOPIC}"
/>
2中<property name="defaultDestination" ref="jmsBlackAlertMessageTopic"></property>)
------------------TOPIC是占位符注入,Factory是在配置中配置占位符,jmsTemplate可以发送或者接受mq信息(此配置主要用于发送,不建议接受mq)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd" default-autowire="byName"> <!-- 配置connectionFactory --> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>${MQ}</value> </property> </bean> </property> <property name="maxConnections" value="5"></property> </bean> <!-- Spring JMS Template --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory"> <ref local="jmsFactory" /> </property> <property name="defaultDestinationName" value="${TOPIC}" /> <!-- 区别它采用的模式为false是p2p为true是订阅 --> <property name="pubSubDomain" value="true" /> <!-- deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false--> <property name="explicitQosEnabled" value="true" /> <!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久--> <property name="deliveryMode" value="1" /> </bean> </beans>
2、sy_duty_atms_black 中 msgContext.xml文件:MQ和TOPIC是在TOMCAT的配置文件中配置的
---------------------TOPIC通过JNDI注入,FACTORY也是通过JDNI注入,TgslistenerContainer是接受mq信息的容器,jmsTemplate可以发送mq信息
jmsTgsMessageTopic的bean可以替换为 --- <bean id="TgsMessageTopicDest" class="org.apache.activemq.command.ActiveMQTopic">
--- <constructor-arg value="zteits/topic/TgsMessageTopic" />
--- </bean>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="java:comp/env/jms/ConnectionFactory"></property> </bean>
<bean id="jmsTgsMessageTopic" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="java:comp/env/jms/tgsMessageTopic"></property>
</bean>
<bean id="jmsBlackAlertMessageTopic" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="java:comp/env/jms/blackAlertMessageTopic"></property>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactory"></property>
<property name="defaultDestination" ref="jmsBlackAlertMessageTopic"></property>
</bean>
<bean id="tgsMsgReceiver"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="com.zteits.atms.black.job.logic.TgsPassedCarReceiverImpl">
<!-- 当前bean的注入参数 -->
<property name="blacklistCompare" ref="BlackListComparerImpl" />
</bean>
</constructor-arg>
<property name="defaultListenerMethod" value="receiveData" />
</bean>
<!-- 卡口消息数据的监听容器 :每个消费者需要声明一个DefaultMessageListenerContainer -->
<bean id="TgslistenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers" value="1" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="jmsTgsMessageTopic" />
<property name="messageListener" ref="tgsMsgReceiver" />
</bean>
<bean id="BlackListComparerImpl" class="com.zteits.atms.black.job.logic.BlackListComparerImpl">
<property name="msgContext" ref="MessageContext" />
<property name="jmsSender" ref="jmsTemplate" />
<property name="bklAlertDAO" ref="BklAlertDAO" />
<property name="bklInfoDAOExtend" ref="BklInfoDAOExtend"/>
<property name="illWhitelistDAOExtend" ref="IllWhitelistDAOExtend"/>
<property name="illInfoWhiteDAO" ref="IllInfoWhiteDAO"/>
<property name="bklIpRelationDAOExtend" ref="BklIpRelationDAOExtend"/>
<property name="BklAlarmDAO" ref="BklAlarmDAO"/>
<property name="sysPointDAOExtend" ref="SysPointDAOExtend"/>
</bean>
<bean id="BklInfoDAOExtend" class="com.zteits.atms.black.job.dao.BklInfoDAOExtend" parent="BklInfoDAO" />
<bean id="IllWhitelistDAOExtend" class="com.zteits.atms.black.job.dao.IllWhitelistDAOExtend" parent="IllWhitelistDAO" />
<bean id="BklIpRelationDAOExtend" class="com.zteits.atms.black.job.dao.BklIpRelationDAOExtend" parent="BklIpRelationDAO" />
<bean id="SysPointDAOExtend" class="com.zteits.atms.black.job.dao.SysPointDAOExtend" parent="SysPointDAO" />
</beans>
相关文章推荐
- Spring学习总结(一)——Spring实现IoC的多种方式
- 生产者/消费者问题的多种Java实现方式
- Spring学习总结——Spring实现IoC的多种方式
- 生产者/消费者问题的多种Java实现方式
- Spring学习总结——Spring实现AOP的多种方式
- 分析总结Spring管理Hibernate中Dao层访问数据库常用方式(附SSH的jar包)
- Java并发编程深入学习——生产者-消费者模式多种实现方式
- Spring学习总结(一)——Spring实现IoC的多种方式
- 生产者/消费者问题的多种Java实现方式
- 生产者/消费者问题的多种Java实现方式
- 生产者/消费者问题的多种Java实现方式
- 生产者/消费者问题的多种Java实现方式
- 生产者/消费者问题的多种Java实现方式
- Spring学习总结(三)——Spring实现AOP的多种方式
- 生产者/消费者问题的多种Java实现方式
- 生产者/消费者问题的多种Java实现方式
- Spring学习总结——Spring实现AOP的多种方式
- spring+activemq配置多个生产者,多个消费者并发处理消息
- Spring学习总结——Spring实现AOP的多种方式
- Spring学习总结——Spring实现AOP的多种方式