基于ActiveMQ的系统间通信示例
2017-10-30 00:00
337 查看
1、使用到的jar包
activemq-all-5.15.1.jar
commons-lang3-3.0.jar
commons-logging-1.1.3.jar
fastjson-1.1.31.jar
jackson-core-asl-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
jboss-logging-3.1.3.GA.jar
jconsole.jar
log4j-1.2.16.jar
slf4j-api-1.6.2.jar
slf4j-log4j12-1.6.2.jar
2、项目结构
3、System1-MQ-topic-S1完整代码
GlobalVar.java
package com.system1.mq.p2p; public interface GlobalVar { public static final class Common { public static int loggerDebug = 1; } }
JMSTest.java
package com.system1.mq.p2p; import org.junit.Test; public class JMSTest { @Test public void testMyMQ() { MQ.getInstance().sendMsg("TOP1-TOP2:你好,我不是一条单纯的消息"); } public static void main(String[] args) { MQ.getInstance().new ConsumerListener().start(); } }
Logger.java
package com.system1.mq.p2p;

import org.slf4j.LoggerFactory;

/**
 * Static facade over a single SLF4J logger. Every method except
 * {@link #exception(Throwable)} emits output only while
 * {@code GlobalVar.Common.loggerDebug} equals 1.
 */
public class Logger {

    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(Logger.class);

    /** Returns true while the global switch {@code GlobalVar.Common.loggerDebug} is 1. */
    private static boolean enabled() {
        return GlobalVar.Common.loggerDebug == 1;
    }

    /** Logs a formatted message at TRACE level when the switch is on. */
    public static void trace(String format, Object... args) {
        if (!enabled()) {
            return;
        }
        logger.trace(format, args);
    }

    /** Logs a message and a throwable at TRACE level when the switch is on. */
    public static void trace(String msg, Throwable e) {
        if (!enabled()) {
            return;
        }
        logger.trace(msg, e);
    }

    /** Logs a formatted message at DEBUG level when the switch is on. */
    public static void debug(String format, Object... args) {
        if (!enabled()) {
            return;
        }
        logger.debug(format, args);
    }

    /** Logs a message and a throwable at DEBUG level when the switch is on. */
    public static void debug(String msg, Throwable e) {
        if (!enabled()) {
            return;
        }
        logger.debug(msg, e);
    }

    /** Logs a formatted message at INFO level when the switch is on. */
    public static void info(String format, Object... args) {
        if (!enabled()) {
            return;
        }
        logger.info(format, args);
    }

    /** Logs a message and a throwable at INFO level when the switch is on. */
    public static void info(String msg, Throwable e) {
        if (!enabled()) {
            return;
        }
        logger.info(msg, e);
    }

    /** Logs a formatted message at WARN level when the switch is on. */
    public static void warn(String format, Object... args) {
        if (!enabled()) {
            return;
        }
        logger.warn(format, args);
    }

    /** Logs a message and a throwable at WARN level when the switch is on. */
    public static void warn(String msg, Throwable e) {
        if (!enabled()) {
            return;
        }
        logger.warn(msg, e);
    }

    /** Logs a formatted message at ERROR level when the switch is on. */
    public static void error(String format, Object... args) {
        if (!enabled()) {
            return;
        }
        logger.error(format, args);
    }

    /** Logs a message and a throwable at ERROR level when the switch is on. */
    public static void error(String msg, Throwable e) {
        if (!enabled()) {
            return;
        }
        logger.error(msg, e);
    }

    /** Logs a message and a throwable at ERROR level when the switch is on. */
    public static void exception(String msg, Throwable e) {
        if (!enabled()) {
            return;
        }
        logger.error(msg, e);
    }

