您的位置:首页 > Web前端 > JavaScript

基于Active MQ的系统间通信示例

2017-10-30 00:00 337 查看

1、使用到的jar包

activemq-all-5.15.1.jar
commons-lang3-3.0.jar
commons-logging-1.1.3.jar
fastjson-1.1.31.jar
jackson-core-asl-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
jboss-logging-3.1.3.GA.jar
jconsole.jar
log4j-1.2.16.jar
slf4j-api-1.6.2.jar
slf4j-log4j12-1.6.2.jar

2、项目结构





3、System1-MQ-topic-S1完整代码

GlobalVar.java

package com.system1.mq.p2p;

public interface GlobalVar {
public static final class Common {
public static int loggerDebug = 1;
}
}

JMSTest.java

package com.system1.mq.p2p;

import org.junit.Test;

public class JMSTest {
@Test
public void testMyMQ() {
MQ.getInstance().sendMsg("TOP1-TOP2:你好,我不是一条单纯的消息");
}

public static void main(String[] args) {
MQ.getInstance().new ConsumerListener().start();
}
}

Logger.java

package com.system1.mq.p2p;

import org.slf4j.LoggerFactory;

public class Logger {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(Logger.class);

public static void trace(String format, Object... args) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.trace(format, args);
}
}

public static void trace(String msg, Throwable e) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.trace(msg, e);
}
}

public static void debug(String format, Object... args) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.debug(format, args);
}
}

public static void debug(String msg, Throwable e) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.debug(msg, e);
}
}

public static void info(String format, Object... args) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.info(format, args);
}
}

public static void info(String msg, Throwable e) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.info(msg, e);
}
}

public static void warn(String format, Object... args) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.warn(format, args);
}
}

public static void warn(String msg, Throwable e) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.warn(msg, e);
}
}

public static void error(String format, Object... args) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.error(format, args);
}
}

public static void error(String msg, Throwable e) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.error(msg, e);
}
}

public static void exception(String msg, Throwable e) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.error(msg, e);
}
}

public static void exception(Throwable e) {
logger.error(e.getMessage(), e);
}
}

MQ.java

package com.system1.mq.p2p;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.StringUtils;

public class MQ {
/**
* 单例模式
*/
private static MQ instance;

private MQ() {
}

public static MQ getInstance() {
if (instance == null) {
instance = new MQ();
}
return instance;
}

// 默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
// 默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
// 默认连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
// 连接工厂
static ConnectionFactory connectionFactory;
// 连接
static Connection connection = null;

// 会话 接受或者发送消息的线程
static Session session;
// 消息的目的地
static Destination dest1, dest2;
// topic
static Topic top1, top2;
// 消费者
static MessageConsumer consumer;
// 生产者
static MessageProducer producer;

// 队列名称
public interface QUENENAME {
// quene目的地
public static final String DESTINATION1 = "S1-S2";
public static final String DESTINATION2 = "S2-S1";
// topic地址
public static final String TOP1 = "TOP1-TOP2";
public static final String TOP2 = "TOP2-TOP1";
}

// 创建连接
static {
// 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(MQ.USERNAME, MQ.PASSWORD, MQ.BROKEURL);
try {
// 通过连接工厂获取连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 自动通知MQ服务器消息已收到
// 第一个参数表示是否开启事务,第二个参数表示由客户端自行决定通知MQ服务器消息已经收到,需要调用message.acknowledge()来通知服务器.
// session = connection.createSession(true,
// Session.CLIENT_ACKNOWLEDGE);
// 生产者
// dest1 = session.createQueue(MQ.QUENENAME.DESTINATION1);
// producer = session.createProducer(dest1);
top1 = session.createTopic(MQ.QUENENAME.TOP1);
producer = session.createProducer(top1);
// 消费者
// dest2 = session.createQueue(MQ.QUENENAME.DESTINATION2);
// consumer = session.createConsumer(dest2);
top2 = session.createTopic(MQ.QUENENAME.TOP2);
consumer = session.createConsumer(top2);
} catch (JMSException e) {
System.out.println("创建连接异常");
}
}

/**
* 生产者
*/
public void sendMsg(String message) {
try {
TextMessage txtMsg = session.createTextMessage(message);
producer.send(txtMsg);
} catch (JMSException e) {
System.err.println("发送消息异常");
}
}

/**
* 消费者
*/
public class ConsumerListener extends Thread {
@Override
public void run() {
super.run();
try {
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive();
String msg = textMessage.getText();
if (StringUtils.isNotBlank(msg)) {
try {
new ProtocolParse(msg).handler();
} catch (Exception e) {
System.err.println("协议处理异常!");
}
} else {
System.err.println("消费者接收消息为空!");
}
}
} catch (JMSException e) {
System.err.println("消费者接收消息异常");
}
}
}
}

ProtocolConstant.java

package com.system1.mq.p2p;

public class ProtocolConstant {
// 命令字
public interface CMD {
public static final String HEARTBEAT = "heartbeat";
}
}

ProtocolParse.java

package com.system1.mq.p2p;

import com.alibaba.fastjson.JSONObject;
import com.system1.mq.handler.ExceptionHandler;
import com.system1.mq.handler.HeartbeatHandler;

public class ProtocolParse {
private String protocol;

public ProtocolParse(String protocol) {
super();
this.protocol = protocol;
}

public void handler() throws Exception {
JSONObject jsonObj = JSONObject.parseObject(protocol);
if (jsonObj == null) {
throw new Exception("不符合json格式的数据!");
}
// 命令字
String cmd = jsonObj.getString("cmd");
switch (cmd) {
case ProtocolConstant.CMD.HEARTBEAT:
new HeartbeatHandler(protocol).start();
break;
default:
new ExceptionHandler(protocol).start();
break;
}

}

public String getProtocol() {
return protocol;
}

public void setProtocol(String protocol) {
this.protocol = protocol;
}

}

