您的位置:首页 > 编程语言 > Qt开发

IBM MQTT消息推送

2017-03-14 10:53 127 查看
需求

前端(C# .NET)订阅主题,后端(Java)服务负责向主题推送消息。

相关环境

java JDK1.7

maven

ibm mq 1.0

spring

… …

MQ MAVEN相关:

<!-- Version properties for the build; spring.version and ibm.mq.version are referenced -->
<!-- from dependency <version> elements through ${...} placeholders. -->
<properties>
<jdk.version>1.7</jdk.version>
<spring.version>4.2.5.RELEASE</spring.version>
<ibm.mq.version>1.0</ibm.mq.version>
... ...
</properties>


<!-- IBM MQ START -->
<!-- Every com.ibm.mq artifact below takes its version from the ibm.mq.version property. -->
<!-- NOTE(review): version 1.0 presumably refers to jars installed into a local/private -->
<!-- repository rather than an official release; confirm before reusing this snippet. -->
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>jms</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.axis2</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.commonservices</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.defaultconfig</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.headers</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mqjms</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>fscontext</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<!-- Spring's JMS module; its version comes from the spring.version property. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- IBM MQ END -->


后端MQ推送代码

package com.***.mq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.util.ObjectUtils;

import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ibm.mq.jms.MQConnectionFactory;
import com.***.common.exception.RmsException;
import com.***.common.json.JSONUtils;
import com.***.common.util.ApplicationUtils;
import com.***.pojo.PushPojo;
import com.***.util.ConfigUtil;
import com.***.util.enums.EnumTopic;

/**
 * Static helper that publishes push notifications to JMS topics through the
 * {@code jmsConnectionFactory} bean obtained from {@link ApplicationUtils}.
 */
public class JMSSender {

    private static final Logger logger = LogManager.getLogger(JMSSender.class);

    /**
     * Publishes a text message to a topic.
     *
     * <p>A connection and a non-transacted, auto-acknowledge session are opened for
     * this single send and closed again by the try-with-resources statement.
     * A {@link JMSException} is not propagated: it is logged and then handed to
     * {@code RmsException.throwException("mq.rms.jms")}.
     *
     * @date 2016-7-22
     * @param topic
     *            topic name, normally taken from {@link EnumTopic#getCode()}
     * @param message
     *            message content
     * @param username
     *            user name used to open the connection
     * @param password
     *            password used to open the connection; never written to the log
     */
    public static void jmsSender(String topic, String message, String username,
            String password) {
        if (logger.isDebugEnabled()) {
            logger.debug("mq push start");
            logger.debug("username:" + username);
            // Never log the credential itself; only record whether one was supplied.
            logger.debug("password:" + (StringUtils.isEmpty(password) ? "<empty>" : "******"));
        }
        MQConnectionFactory mqcf = ApplicationUtils.getBean(
                "jmsConnectionFactory", MQConnectionFactory.class);
        try (Connection conn = mqcf.createConnection(username, password);
                Session sion = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
            logger.debug("topic:" + topic);
            logger.info("push message:" + message);
            Destination dti = sion.createTopic(topic);
            MessageProducer pdc = sion.createProducer(dti);
            TextMessage msg = sion.createTextMessage(message);
            conn.start();
            pdc.send(msg);
            logger.debug("push successfully");
        } catch (JMSException e) {
            // Log with the topic and the exception so the failure can be traced.
            logger.error("mq push failed, topic:" + topic, e);
            RmsException.throwException("mq.rms.jms");
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("mq push end");
            }
        }
    }

    /**
     * Serializes the payload to JSON and publishes it with the credentials read
     * from the {@code ibm.mq.username} / {@code ibm.mq.password} configuration keys.
     *
     * <p>The serializer features write null map values, render null strings as
     * empty strings and null numbers as zero, and format dates with the date format.
     *
     * @param topic
     *            topic name
     * @param o
     *            payload to serialize
     */
    private static void jmsSender(String topic, PushPojo o) {
        String message = JSONUtils.toJSONString(o,
                SerializerFeature.WriteMapNullValue,
                SerializerFeature.WriteNullStringAsEmpty,
                SerializerFeature.WriteNullNumberAsZero,
                SerializerFeature.WriteDateUseDateFormat);
        jmsSender(topic, message, ConfigUtil.get("ibm.mq.username"),
                ConfigUtil.get("ibm.mq.password"));
    }

    /**
     * Entry point: logs how many records are being added, edited and deleted,
     * then publishes the payload to the given topic.
     *
     * @param topic
     *            topic to publish to
     * @param o
     *            payload to push
     * @throws NullPointerException
     *             if {@code topic} or {@code o} is {@code null}
     */
    public static void jmsSender(EnumTopic topic, PushPojo o) {
        // Fail fast with a descriptive message instead of an anonymous NPE further down.
        if (topic == null) {
            throw new NullPointerException("topic must not be null");
        }
        if (o == null) {
            throw new NullPointerException("o must not be null");
        }
        int addCount = !ObjectUtils.isEmpty(o.getAddData()) ? o.getAddData().length : 0;
        int editCount = !ObjectUtils.isEmpty(o.getEditData()) ? o.getEditData().length : 0;
        // delData is a comma-separated list of ids.
        int delCount = StringUtils.isNotBlank(o.getDelData()) ? o.getDelData().split(",").length : 0;
        StringBuilder s = new StringBuilder();
        s.append("push addData[").append(addCount).append("]");
        s.append(",editData[").append(editCount).append("]");
        s.append(",delData[").append(delCount).append("]");
        logger.info(s.toString());

        jmsSender(topic.getCode(), o);
    }

}


PushPojo实体代码

package com.***.pojo;

/**
 * Payload of a push notification: a type, the module it belongs to, and the
 * records to add, edit and delete.
 */
public class PushPojo {

    private String type;

    /** Module, e.g. check-in / baggage / carousel. */
    private String module;

    /** Records that need to be added. */
    private Object[] addData;

    /** Records that need to be modified. */
    private Object[] editData;

    /** IDs of the records that need to be deleted. */
    private String delData;

    /** @return the type */
    public String getType() {
        return this.type;
    }

    /** @param type the type to set */
    public void setType(String type) {
        this.type = type;
    }

    /** @return the module */
    public String getModule() {
        return this.module;
    }

    /** @param module the module to set */
    public void setModule(String module) {
        this.module = module;
    }

    /** @return the records to add (the stored array itself, not a copy) */
    public Object[] getAddData() {
        return this.addData;
    }

    /** @param addData the records to add (stored as-is, not copied) */
    public void setAddData(Object[] addData) {
        this.addData = addData;
    }

    /** @return the records to modify (the stored array itself, not a copy) */
    public Object[] getEditData() {
        return this.editData;
    }

    /** @param editData the records to modify (stored as-is, not copied) */
    public void setEditData(Object[] editData) {
        this.editData = editData;
    }

    /** @return the IDs of the records to delete */
    public String getDelData() {
        return this.delData;
    }

    /** @param delData the IDs of the records to delete */
    public void setDelData(String delData) {
        this.delData = delData;
    }
}


主题枚举

package com.***.util.enums;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Topics that push messages can be published to. Each constant carries an
 * English name, a Chinese display name and the topic code.
 */
public enum EnumTopic {
    Flight("Flight", "航班主题", "topic://flight"),
    Stand("Stand", "机位模块主题", "topic://stand"),
    Gate("Gate", "登机口模块主题", "topic://gate"),
    Carouse("Carouse", "行李转盘模块主题", "topic://carouse"),
    Counter("Counter", "值机柜台模块主题", "topic://counter");

    /** Private snapshot of all constants in declaration order; never handed out directly. */
    private static final List<EnumTopic> ALL =
            new ArrayList<EnumTopic>(Arrays.asList(values()));

    private final String en;
    private final String zhCN;
    private final String code;

    EnumTopic(String en, String zhCN, String code) {
        this.en = en;
        this.zhCN = zhCN;
        this.code = code;
    }

    /** @return the English name */
    public String getEn() {
        return en;
    }

    /** @return the topic code, e.g. {@code topic://flight} */
    public String getCode() {
        return code;
    }

    /** @return the Chinese display name */
    public String getZhCN() {
        return zhCN;
    }

    /**
     * Returns all topics in declaration order.
     *
     * @return a new mutable list on every call, so that callers modifying the
     *         result cannot affect {@link #get(String)}, {@link #isExist(String)}
     *         or {@link #toJson()}
     */
    public static List<EnumTopic> getList() {
        return new ArrayList<EnumTopic>(ALL);
    }

    /**
     * Tells whether a topic with the given code exists.
     *
     * @param code topic code to look up; may be {@code null}
     * @return {@code true} if some constant has exactly this code
     */
    public static boolean isExist(String code) {
        return get(code) != null;
    }

    /**
     * Looks up a topic by its code.
     *
     * @param code topic code to look up; may be {@code null}
     * @return the first constant whose code equals {@code code}, or {@code null}
     *         if there is none
     */
    public static EnumTopic get(String code) {
        for (EnumTopic topic : ALL) {
            if (topic.getCode().equals(code)) {
                return topic;
            }
        }
        return null;
    }

    /**
     * Renders all topics as a JSON array of objects with the keys
     * {@code en}, {@code zhCN} and {@code code}.
     *
     * <p>Values are written verbatim without JSON escaping.
     *
     * @return the JSON text
     */
    public static String toJson() {
        StringBuilder json = new StringBuilder("[");
        String separator = "";
        for (EnumTopic topic : ALL) {
            json.append(separator)
                    .append("{\"en\":\"").append(topic.getEn())
                    .append("\",\"zhCN\":\"").append(topic.getZhCN())
                    .append("\",\"code\":\"").append(topic.getCode())
                    .append("\"}");
            separator = ",";
        }
        return json.append("]").toString();
    }
}


MQ 的用户名、通道等其他配置属于常规设置,此处不再赘述。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  ibm mq 推送 java