您的位置:首页 > 编程语言 > Java开发

JMS-ActiveMQ:Java消息服务

2016-10-12 20:46 375 查看
导读:

JMS:Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通.本文接受了JMS的基础知识,业务需求以及功能实现等。

    JMS基础---》需求----》过程-----》安装-----》代码实现

一、JMS基础:

1.连接工厂(JMS  connectionFactory)

连接工厂是客户用来创建连接的对象。根据JNDI来查询。

2.连接(connection)

JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。

3.会话(session)

JMS Session是生产消息和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。

4.目的地(destination)

目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。

  消息传递域:1  Point-to-Point 消息(P2P) 点对点;

               2  Publish Subscribe messaging(Pub/Sub)发布/订阅消息

在点对点消息传递域中,目的地被称为队列(queue);在发布/订阅消息传递域中,目的地被称为主题(topic)。

5.1消息生产者(producer)

消息生产者是会话创建的一个对象,用于把消息发送到一个目的地。

5.2消息消费者(consumer)

消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。

同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。

异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。消费者类必须实现MessageListener接口,然后在onMessage方法中监听消息的到达并处理。

6.消息(message)

JMS消息由以下三部分组成:

消息头:每个消息头字段都有相应的getter和setter方法。

消息属性:如果需要除消息头字段以外的值,那么可以使用消息属性。

消息体:JMS定义的消息类型有,简单文本(TextMessage)、可序列化的对象(ObjectMessage)、属性集合(MapMessage)、字节流(BytesMessage)、原始值流(StreamMessage)

JMS应用场合:如果有更新,服务器端发送更新信息到客户端;广播

 

二、需求:

一个电子商务系统,拥有大量用户。当用户下单后,需要短信或邮件通知对方。

 

三、过程:

在系统架构中,核心业务系统(A)负责处理用户订单,但订单成功生成后,核心业务发送通知到消息驱动的子系统(B)。B系统接到通知后,负责发送短信或电子邮件,发送成功后告知A系统。

1.A发送通知--->2.B监听,并受到消息--->3.B处理消息,并发送回复----->4.A监听回复,接受到回复消息,将回复消息记录到数据库。

 

四、安装、启动、测试:

安装:在http://activemq.apache.org/download.html 下载5.0.0发行包,解压即可,

启动:window环境运行解压目录下的/bin/activemq.bat

查看:http://127.0.0.1:8161/admin

 

五、扩展:一个生产者,多个消费者

第二个消费者也需要实现listener,和第一个消费者一样,只是需要指定不同的clientId和消费者名:

connection.setClientID("MyClient2");

TopicSubscriber consumer = jmsSession.createDurableSubscriber(

     (Topic) envContext.lookup("jms/topic/MyTopic"), "MySub2");

 

六、代码实现:发送消息---》接受消息---》服务器配置

//1 发送消息(接受回复消息)

public class SenderMessageService {

//发布指定消息到指定地址(在发布之前,建议将消息保存到数据库)

 public void publish(String type, Object object) {

  try {

   InitialContext initCtx = new InitialContext();

   //1

   Context envContext = (Context) initCtx.lookup("java:comp/env");

   //2

   ConnectionFactory connectionFactory = (ConnectionFactory) envContext

     .lookup("jms/NormalConnectionFactory");

   //3

   Connection connection = connectionFactory.createConnection();

   //4

   Session jmsSession = connection.createSession(false,

     Session.AUTO_ACKNOWLEDGE);

   //5,6 Destination:需指定其对应的主题(subject)名称

   MessageProducer producer = jmsSession

     .createProducer((Destination) envContext

       .lookup("jms/topic/MyTopic"));

   // 设置持久方式:根据Destination创建MessageProducer对象,同时设置其持久模式

   producer.setDeliveryMode(DeliveryMode.PERSISTENT);

   //7

   Message message = jmsSession.createMessage();

   message.setObjectProperty(type, object);

   Topic topic = jmsSession.createTopic("jms/topic/MyTopic");

   //8

   message.setJMSReplyTo(topic);

   //发送消息

   producer.send(message);

   

   //9 接受回复的消息

   MessageConsumer consumer = jmsSession.createConsumer(topic);

   consumer.setMessageListener(new MessageListener() {

    public void onMessage(Message message) {

     if (message != null && message instanceof TextMessage) {

      String messageReceived = null;

      try {

       messageReceived = ((TextMessage) message).getText();

      } catch (JMSException e) {

       e.printStackTrace();

      }

      System.out

        .println("reply message received from customer1:"

          + messageReceived);

     }

    }

   });

   connection.start();

   

   // 发布刷新帖子消息

   // testMessage.clearProperties();

   // testMessage.setStringProperty("RefreshThreadId", "331");

   // producer.send(testMessage);

  } catch (NamingException e) {

   e.printStackTrace();

  } catch (JMSException e) {

   e.printStackTrace();

  }

 }

}

//2接受消息(发送回复)

import javax.servlet.*;

import javax.servlet.http.*;

import javax.naming.*;

import javax.jms.*;

