您的位置:首页 > 编程语言 > Java开发

Spring+ActiveMQ配置

2016-09-17 12:34 435 查看
依赖jar包:

                <dependency>

<groupId>org.apache.activemq</groupId>

<artifactId>activemq-spring</artifactId>

<version>5.13.2</version>

</dependency>

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-jms</artifactId>

<version>3.2.2.RELEASE</version>

</dependency>
一、发送端
1、XML配置

<!-- Sender-side service bean; it publishes messages through the jmsTemplate bean defined below. -->
<bean id="producerService" class="com.vispractice.vplog.activemq.ProducerServiceImpl"/>

<!--
    ActiveMQ connection factory: the vendor-provided ConnectionFactory that actually
    creates JMS connections.
    The broker URL comes from the mq.brokerURL placeholder, e.g. tcp://ip:61616 for a
    broker on the network or tcp://localhost:61616 for a local one; a user name and
    password can be configured here as well.
    NOTE(review): trustAllPackages=true allows ObjectMessage payloads of any class to be
    deserialized. Consider restricting the trusted packages if the broker can receive
    messages from untrusted producers.
-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${mq.brokerURL}"/>
    <property name="trustAllPackages" value="true"/>
</bean>

<!--
    Disabled alternative: Spring's caching connection factory wrapping the vendor factory.
    (The original snippet referenced a bean named amqConnectionFactory, which is not
    defined in this configuration; the reference below points at targetConnectionFactory.)

    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        <property name="sessionCacheSize" value="100"/>
    </bean>
-->

<!-- Spring-managed ConnectionFactory that wraps the vendor factory above. -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>

<!-- Spring's JMS helper class, used for sending and receiving messages. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

<!-- Queue destination (point-to-point) named vp-log-queue. -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="vp-log-queue"/>
</bean>
2、类ProducerServiceImpl.java

@Component  

public class ProducerServiceImpl implements ProducerService {   

    

    private JmsTemplate jmsTemplate;   

       

    public void sendMessage(Destination destination, final ActiveMQObjMsgBean jmsObject) {   

        System.out.println("---------------生产者发送消息-----------------");   

        jmsTemplate.send(destination, new MessageCreator() {   

            public Message createMessage(Session session) throws JMSException {   

    return session.createObjectMessage(jmsObject);

            }   

        });   

    }    

  

    public JmsTemplate getJmsTemplate() {   

        return jmsTemplate;   

    }    

  

    @Resource  

    public void setJmsTemplate(JmsTemplate jmsTemplate) {   

        this.jmsTemplate = jmsTemplate;   

    }   

    
}

二、接收端
1、XML配置

<!--
    ActiveMQ connection factory: the vendor-provided ConnectionFactory that actually
    creates JMS connections.
    The broker URL comes from the mq.brokerURL placeholder, e.g. tcp://ip:61616 for a
    broker on the network or tcp://localhost:61616 for a local one; a user name and
    password can be configured here as well.
    NOTE(review): trustAllPackages=true allows ObjectMessage payloads of any class to be
    deserialized. Consider restricting the trusted packages if the broker can receive
    messages from untrusted producers.
-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${mq.brokerURL}"/>
    <property name="trustAllPackages" value="true"/>
</bean>

<!--
    Disabled alternative: Spring's caching connection factory wrapping the vendor factory.
    (The original snippet referenced a bean named amqConnectionFactory, which is not
    defined in this configuration; the reference below points at targetConnectionFactory.)

    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        <property name="sessionCacheSize" value="100"/>
    </bean>
-->

<!-- Spring-managed ConnectionFactory that wraps the vendor factory above. -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>

<!-- Spring's JMS helper class, used for sending and receiving messages. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

<!-- Queue destination (point-to-point) named vp-log-queue. -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="vp-log-queue"/>
</bean>

<!-- Message listener that handles each received message. -->
<bean id="receiverListener" class="com.vispractice.vplog.activemq.ReceiverListener"/>

<!-- Listener container: consumes from queueDestination and hands every message to receiverListener. -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="queueDestination"/>
    <property name="messageListener" ref="receiverListener"/>
</bean>

2、消息监听器ReceiverListener.java

public class ReceiverListener implements MessageListener {

@Resource(name = "logServiceImpl")

private LogService logService;

/**

* 现在只使用到ObjectMessage,其他的消息类型备用

*/

public void onMessage(Message msg) {

try {

if (msg instanceof TextMessage) {

TextMessage message = (TextMessage) msg;

System.out.println("------Received TextMessage------");

System.out.println(message.getText());

} else if (msg instanceof MapMessage) {

MapMessage message = (MapMessage) msg;

System.out.println("------Received MapMessage------");

System.out.println(message.getString("sysName"));

System.out.println(message.getString("sysFlag"));

System.out.println(message.getString("operation"));

System.out.println(message.getString("operator"));

System.out.println(message.getString("content"));

System.out.println(message.getString("parameter"));

System.out.println(message.getString("ip"));

System.out.println("------Received MapMessage for while------");

Enumeration enumer = message.getMapNames();

while (enumer.hasMoreElements()) {

Object obj = enumer.nextElement();

System.out.println(message.getObject(obj.toString()));

}

} else if (msg instanceof StreamMessage) {

StreamMessage message = (StreamMessage) msg;

System.out.println("------Received StreamMessage------");

System.out.println(message.readString());

System.out.println(message.readBoolean());

System.out.println(message.readLong());

} else if (msg instanceof ObjectMessage) {

System.out.println("------Received ObjectMessage------");

ObjectMessage message = (ObjectMessage) msg;

ActiveMQObjMsgBean jmsObject = (ActiveMQObjMsgBean) message.getObject();

String sysName = jmsObject.getSysName();

String sysFlag = jmsObject.getSysFlag();

String operation = jmsObject.getOperation();

String operator = jmsObject.getOperator();

String content = jmsObject.getContent();

String ip = jmsObject.getIp();

String[] ignoreParameters = jmsObject.getIgnoreParameters();

StringBuffer parameter = new StringBuffer();

Map<String, String[]> parameterMap = jmsObject.getParameterMap();

if (parameterMap != null) {

for (Entry<String, String[]> entry : parameterMap.entrySet()) {

String parameterName = entry.getKey();

if (!ArrayUtils.contains(ignoreParameters, parameterName)) {

String[] parameterValues = entry.getValue();

if (parameterValues != null) {

for (String parameterValue : parameterValues) {

parameter.append(parameterName + " = " + parameterValue + "\n");

}

}

}

}

}

Log log = new Log();

log.setSysName(sysName);

log.setSysFlag(sysFlag);

log.setOperation(operation);

log.setOperator(operator);

log.setContent(content);

log.setParameter(parameter.toString());

log.setIp(ip);

log.setCreateDate(new Date());

logService.save(log);

} else if (msg instanceof BytesMessage) {

System.out.println("------Received BytesMessage------");

BytesMessage message = (BytesMessage) msg;

byte[] byteContent = new byte[1024];

int length = -1;

StringBuffer content = new StringBuffer();

while ((length = message.readBytes(byteContent)) != -1) {

content.append(new String(byteContent, 0, length));

}

System.out.println(content.toString());

} else {

System.out.println(msg);

}

} catch (JMSException e) {

e.printStackTrace();

}

}

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: