您的位置:首页 > 产品设计 > UI/UE

MQTT moquette 的 Blocking API 发布消息服务端使用

2013-07-11 15:22 441 查看
参看官方文档:

http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/index.jsp?topic=/com.ibm.mq.amqtat.doc/tt00000_.htm

* Java 为 MQ Telemetry Transport 创建异步发布程序

*在此任务中,您将遵循教程来修改第一个发布程序。通过修改,

*使应用程序能够发送发布而不等待传递确认信息。传递确认

*信息由您创建的回调类来接收。

*

*

*

*4.使客户机断开连接

* a.除去其中包含 token.waitForCompletion 表达式的语句。 主线程将继续执行,而不等待传递发布。

* b.测试客户机是否已断开连接。 将错误返回到 MqttCallback 中的 lostConnection 方法之后,MQTT 客户机将断开连接,客户机应用程序也可能断开连接。测试是否有打开的连接。

* c.使用常量 Example.quiesceTimeout 来设置使客户机停顿的最长时间。

* if (client.isConnected())

* client.disconnect(Example.quiesceTimeout);

*当满足下面三种情况的组合形式时,客户机就完成了:

* a.已经对在此会话中(如果重新启动了会话,则是在先前会话中)已发布的所有消息调用了回调。

* b.消息未完成,然而停顿时间间隔已到期。缺省情况下,停顿时间间隔为 30 秒。通过将要等待的毫秒数作为 client.disconnect 的一个参数来传递,即可更改停顿超时。

* c.在发布了某些消息并由客户机进行排队之后,但是在发送这些消息之前调用了 client.disconnect。已排队的消息尚未处于“未完成”状态。如果会话可重新启动,那么重新启动会话时就会重新发送消息。

* 缺省情况下,停顿时间间隔为 30 秒。

MQTT的消息发布代码:

package com.etrip.wsmqtt.server;

import com.ibm.micro.client.mqttv3.MqttClient;
import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
import com.ibm.micro.client.mqttv3.MqttMessage;
import com.ibm.micro.client.mqttv3.MqttTopic;
/**
* 使用 Java 为 MQ Telemetry Transport 创建异步发布程序
*
*
*
*
* 消息发布的类的具体的实现
*
* @author longgangbai
*
*/
public class WSMQTTServerPubAsync {
public static void main(String[] args) {
try {
//创建MqttClient对象
MqttClient client = new MqttClient(WSMQTTServerCommon.TCPAddress, WSMQTTServerCommon.clientId);

//创建MQTT相关的主题
MqttTopic topic = client.getTopic(WSMQTTServerCommon.topicString);

//创建MQTT的消息体
MqttMessage message = new MqttMessage();
//设置消息传输的类型
message.setQos(2);

//设置是否在服务器中保存消息体
message.setRetained(false);

//设置消息的内容
message.setPayload(WSMQTTServerCommon.publication.getBytes());

//创建一个MQTT的回调类
WSMQTTServerCallBack callback = new WSMQTTServerCallBack(WSMQTTServerCommon.clientId);

//MqttClient绑定
client.setCallback(callback);

//MqttClient连接
client.connect();

System.out.println("Publishing \"" + message.toString()
+ "\" on topic \"" + topic.getName() + "\" with QoS = "
+ message.getQos());
System.out.println("For client instance \"" + client.getClientId()
+ "\" on address " + client.getServerURI() + "\"");

//发送消息并获取回执
MqttDeliveryToken token = topic.publish(message);

System.out.println("With delivery token \"" + token.hashCode()
+ " delivered: " + token.isComplete());
Thread.sleep(100000000000000l);

//关闭连接
if (client.isConnected())
client.disconnect(WSMQTTServerCommon.quiesceTimeout);
System.out.println("Disconnected: delivery token \"" + token.hashCode()
+ "\" received: " + token.isComplete());
} catch (Exception e) {
e.printStackTrace();
}
}
}


package com.etrip.wsmqtt.server;

import com.ibm.micro.client.mqttv3.MqttClient;
import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
import com.ibm.micro.client.mqttv3.MqttMessage;
import com.ibm.micro.client.mqttv3.MqttTopic;
/**
* 使用 Java 为 MQ Telemetry Transport 创建异步发布程序
*
*
*
*
* 消息发布的类的具体的实现
*
* @author longgangbai
*
*/
public class WSMQTTServerPubAsync {
public static void main(String[] args) {
try {
//创建MqttClient对象
MqttClient client = new MqttClient(WSMQTTServerCommon.TCPAddress, WSMQTTServerCommon.clientId);

//创建MQTT相关的主题
MqttTopic topic = client.getTopic(WSMQTTServerCommon.topicString);

//创建MQTT的消息体
MqttMessage message = new MqttMessage();
//设置消息传输的类型
message.setQos(2);

//设置是否在服务器中保存消息体
message.setRetained(false);

//设置消息的内容
message.setPayload(WSMQTTServerCommon.publication.getBytes());

//创建一个MQTT的回调类
WSMQTTServerCallBack callback = new WSMQTTServerCallBack(WSMQTTServerCommon.clientId);

//MqttClient绑定
client.setCallback(callback);

//MqttClient连接
client.connect();

System.out.println("Publishing \"" + message.toString()
+ "\" on topic \"" + topic.getName() + "\" with QoS = "
+ message.getQos());
System.out.println("For client instance \"" + client.getClientId()
+ "\" on address " + client.getServerURI() + "\"");

//发送消息并获取回执
MqttDeliveryToken token = topic.publish(message);

System.out.println("With delivery token \"" + token.hashCode()
+ " delivered: " + token.isComplete());
Thread.sleep(100000000000000l);

//关闭连接
if (client.isConnected())
client.disconnect(WSMQTTServerCommon.quiesceTimeout);
System.out.println("Disconnected: delivery token \"" + token.hashCode()
+ "\" received: " + token.isComplete());
} catch (Exception e) {
e.printStackTrace();
}
}
}


MQTT消息发布回调代码:

package com.etrip.wsmqtt.server;

import com.ibm.micro.client.mqttv3.*;
/**
* 发布消息的回调类
*
* 必须实现MqttCallback的接口并实现对应的相关接口方法
* 		◦CallBack 类将实现 MqttCallBack。每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。在回调中,将它用来标识已经启动了该回调的哪个实例。
* 	◦必须在回调类中实现三个方法:
*
* 	public void messageArrived(MqttTopic topic, MqttMessage message)
* 	接收已经预订的发布。
*
* 	public void connectionLost(Throwable cause)
* 	在断开连接时调用。
*
* 	public void deliveryComplete(MqttDeliveryToken token))
* 		接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
*
*
* 	◦由 MqttClient.connect 激活此回调。
*
* @author longgangbai
*/
public class WSMQTTServerCallBack implements MqttCallback {
private String instanceData = "";
public WSMQTTServerCallBack(String instance) {
instanceData = instance;
}
/**
* 接收到消息的回调的方法
*/
public void messageArrived(MqttTopic topic, MqttMessage message) {
try {
System.out.println("Message arrived: \"" + message.toString()
+ "\" on topic \"" + topic.toString() + "\" for instance \""
+ instanceData + "\"");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 消息连接丢失
*/
public void connectionLost(Throwable cause) {
System.out.println("Connection lost on instance \"" + instanceData
+ "\" with cause \"" + cause.getMessage() + "\" Reason code "
+ ((MqttException)cause).getReasonCode() + "\" Cause \""
+ ((MqttException)cause).getCause() +  "\"");
cause.printStackTrace();
}
/**
*
*/
public void deliveryComplete(MqttDeliveryToken token) {
try {
System.out.println("Delivery token \"" + token.hashCode()
+ "\" received by instance \"" + instanceData + "\"");
} catch (Exception e) {
e.printStackTrace();
}
}
}


package com.etrip.wsmqtt.server;

import com.ibm.micro.client.mqttv3.*;
/**
* 发布消息的回调类
*
* 必须实现MqttCallback的接口并实现对应的相关接口方法
* 		◦CallBack 类将实现 MqttCallBack。每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。在回调中,将它用来标识已经启动了该回调的哪个实例。
* 	◦必须在回调类中实现三个方法:
*
* 	public void messageArrived(MqttTopic topic, MqttMessage message)
* 	接收已经预订的发布。
*
* 	public void connectionLost(Throwable cause)
* 	在断开连接时调用。
*
* 	public void deliveryComplete(MqttDeliveryToken token))
* 		接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
*
*
* 	◦由 MqttClient.connect 激活此回调。
*
* @author longgangbai
*/
public class WSMQTTServerCallBack implements MqttCallback {
private String instanceData = "";
public WSMQTTServerCallBack(String instance) {
instanceData = instance;
}
/**
* 接收到消息的回调的方法
*/
public void messageArrived(MqttTopic topic, MqttMessage message) {
try {
System.out.println("Message arrived: \"" + message.toString()
+ "\" on topic \"" + topic.toString() + "\" for instance \""
+ instanceData + "\"");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 消息连接丢失
*/
public void connectionLost(Throwable cause) {
System.out.println("Connection lost on instance \"" + instanceData
+ "\" with cause \"" + cause.getMessage() + "\" Reason code "
+ ((MqttException)cause).getReasonCode() + "\" Cause \""
+ ((MqttException)cause).getCause() +  "\"");
cause.printStackTrace();
}
/**
*
*/
public void deliveryComplete(MqttDeliveryToken token) {
try {
System.out.println("Delivery token \"" + token.hashCode()
+ "\" received by instance \"" + instanceData + "\"");
} catch (Exception e) {
e.printStackTrace();
}
}
}


常量类:

package com.etrip.wsmqtt.server;

import java.util.UUID;
/**
*
* 消息发布消息的常量字段
*
* @author longgangbai
*/
public final class WSMQTTServerCommon {
//发布broker的ip和端口
public static final String  TCPAddress =System.getProperty("TCPAddress", "tcp://192.168.208.46:1883");
//客户端的Id
public static String clientId =String.format("%-23.23s",  System.getProperty("clientId", (UUID.randomUUID().toString())).trim()).replace('-', '_');
//发布消息的主题
public static final String topicString = System.getProperty("topicString", "china/beijing");
//发布的消息
public static final String publication =System.getProperty("publication", "Hello World " + String.format("%tc", System.currentTimeMillis()));
//超时时间
public static final int quiesceTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));

public static final int  sleepTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));

public static final boolean cleanSession =Boolean.parseBoolean(System.getProperty("cleanSession", "false"));

public static final int QoS =Integer.parseInt(System.getProperty("QoS", "1"));

public static final boolean retained =Boolean.parseBoolean(System.getProperty("retained", "false"));
}


package com.etrip.wsmqtt.server;

import java.util.UUID;
/**
*
* 消息发布消息的常量字段
*
* @author longgangbai
*/
public final class WSMQTTServerCommon {
//发布broker的ip和端口
public static final String  TCPAddress =System.getProperty("TCPAddress", "tcp://192.168.208.46:1883");
//客户端的Id
public static String clientId =String.format("%-23.23s",  System.getProperty("clientId", (UUID.randomUUID().toString())).trim()).replace('-', '_');
//发布消息的主题
public static final String topicString = System.getProperty("topicString", "china/beijing");
//发布的消息
public static final String publication =System.getProperty("publication", "Hello World " + String.format("%tc", System.currentTimeMillis()));
//超时时间
public static final int quiesceTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));

public static final int  sleepTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));

public static final boolean cleanSession =Boolean.parseBoolean(System.getProperty("cleanSession", "false"));

public static final int QoS =Integer.parseInt(System.getProperty("QoS", "1"));

public static final boolean retained =Boolean.parseBoolean(System.getProperty("retained", "false"));
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