import com.brightmart.MessageAction;

import com.brightmart.SM;

import com.util.mail.TestSendMail;

// 初始化jms连接,创建topic监听器;指定接收消息时候,做的对应处理

public class JMSListener extends HttpServlet implements MessageListener {

 private static final long serialVersionUID = 3963233366687996777L;

 //初始化jms连接,创建topic监听器

 public void init(ServletConfig config) throws ServletException {

  try {

   InitialContext initCtx = new InitialContext();

   Context envContext = (Context) initCtx.lookup("java:comp/env");// 1

   // 根据JNDI获取

   ConnectionFactory connectionFactory = (ConnectionFactory) envContext

     .lookup("jms/FailoverConnectionFactory");// 2

   Connection connection = connectionFactory.createConnection();// 3

   // 给connection设置一个clientId

   connection.setClientID("MyClient");

   // 会话:两个参,事务和应答模式

   Session jmsSession = connection.createSession(false,

     Session.CLIENT_ACKNOWLEDGE);// 4 AUTO_ACKNOWLEDGE

   // 普通消息订阅者,无法接收持久消息// MessageConsumer consumer =

   // jmsSession.createConsumer((Destination)//envContext.lookup("jms/topic/MyTopic"));

   // 基于Topic创建持久的消息订阅者,前提:Connection必须指定一个唯一的clientId,当前为MyClient

   TopicSubscriber consumer = jmsSession.createDurableSubscriber(

     (Topic) envContext.lookup("jms/topic/MyTopic"), "MySub");// 5

   consumer.setMessageListener(this);

   connection.start();

  } catch (NamingException e) {

   e.printStackTrace();

  } catch (JMSException e) {

   e.printStackTrace();

  }

 }

 //接收消息,做相应处理

 public void onMessage(Message message) {

  System.out.println("message in coustomer1.");

  if (message == null) {

   return;

  }

  try {

   if (message.getObjectProperty("email") != null) {

    String emailAddress = (String) message

      .getObjectProperty("email");

    TestSendMail sendMail = new TestSendMail();

    sendMail.sendMail(emailAddress);

    message.acknowledge();

    Destination d = message.getJMSReplyTo();

    Session sessionn = getConnection().createSession(false,

      Session.CLIENT_ACKNOWLEDGE);

    MessageProducer p = sessionn.createProducer(d);

    TextMessage tm = sessionn

      .createTextMessage("ustomer1 RECEIVED a email type message");

    System.out

      .println("customer1 RECEIVED a email type message");

    p.send(tm);

   } else if (message.getObjectProperty("message") != null) {

    MessageAction m = new MessageAction();

    SM sm = new SM();

    sm.setDestTermId((String) message.getObjectProperty("message"));

    sm.setMsgContent("分布式JMS-ActiveMQ系统测试");

    m.addSM(sm);

    message.acknowledge();

   } else {

    System.out.println("接收普通消息,不做任何处理!");

   }

  } catch (JMSException e) {

   e.printStackTrace();

  }

 }

 public Connection getConnection() {

  InitialContext initCtx;

  try {

   initCtx = new InitialContext();

   Context envContext = (Context) initCtx.lookup("java:comp/env");

   // 根据JNDI、url、user、password获取

   ConnectionFactory connectionFactory = (ConnectionFactory) envContext

     .lookup("jms/FailoverConnectionFactory");

   Connection connection = connectionFactory.createConnection();

   return connection;

  } catch (Exception e) {

   e.printStackTrace();

   return null;

  }

 }

}

 

//3 context.xml

Context中添加配置:

<Resource

name="jms/FailoverConnectionFactory"

auth="Container"

type="org.apache.activemq.ActiveMQConnectionFactory"

description="JMS Connection Factory"

factory="org.apache.activemq.jndi.JNDIReferenceFactory"

brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5"

brokerName="localhost"

useEmbeddedBroker="false"/>

<Resource

name="jms/NormalConnectionFactory"

auth="Container"

type="org.apache.activemq.ActiveMQConnectionFactory"

description="JMS Connection Factory"

factory="org.apache.activemq.jndi.JNDIReferenceFactory"

brokerURL="tcp://localhost:61616"

brokerName="localhost"

useEmbeddedBroker="false"/>

<Resource name="jms/topic/MyTopic"

auth="Container"

type="org.apache.activemq.command.ActiveMQTopic"

factory="org.apache.activemq.jndi.JNDIReferenceFactory"

physicalName="MY.TEST.FOO"/>

<Resource name="jms/queue/MyQueue"

auth="Container"

type="org.apache.activemq.command.ActiveMQQueue"

factory="org.apache.activemq.jndi.JNDIReferenceFactory"

physicalName="MY.TEST.FOO.QUEUE"/>

 

4.发送短信或邮件

请参考本博客前两篇文章

注:

1.系统下载ActiveMQ,并允许;

2.ActiveMQ需要融合web服务器,如可以配置tomcat服务器的context.xml;
3在项目中,需要引入ActiveMQ的jar.

原文链接:点击打开链接
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java jms 消息中间件