    /**
     * Logs the throwable at ERROR level using its own message as the text.
     * Unlike the other methods this one does not consult the global switch.
     */
    public static void exception(Throwable e) {
        logger.error(e.getMessage(), e);
    }
}
MQ.java
package com.system1.mq.p2p; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.StringUtils; public class MQ { /** * 单例模式 */ private static MQ instance; private MQ() { } public static MQ getInstance() { if (instance == null) { instance = new MQ(); } return instance; } // 默认连接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认连接地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 连接工厂 static ConnectionFactory connectionFactory; // 连接 static Connection connection = null; // 会话 接受或者发送消息的线程 static Session session; // 消息的目的地 static Destination dest1, dest2; // topic static Topic top1, top2; // 消费者 static MessageConsumer consumer; // 生产者 static MessageProducer producer; // 队列名称 public interface QUENENAME { // quene目的地 public static final String DESTINATION1 = "S1-S2"; public static final String DESTINATION2 = "S2-S1"; // topic地址 public static final String TOP1 = "TOP1-TOP2"; public static final String TOP2 = "TOP2-TOP1"; } // 创建连接 static { // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(MQ.USERNAME, MQ.PASSWORD, MQ.BROKEURL); try { // 通过连接工厂获取连接 connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 自动通知MQ服务器消息已收到 // 第一个参数表示是否开启事务,第二个参数表示由客户端自行决定通知MQ服务器消息已经收到,需要调用message.acknowledge()来通知服务器. 
// session = connection.createSession(true, // Session.CLIENT_ACKNOWLEDGE); // 生产者 // dest1 = session.createQueue(MQ.QUENENAME.DESTINATION1); // producer = session.createProducer(dest1); top1 = session.createTopic(MQ.QUENENAME.TOP1); producer = session.createProducer(top1); // 消费者 // dest2 = session.createQueue(MQ.QUENENAME.DESTINATION2); // consumer = session.createConsumer(dest2); top2 = session.createTopic(MQ.QUENENAME.TOP2); consumer = session.createConsumer(top2); } catch (JMSException e) { System.out.println("创建连接异常"); } } /** * 生产者 */ public void sendMsg(String message) { try { TextMessage txtMsg = session.createTextMessage(message); producer.send(txtMsg); } catch (JMSException e) { System.err.println("发送消息异常"); } } /** * 消费者 */ public class ConsumerListener extends Thread { @Override public void run() { super.run(); try { while (true) { TextMessage textMessage = (TextMessage) consumer.receive(); String msg = textMessage.getText(); if (StringUtils.isNotBlank(msg)) { try { new ProtocolParse(msg).handler(); } catch (Exception e) { System.err.println("协议处理异常!"); } } else { System.err.println("消费者接收消息为空!"); } } } catch (JMSException e) { System.err.println("消费者接收消息异常"); } } } }
ProtocolConstant.java
package com.system1.mq.p2p;

/** Constants of the message protocol exchanged between the systems. */
public class ProtocolConstant {

    /** Command words. */
    public interface CMD {
        // Interface fields are implicitly public, static and final.
        String HEARTBEAT = "heartbeat";
    }
}
ProtocolParse.java
package com.system1.mq.p2p; import com.alibaba.fastjson.JSONObject; import com.system1.mq.handler.ExceptionHandler; import com.system1.mq.handler.HeartbeatHandler; public class ProtocolParse { private String protocol; public ProtocolParse(String protocol) { super(); this.protocol = protocol; } public void handler() throws Exception { JSONObject jsonObj = JSONObject.parseObject(protocol); if (jsonObj == null) { throw new Exception("不符合json格式的数据!"); } // 命令字 String cmd = jsonObj.getString("cmd"); switch (cmd) { case ProtocolConstant.CMD.HEARTBEAT: new HeartbeatHandler(protocol).start(); break; default: new ExceptionHandler(protocol).start(); break; } } public String getProtocol() { return protocol; } public void setProtocol(String protocol) { this.protocol = protocol; } }
BaseMessageHandler.java
package com.system1.mq.handler; public abstract class BaseMessageHandler extends Thread { private String message; public BaseMessageHandler(String message) { super(); this.message = message; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
ExceptionHandler.java
package com.system1.mq.handler;

import com.system1.mq.p2p.Logger;

/** Handler that only logs the message it was given at INFO level. */
public class ExceptionHandler extends BaseMessageHandler {

    /**
     * @param message message text to be logged
     */
    public ExceptionHandler(final String message) {
        super(message);
    }

    /** Writes the held message to the log behind a fixed marker. */
    @Override
    public void run() {
        super.run();
        final String payload = super.getMessage();
        Logger.info("===异常的协议数据===" + payload);
    }
}
HeartbeatHandler.java
package com.system1.mq.handler;

import com.system1.mq.p2p.Logger;
import com.system1.mq.p2p.MQ;

/** Handler for heartbeat messages: logs a marker and sends a fixed reply. */
public class HeartbeatHandler extends BaseMessageHandler {

    /**
     * @param message heartbeat message text
     */
    public HeartbeatHandler(final String message) {
        super(message);
    }

    /** Logs the heartbeat marker, then sends the acknowledgement through MQ. */
    @Override
    public void run() {
        super.run();
        Logger.info("===HEARTBEAT===");
        final MQ mq = MQ.getInstance();
        mq.sendMsg("收到心跳");
    }
}
4、System2-MQ-topic-S2完整代码
GlobalVar.java
package com.system1.mq.p2p; public interface GlobalVar { public static final class Common { public static int loggerDebug = 1; } }
JMSTest.java
package com.system1.mq.p2p; import org.junit.Test; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; public class JMSTest { @Test public void testMyMQ() { JSONObject jsonObj = new JSONObject(); jsonObj.put("cmd", "heartbeat"); MQ.getInstance().sendMsg(JSON.toJSONString(jsonObj)); } public static void main(String[] args) { MQ.getInstance().new ConsumerListener().start(); } }
Logger.java
package com.system1.mq.p2p;

import org.slf4j.LoggerFactory;

/**
 * Static facade over a single SLF4J logger. Every method except
 * {@link #exception(Throwable)} emits output only while
 * {@code GlobalVar.Common.loggerDebug} equals 1.
 */
public class Logger {

    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(Logger.class);

    /** Returns true while the global switch {@code GlobalVar.Common.loggerDebug} is 1. */
    private static boolean enabled() {
        return GlobalVar.Common.loggerDebug == 1;
    }

    /** Logs a formatted message at TRACE level when the switch is on. */
    public static void trace(String format, Object... args) {
        if (!enabled()) {
            return;
        }
        logger.trace(format, args);
    }

    /** Logs a message and a throwable at TRACE level when the switch is on. */
    public static void trace(String msg, Throwable e) {
        if (!enabled()) {
            return;
        }
        logger.trace(msg, e);
    }

    /** Logs a formatted message at DEBUG level when the switch is on. */
    public static void debug(String format, Object... args) {
        if (!enabled()) {
            return;
        }
        logger.debug(format, args);
    }

    /** Logs a message and a throwable at DEBUG level when the switch is on. */
    public static void debug(String msg, Throwable e) {
        if (!enabled()) {
            return;
        }
        logger.debug(msg, e);
    }

    /** Logs a formatted message at INFO level when the switch is on. */
    public static void info(String format, Object... args) {
        if (!enabled()) {
            return;
        }
        logger.info(format, args);
    }

    /** Logs a message and a throwable at INFO level when the switch is on. */
    public static void info(String msg, Throwable e) {
        if (!enabled()) {
            return;
        }
        logger.info(msg, e);
    }

    /** Logs a formatted message at WARN level when the switch is on. */
    public static void warn(String format, Object... args) {
        if (!enabled()) {
            return;
        }
        logger.warn(format, args);
    }

    /** Logs a message and a throwable at WARN level when the switch is on. */
    public static void warn(String msg, Throwable e) {
        if (!enabled()) {
            return;
        }
        logger.warn(msg, e);
    }

    /** Logs a formatted message at ERROR level when the switch is on. */
    public static void error(String format, Object... args) {
        if (!enabled()) {
            return;
        }
        logger.error(format, args);
    }

    /** Logs a message and a throwable at ERROR level when the switch is on. */
    public static void error(String msg, Throwable e) {
        if (!enabled()) {
            return;
        }
        logger.error(msg, e);
    }

    /** Logs a message and a throwable at ERROR level when the switch is on. */
    public static void exception(String msg, Throwable e) {
        if (!enabled()) {
            return;
        }
        logger.error(msg, e);
    }

    /**
     * Logs the throwable at ERROR level using its own message as the text.
     * Unlike the other methods this one does not consult the global switch.
     */
    public static void exception(Throwable e) {
        logger.error(e.getMessage(), e);
    }
}
MQ.java
package com.system1.mq.p2p; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.StringUtils; public class MQ { /** * 单例模式 */ private static MQ instance; private MQ() { } public static MQ getInstance() { if (instance == null) { instance = new MQ(); } return instance; } // 默认连接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认连接地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 连接工厂 static ConnectionFactory connectionFactory; // 连接 static Connection connection = null; // 会话 接受或者发送消息的线程 static Session session; // 消息的目的地 static Destination dest1, dest2; // topic static Topic top1, top2; // 消费者 static MessageConsumer consumer; // 生产者 static MessageProducer producer; // 队列名称 public interface QUENENAME { public static final String DESTINATION1 = "S1-S2"; public static final String DESTINATION2 = "S2-S1"; // topic地址 public static final String TOP1 = "TOP1-TOP2"; public static final String TOP2 = "TOP2-TOP1"; } // 创建连接 static { // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(MQ.USERNAME, MQ.PASSWORD, MQ.BROKEURL); try { // 通过连接工厂获取连接 connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 生产者 // dest1 = session.createQueue(MQ.QUENENAME.DESTINATION2); // producer = session.createProducer(dest1); top1 = session.createTopic(MQ.QUENENAME.TOP2); producer = session.createProducer(top1); // 消费者 // dest2 = session.createQueue(MQ.QUENENAME.DESTINATION1); // consumer = 
session.createConsumer(dest2); top2 = session.createTopic(MQ.QUENENAME.TOP1); consumer = session.createConsumer(top2); } catch (JMSException e) { System.out.println("创建连接异常"); } } /** * 生产者 */ public void sendMsg(String message) { try { TextMessage txtMsg = session.createTextMessage(message); producer.send(txtMsg); } catch (JMSException e) { System.err.println("发送消息异常"); } } /** * 消费者 */ public class ConsumerListener extends Thread { @Override public void run() { super.run(); try { while (true) { TextMessage textMessage = (TextMessage) consumer.receive(); String msg = textMessage.getText(); if (StringUtils.isNotBlank(msg)) { System.out.println(msg); } else { System.err.println("消费者接收消息为空!"); } } } catch (JMSException e) { System.err.println("消费者接收消息异常"); } } } }
5、测试
(1)打开ActiveMQ服务器
(2)运行System1-MQ-topic-S1下JMSTest.java的main方法
(3)运行System2-MQ-topic-S2下JMSTest.java的testMyMQ()方法发送消息
6、改用p2p方式发送消息
将System1-MQ-topic-S1和System2-MQ-topic-S2的MQ.java中生产者和消费者的目的地创建方式由createTopic改为createQueue(即代码中被注释掉的createQueue写法)即可
7、系统间通信模型图
见 https://my.oschina.net/u/3416597/blog/1554950
相关文章推荐
- 基于FPGA的通信系统实验
- 基于XMPP的即时通信系统的建立(四)— 组件介绍
- 基于udp的监视系统示例
- 基于udp通信协议开发的简易聊天系统1.0
- 基于消息实现系统间的通信(BIO,NIO,AIO)
- 构建VoIP Web callback系统 ---基于Web方式的phone2phone通信方式(1)
- 基于Matlab的MIMO通信系统仿真(上)
- 基于CDMA网络的自来水厂水井生产监控系统通信解决方案
- [20180313智慧餐厅推荐系统02]基于python的socket编程代码,实现PC与服务器的简单通信
- 基于智能空间布局优化的系留低空应急通信系统设计及研究
- 基于XMPP协议的低传输负载的即时通信方法及其系统 -专利
- 基于TCP/IP协议的电力通信综合监控管理系统设计与实现
- 基于MATLAB的QPSK通信系统
- 基于LINUX系统的SOCKET通信,使用UDP协议.
- 基于STM8的ADC0832采集及蓝牙通信系统
- 基于ApacheMQ的系统间通信模型
- 系统间通信:基于TCP协议的RPC实现范例
- 基于TMS320LF2407和CAN总线的通信应用系统设计
- 初探基于CDMA网络的移动数据通信系统
- 一个简单的基于node.js的TCP服务器和基于C++的TCP客户端通信示例程序