Java中mqtt消息队列发送和订阅消息
2018-07-16 16:12
639 查看
1.首先本地建立mqtt协议的服务器
2.直接上代码
3.我是用的mqtt3 直接引入包:
2.直接上代码
package io.test; import java.util.Date; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class mqtt3 { public static void main(String[] args) throws MqttException { // 本地mq服务器 String THINGSBOARD_HOST = "tcp://localhost:1883"; //标识mq发送的客户端id String PUBLISH_CLIENTID = "clientId"; //发送到服务器的topic标识 String PUBLISH_TOPIC = "v1/devices/me/telemetry"; //订阅到服务器的topic标识 String SUBSCRIBE_TOPIC = "v1/devices/me/rpc/request/+"; //mqtt连接 MqttClient mqttClient = new MqttClient(THINGSBOARD_HOST, PUBLISH_CLIENTID, new MemoryPersistence()); //设置超时时间 mqttClient.setTimeToWait(10000); //进行连接 mqttClient.connect(getOptions()); //订阅 mqttClient.subscribe(SUBSCRIBE_TOPIC, 2); //回调方法 mqttClient.setCallback(new MqttCallbackExtended() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String context = new String(message.getPayload()); System.out.println(context); } @Override public void deliveryComplete(IMqttDeliveryToken token) { // TODO Auto-generated method stub } @Override public void connectionLost(Throwable cause) { // TODO Auto-generated method stub System.out.println("connect lost"); } @Override public void connectComplete(boolean reconnect, String serverURI) { // TODO Auto-generated method stub System.out.println("connect success-------"); } }); String sendCon="{\"ts\":"+new Date().getTime()+", \"values\":{\"test\":1111}}"; mqttClient.publish(PUBLISH_TOPIC, sendCon.getBytes(), 0, false); System.out.println("发送成功:"+sendCon); } //连接到mqtt的连接参数配置 public static MqttConnectOptions getOptions() { MqttConnectOptions options = new MqttConnectOptions(); //设置session是否保留上一条记录 options.setCleanSession(false); //连接超时时间 options.setConnectionTimeout(10); //心跳会话时间 options.setKeepAliveInterval(60); //自动重连 options.setAutomaticReconnect(true); return options; } }
3.我是用的mqtt3 直接引入包:
<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency>
相关文章推荐
- java MQTT简单的订阅消息实例
- java调用rabbitmq消息队列发送和接收消息实例
- Activemq 消息发送、接收java代码实现队列模式
- [3] MQTT,mosquitto,Eclipse Paho---怎样使用 Eclipse Paho MQTT工具来发送订阅MQTT消息?
- Java——定时请求后端接口数据发送RabbitMQ消息队列到指定MQ服务器
- 使用JAVA向微软消息队列(MSMQ)发送和接收消息
- [3] MQTT,mosquitto,Eclipse Paho---如何使用 Eclipse Paho MQTT工具来发送订阅MQTT消息?
- Java消息队列-Spring整合ActiveMq
- MSMQ-发送消息到远程专用队列 实例
- 向消息队列发送一个消息(后进先出LIFO),OSQPostFront()
- Java模拟异步消息的发送与回调
- java编写简单消息队列,实现高德坐标变形服务
- BizTalk Server 基于消息的基本数据交换(一) - 发送端口直接订阅接收端口
- [9] MQTT,mosquitto,Eclipse Paho---MQTT消息格式之SUBACK(消息订阅应答)消息分析
- MQTT的学习研究(十二) MQTT moquette 的 Future API 消息发布订阅的实现
- ActiveMQ实现消息队列发送邮件
- Java 微信发送模板消息
- 同时消费队列和发布订阅两类型消息
- 使用消息队列 异步插入数据,能发送消息,但是无法读取消息
- java 微信发送客服消息