BaseMessageHandler.java

package com.system1.mq.handler;

public abstract class BaseMessageHandler extends Thread {
private String message;

public BaseMessageHandler(String message) {
super();
this.message = message;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}
}

ExceptionHandler.java

package com.system1.mq.handler;

import com.system1.mq.p2p.Logger;

public class ExceptionHandler extends BaseMessageHandler {

public ExceptionHandler(String message) {
super(message);
}

@Override
public void run() {
super.run();
Logger.info("===异常的协议数据===" + super.getMessage());
}

}

HeartbeatHandler.java

package com.system1.mq.handler;

import com.system1.mq.p2p.Logger;
import com.system1.mq.p2p.MQ;

public class HeartbeatHandler extends BaseMessageHandler {

public HeartbeatHandler(String message) {
super(message);
}
@Override
public void run() {
super.run();
Logger.info("===HEARTBEAT===");
MQ.getInstance().sendMsg("收到心跳");
}

}


4、System2-MQ-topic-S2完整代码

GlobalVar.java

package com.system1.mq.p2p;

public interface GlobalVar {
public static final class Common {
public static int loggerDebug = 1;
}
}

JMSTest.java

package com.system1.mq.p2p;

import org.junit.Test;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class JMSTest {
@Test
public void testMyMQ() {
JSONObject jsonObj = new JSONObject();
jsonObj.put("cmd", "heartbeat");
MQ.getInstance().sendMsg(JSON.toJSONString(jsonObj));
}

public static void main(String[] args) {
MQ.getInstance().new ConsumerListener().start();
}
}

Logger.java

package com.system1.mq.p2p;

import org.slf4j.LoggerFactory;

public class Logger {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(Logger.class);

public static void trace(String format, Object... args) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.trace(format, args);
}
}

public static void trace(String msg, Throwable e) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.trace(msg, e);
}
}

public static void debug(String format, Object... args) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.debug(format, args);
}
}

public static void debug(String msg, Throwable e) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.debug(msg, e);
}
}

public static void info(String format, Object... args) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.info(format, args);
}
}

public static void info(String msg, Throwable e) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.info(msg, e);
}
}

public static void warn(String format, Object... args) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.warn(format, args);
}
}

public static void warn(String msg, Throwable e) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.warn(msg, e);
}
}

public static void error(String format, Object... args) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.error(format, args);
}
}

public static void error(String msg, Throwable e) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.error(msg, e);
}
}

public static void exception(String msg, Throwable e) {
if (GlobalVar.Common.loggerDebug == 1) {
logger.error(msg, e);
}
}

public static void exception(Throwable e) {
logger.error(e.getMessage(), e);
}
}

MQ.java

package com.system1.mq.p2p;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.StringUtils;

public class MQ {
/**
* 单例模式
*/
private static MQ instance;

private MQ() {
}

public static MQ getInstance() {
if (instance == null) {
instance = new MQ();
}
return instance;
}

// 默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
// 默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
// 默认连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
// 连接工厂
static ConnectionFactory connectionFactory;
// 连接
static Connection connection = null;

// 会话 接受或者发送消息的线程
static Session session;
// 消息的目的地
static Destination dest1, dest2;
// topic
static Topic top1, top2;
// 消费者
static MessageConsumer consumer;
// 生产者
static MessageProducer producer;

// 队列名称
public interface QUENENAME {
public static final String DESTINATION1 = "S1-S2";
public static final String DESTINATION2 = "S2-S1";
// topic地址
public static final String TOP1 = "TOP1-TOP2";
public static final String TOP2 = "TOP2-TOP1";
}

// 创建连接
static {
// 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(MQ.USERNAME, MQ.PASSWORD, MQ.BROKEURL);
try {
// 通过连接工厂获取连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 生产者
// dest1 = session.createQueue(MQ.QUENENAME.DESTINATION2);
// producer = session.createProducer(dest1);
top1 = session.createTopic(MQ.QUENENAME.TOP2);
producer = session.createProducer(top1);
// 消费者
// dest2 = session.createQueue(MQ.QUENENAME.DESTINATION1);
// consumer = session.createConsumer(dest2);
top2 = session.createTopic(MQ.QUENENAME.TOP1);
consumer = session.createConsumer(top2);
} catch (JMSException e) {
System.out.println("创建连接异常");
}
}

/**
* 生产者
*/
public void sendMsg(String message) {
try {
TextMessage txtMsg = session.createTextMessage(message);
producer.send(txtMsg);
} catch (JMSException e) {
System.err.println("发送消息异常");
}
}

/**
* 消费者
*/
public class ConsumerListener extends Thread {
@Override
public void run() {
super.run();
try {
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive();
String msg = textMessage.getText();
if (StringUtils.isNotBlank(msg)) {
System.out.println(msg);
} else {
System.err.println("消费者接收消息为空!");
}
}
} catch (JMSException e) {
System.err.println("消费者接收消息异常");
}
}
}
}


5、测试

(1)打开ActiveMQ服务器

(2)运行System1-MQ-topic-S1下JMSTest.java的main方法

(3)运行System2-MQ-topic-S2下JMSTest.java的testMyMQ()方法发送消息

6、改用p2p方式发送消息

将System1-MQ-topic-S1和System2-MQ-topic-S2中的生产者和消费者的队列创建方式改变即可



7、系统间通信模型图见

https://my.oschina.net/u/3416597/blog/1554950
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  ActiveMQ JSON