您的位置:首页 > 产品设计 > UI/UE

MQTT协议之moquette 安装使用

2015-01-25 13:11 495 查看
    在MQTT 官网 (http://mqtt.org/software)中有众多MQTT的实现方式。具体参看官网,Moquette是基于netty(老版本使用的是mina) 的模型的一个Java MQTT broker,支持websocket,SSL。

如果想直接启动 moquette-broker-0.4-jar-with-dependencies.jar的jar文件方式

 可以执行一些命令实现 

        java -jar moquette-broker-0.4-jar-with-dependencies.jar

google code 下载MQTT moquette Broker 地址:

    https://code.google.com/p/moquette-mqtt/
   项目已迁移到github:https://github.com/andsel/moquette,有人对该项目进行改造,可以选择使用mina或netty)https://github.com/milliondreams/moquette-mqtt

    

GIT 下载MQTT moquette client 地址:

    https://github.com/fusesource/mqtt-client

 

在应用程序中使用MQTT的应用:

MQTT moquette 的broker服务启动代码(启动类org.dna.mqtt.moquette.server.Server)如下:

/*
* Copyright (c) 2012-2014 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html *
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php *
* You may elect to redistribute this code under either of these licenses.
*/
package org.dna.mqtt.moquette.server;

import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.util.Properties;
import org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging;
import org.dna.mqtt.moquette.server.netty.NettyAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Launch a configured version of the server.
* @author andrea
*/
public class Server {

private static final Logger LOG = LoggerFactory.getLogger(Server.class);

//数据持久化数据目录,使用mapdb
/*MapDB是一个快速、易用的嵌入式Java数据库引擎,它提供了基于磁盘或者堆外(off- heap允许Java直接操作内存空间,
* 类似于C的malloc和free)存储的并发的Maps、Sets、Queues。MapDB的前身是JDBM,已经有15年的历史。
* MapDB支持 ACID事务、MVCC隔离,它的jar包只有200KB,且无其它依赖,非常轻量。
* 相对来说功能已经稳定,并有全职 的开发者支持开发。*/

public static final String STORAGE_FILE_PATH = System.getProperty("user.home") +
File.separator + "moquette_store.mapdb";

private ServerAcceptor m_acceptor;
SimpleMessaging messaging;

public static void main(String[] args) throws IOException {
final Server server = new Server();
server.startServer();
System.out.println("Server started, version 0.7-SNAPSHOT");
//进程关闭前,释放资源
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
server.stopServer();
}
});
}

/**
* Starts Moquette bringing the configuration from the file
* located at config/moquette.conf
*/
public void startServer() throws IOException {
String configPath = System.getProperty("moquette.path", null);
startServer(new File(configPath, "config/moquette.conf"));
}

/**
* Starts Moquette bringing the configuration from the given file
*/
public void startServer(File configFile) throws IOException {
ConfigurationParser confParser = new ConfigurationParser();
try {
confParser.parse(configFile);
} catch (ParseException pex) {
LOG.warn("An error occurred in parsing configuration, fallback on default configuration", pex);
}
Properties configProps = confParser.getProperties();
startServer(configProps);
}

/**
* Starts the server with the given properties.
*
* Its need at least the following properties:
* <ul>
* <li>port</li>
* <li>password_file</li>
* </ul>
*/
public void startServer(Properties configProps) throws IOException {
messaging = SimpleMessaging.getInstance();
messaging.init(configProps);

m_acceptor = new NettyAcceptor();
m_acceptor.initialize(messaging, configProps);
}

public void stopServer() {
System.out.println("Server stopping...");
messaging.stop();
m_acceptor.close();
System.out.println("Server stopped");
}
}


下载moquette-mqtt源码,导入eclipse中,运行启动类。默认端口:1883

配置说明:config/moquette.conf

##############################################
# Moquette configuration file.
#
# The synthax is equals to mosquitto.conf
#
##############################################

#启动服务端口
port 1883
#websocket 端口
websocket_port 8080
#启动主机的IP
host 0.0.0.0

#密码文件
password_file password_file.conf

##支持SSL
#ssl_port 8883
#jks_path serverkeystore.jks
#key_store_password passw0rdsrv
#key_manager_password passw0rdsrv
密码文件password_file.conf,用户名密码采用冒号分割":"。
该项目使用distribution打包成产品包。下面使用mqtt-client采用阻塞式实现消息的发布并接收。

发送消息:

package cn.smartslim.mqtt.demo.fusesource;

import java.net.URISyntaxException;

import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
/**
* MQTT moquette 的Server 段用于发布主题,并发布主题信息
* 采用阻塞式 发布主题
*/
public class MQTTServer {
private final static String CONNECTION_STRING = "tcp://192.168.36.215:1883";
private final static boolean CLEAN_START = true;
private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s

public final static long RECONNECTION_ATTEMPT_MAX=6;
public final static long RECONNECTION_DELAY=2000;

public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M

public static void main(String[] args) {
MQTT mqtt = new MQTT();
try {
//设置服务端的ip
mqtt.setHost(CONNECTION_STRING);
//连接前清空会话信息
mqtt.setCleanSession(CLEAN_START);
//设置重新连接的次数
mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
//设置重连的间隔时间
mqtt.setReconnectDelay(RECONNECTION_DELAY);
//设置心跳时间
mqtt.setKeepAlive(KEEP_ALIVE);
//设置缓冲的大小
mqtt.setSendBufferSize(SEND_BUFFER_SIZE);

//创建连接 ,使用阻塞式
BlockingConnection connection = mqtt.blockingConnection();
//开始连接
connection.connect();
try {
int count=0;
while(true){
count++;
//订阅的主题
String topic="mqtt/test";
//主题的内容
String message="hello "+count+" mqtt!";
connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE, false);
System.out.println("MQTTServer Message Topic="+topic+" Content :"+message);
Thread.sleep(2000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
接收消息:
package cn.smartslim.mqtt.demo.fusesource;

import java.net.URISyntaxException;

import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
/**
* MQTT moquette 的Client 段用于订阅主题,并接收主题信息
* 采用阻塞式 订阅主题
*/
public class MQTTClient {
private final static String CONNECTION_STRING = "tcp://192.168.36.215:1883";
private final static boolean CLEAN_START = true;
private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
public static Topic[] topics = {
new Topic("china/beijing", QoS.EXACTLY_ONCE)};

public final static long RECONNECTION_ATTEMPT_MAX=6;
public final static long RECONNECTION_DELAY=2000;

public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M

public static void main(String[] args) {
//创建MQTT对象
MQTT mqtt = new MQTT();
BlockingConnection connection=null;
try {
//设置mqtt broker的ip和端口
mqtt.setHost(CONNECTION_STRING);
//连接前清空会话信息
mqtt.setCleanSession(CLEAN_START);
//设置重新连接的次数
mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
//设置重连的间隔时间
mqtt.setReconnectDelay(RECONNECTION_DELAY);
//设置心跳时间
mqtt.setKeepAlive(KEEP_ALIVE);
//设置缓冲的大小
mqtt.setSendBufferSize(SEND_BUFFER_SIZE);

//获取mqtt的连接对象BlockingConnection
connection = mqtt.blockingConnection();
//MQTT连接的创建
connection.connect();
//创建相关的MQTT 的主题列表
Topic[] topics = {new Topic("mqtt/test", QoS.AT_LEAST_ONCE)};
//订阅相关的主题信息
byte[] qoses = connection.subscribe(topics);
//
while(true){
//接收订阅的消息内容
Message message = connection.receive();
//获取订阅的消息内容
byte[] payload = message.getPayload();
// process the message then:
System.out.println("MQTTClient Message Topic="+message.getTopic()+" Content :"+new String(payload));
//签收消息的回执
message.ack();
Thread.sleep(2000);
}
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}finally{
try {
connection.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  MQTT