您的位置:首页 > 其它

JMS消息类型

2016-08-19 15:25 435 查看
JMS总共提供了6个消息接口,分别为Message,TextMessage,StreamMessage,MapMessage,ObjectMessage以及ByteMessage,没中类型的消息都由3部分组成:消息头,消息属性和消息体。下面,我们就来探讨一下这几种消息的使用,首先看下消息的创建,代码如下:

@Service("messageProducerService")
public class MessageProducerService implements ProducerService {

/**
* 注入JmsTemplate
*/
@Resource(name="jmsTemplate")
private JmsTemplate jTemplate;

@Override
public void sendMessage(Destination destination, String message) {
System.out.println("acknowledgeMode:"+jTemplate.getSessionAcknowledgeMode());

/*
* 这种类型的消息仅仅包含JMS消息头和消息属性,而且,它仅限用于事件通知
* ,一个事件通知就是出现某些情况的一个广播,告警,或者通知,如果业务场景需要一个不含任何消息内容
*  的简单通知,轻量级消息类型就是实现它的最有效方式,例如,为了广播通知某个特定类中的一个异常,可以
*  发布一条不含有任何消息内容的异常文本消息
*/
jTemplate.send(destination, new MessageCreator() {

@Override
public Message createMessage(Session session) throws JMSException {
Message m = session.createMessage();
m.setStringProperty("exception", "java.lang.NosuchMethodException");
return m;
}
});

/*
* 这种类型的消息携带了一个字符串作为消息内容,可以用作简单的文本类型消息以及复杂的字符数据的交换例如:
* xml,json格式,发送这种类型的消息一般有两种方式,具体发送方式如下:
*/
jTemplate.send(destination, new MessageCreator() {

/**
*
* attention:直接使用session创建一个带具体消息内容的消息
* Details:example1
* @author chhliu
*/
//			@Override
//			public Message createMessage(Session session) throws JMSException {
//				return session.createTextMessage("my name is chhliu!");
//			}

/**
*
* attention:先创建一个不带消息内容的TextMessage,然后将需要发送的消息内容设到TextMessage中
* Details:example2
* @author chhliu
*/
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage tm = session.createTextMessage();
tm.setText("my name is chhliu!");
return tm;
}
});

/*
* 这种类型携带了一组key/value对作为消息内容,和Map的使用方法类似,值除了是JAVA原始数据类型和String类型
* 之外,还支持Object类型,本质上,MapMessage的工作方式类似于JMS的属性
*/
jTemplate.send(destination, new MessageCreator() {

@Override
public Message createMessage(Session session) throws JMSException {
MapMessage mm = session.createMapMessage();
mm.setInt("age", 27);
mm.setString("name", "chhliu");
mm.setString("password", "123456");
return mm;
}
});

/*
* 这种类型携带了一组原始类型字节流作为消息内容,它可以使用应用程序的本机格式来交换数据,这种格式可能不兼容其他现有
* 的Message类型,当JMS纯粹用于两个系统之间的消息传送时,也可以使用这种类型
*/
jTemplate.send(destination, new MessageCreator() {

@Override
public Message createMessage(Session session) throws JMSException {
BytesMessage bm = session.createBytesMessage();
bm.writeUTF("hello world!");
return bm;
}
});

/*
* 这种类型携带了一个可序列化JAVA对象作为消息内容,它可用于JAVA对象的交换
* 注意:该JAVA对象必须是序列化后的对象
* 对象消息可以使用session接口中定义的两个工厂方法中的任意一个来创建,其中一个工厂方法
* 没有使用参数,因此必须使用setObject()添加可序列化的对象,另一个工厂方法则使用Serializable对象
* 作为消息内容
*/
jTemplate.send(destination, new MessageCreator() {

@Override
public Message createMessage(Session session) throws JMSException {
ObjectMessage om = session.createObjectMessage();
User u = new User();
u.setAge(27);
u.setName("chhliu");
u.setPassword("123456");
om.setObject(u);
return om;
}
});
}
}
再来看下消息消费者模型,代码如下:
public class MessageConsumerReceive implements MessageListener {

@Override
public void onMessage(Message message) {
try {
/*
*  接收无任何内容的消息
*  消费者在接收这类型对象的时候,由于没有消息内容,而只有消息头和消息属性,所以在使用的时候
*  直接取出相应的头信息和属性值即可
*/
String receive = message.getStringProperty("exception");
if(null != receive && !receive.isEmpty()){
System.out.println("接收到的异常信息为:"+receive);
}

/*
* 接收TextMessage类型的消息
* 消费者在接收TextMessage对象时,可以使用getText()方法提取出String类型的消息内容
* 如果传送的TextMessage不含消息内容,那么getText()方法将返回一个null值
*/
if(message instanceof TextMessage){
System.out.println("type:TextMessage!!");
String receiveMessage = ((TextMessage) message).getText();
System.out.println("消费者收到的消息为:"+receiveMessage);
}

/*
* 使用方法和Map类似
*/
if(message instanceof MapMessage){
System.out.println("type:MapMessage!!");
int age = ((MapMessage) message).getInt("age");
String name = ((MapMessage) message).getString("name");
String password = ((MapMessage) message).getString("password");
System.out.println("年龄:"+age+" 姓名:"+name+" 密码:"+password);
}

/*
* 消费者在接收ObjectMessage时,它可以使用getObject()方法提取出消息内容,如果传送的
* ObjectMessage对象不含有消息内容,则返回的是一个null值
* 尽管ObjectMessage看起来使用非常方便,但是这种消息类型有一个问题,消息的生产者和消息的消费者
* 是不能跨平台的,必须都是JAVA平台,无法做到高度的可移植性,那怎么来解决JAVA对象的跨平台传递问题了?
* 解决方案有2个:
* 1:即使用MapMessage来代替ObjectMessage,例如一个User中有3个属性,我们可以将这3个属性拆分后
* 分别设置到MapMessage中,当消费者消费消息的时候,再组装成具体平台下的类对象即可
* 2:先将类对象序列化,然后消费者消费消息的时候,在反序列化,比较常规的做法就是,先将一个类对象转成JSON格式
* 的字符串,然后通过TextMessage来发送消息,当消费者接收到文本类型的消息之后,再将JSON格式的字符串,
* 反序列化为类对象即可
*/
if(message instanceof ObjectMessage){
System.out.println("type:ObjectMessage!!");
User user = (User) ((ObjectMessage) message).getObject();
System.out.println(user.toString());
}

/*
* 注意:在接收这种类型的消息时,很可能导致错误,所以,最好是使用和写入时相同的数据类型,并以同样的顺序来读取
* ByteMessage的消息内容
*/
if(message instanceof BytesMessage){
System.out.println("type:BytesMessage!!");
BytesMessage bm = (BytesMessage)message;
System.out.println("bytesMessage:"+bm.readUTF());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
配置文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
xmlns:jpa="http://www.springframework.org/schema/data/jpa"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-3.2.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa-1.3.xsd"> 
<!-- 扫描注解包 -->
<context:annotation-config />
<context:component-scan base-package="com.chhliu.myself.activemq.start"></context:component-scan>

<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="failover:(tcp://localhost:61616,tcp://localhost:61617)" />
<property name="trustedPackages">
<list>
<!-- 注意:当发送ObjectMessage的消息体是一个JAVA对象的时候,我们需要在JMS工厂创建时,指定信任的包,否则会抛如下异常:
Caused by: java.lang.ClassNotFoundException: Forbidden class com.chhliu.myself.activemq.start.async.User! This class is not trusted to be serialized as ObjectMessage payload
-->				<value>com.chhliu.myself.activemq.start.async</value>
</list>
</property>
</bean>

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory" />
<property name="maxConnections" value="10" />
</bean>

<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory" />
</bean>

<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>

<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>NTF_MOCK_INPUT</value>
</constructor-arg>
</bean>

<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>NTF_MOCK_OUTPUT</value>
</constructor-arg>
</bean>

<!-- 消息监听器 -->
<bean id="messageConsumerReceive" class="com.chhliu.myself.activemq.start.async.MessageConsumerReceive"/>
<!-- 消息监听容器 -->
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="messageConsumerReceive" />
</bean>
</beans>
测试结果如下:
接收到的异常信息为:java.lang.NosuchMethodException
type:TextMessage!!
消费者收到的消息为:my name is chhliu!
type:MapMessage!!
年龄:27 姓名:chhliu 密码:123456
type:BytesMessage!!
bytesMessage:hello world!
type:ObjectMessage!!
年龄:27 姓名:chhliu 密码:123456
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: