您的位置:首页 > 编程语言 > Java开发

Java中mqtt消息队列发送和订阅消息

2018-07-16 16:12 639 查看
1.首先本地建立mqtt协议的服务器

2.直接上代码

package io.test;

import java.util.Date;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class mqtt3 {

public static void main(String[] args) throws MqttException {
// 本地mq服务器
String THINGSBOARD_HOST = "tcp://localhost:1883";
//标识mq发送的客户端id
String PUBLISH_CLIENTID = "clientId";
//发送到服务器的topic标识
String PUBLISH_TOPIC = "v1/devices/me/telemetry";
//订阅到服务器的topic标识
String SUBSCRIBE_TOPIC = "v1/devices/me/rpc/request/+";
//mqtt连接
MqttClient mqttClient = new MqttClient(THINGSBOARD_HOST, PUBLISH_CLIENTID, new MemoryPersistence());
//设置超时时间
mqttClient.setTimeToWait(10000);
//进行连接
mqttClient.connect(getOptions());
//订阅
mqttClient.subscribe(SUBSCRIBE_TOPIC, 2);
//回调方法
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String context = new String(message.getPayload());
System.out.println(context);
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// TODO Auto-generated method stub

}

@Override
public void connectionLost(Throwable cause) {
// TODO Auto-generated method stub
System.out.println("connect  lost");

}

@Override
public void connectComplete(boolean reconnect, String serverURI) {
// TODO Auto-generated method stub
System.out.println("connect success-------");
}

});

String sendCon="{\"ts\":"+new Date().getTime()+", \"values\":{\"test\":1111}}";
mqttClient.publish(PUBLISH_TOPIC, sendCon.getBytes(), 0, false);
System.out.println("发送成功:"+sendCon);

}

//连接到mqtt的连接参数配置
public static MqttConnectOptions getOptions() {
MqttConnectOptions options = new MqttConnectOptions();
//设置session是否保留上一条记录
options.setCleanSession(false);
//连接超时时间
options.setConnectionTimeout(10);
//心跳会话时间
options.setKeepAliveInterval(60);
//自动重连
options.setAutomaticReconnect(true);
return options;
}

}

3.我是用的mqtt3 直接引入包:

<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  MQTT