您的位置:首页 > 编程语言 > Qt开发

MQTT message push实现研究

2015-10-10 19:20 471 查看
本文利用开源软件对MQTT协议进行了消息推送进行分析。其中引用了网上的部分内容;

其中的一些图片请参见下面链接中的完整文件:

http://download.csdn.net/detail/junglefly/9168135

IBM® WebSphere® MQ Telemetry Transport(简称 MQTT)是一种基于 TCP/IP 的轻量级发布 / 预订协议,用于连接大量的远程传感器和控制设备。它可以工作在低带宽,不可靠的通信并且占用较少内存的设备上。

MQTT 产品作为 WebSphere MQ 产品的扩展,使用了 MQTT V3.1 版本的协议。它提供了一些小型客户机库,可以将这些客户机库嵌入到运行于不同设备平台上的智能设备中。使用客户机构建的应用程序使用 MQ Telemetry Transport(MQTT) 和 WebSphere MQ Telemetry 服务并借助 WebSphere MQ 来可靠地发布和预订消息。一个高级 MQTT 客户机(即设备的 WebSphere MQ Telemetry 守护程序)可以运行于多种平台上。它可以充当一个网络集中器,能够将更多的 MQTT 客户机连接至单个队列管理器。对于在网络发生短暂中断期间无法缓存消息的小型设备,它还可以为这些小型设备提供存储转发功能。

特点:

l 协议简单,MQTT_V3.1_Protocol_Specific总共只有42页。相比来说XMPP看起来就要复杂很多;

l 基于 TCP/IP 的轻量级发布 / 预订协议,用于连接大量的远程传感器和控制设备。

l 固定头长度仅为2字节,因此传输快速,省流量;可以工作在低带宽,不可靠的通信并且占用较少内存的设备上;

l 网上有大量的源码客户端以及开源lib,方便各种平台的客户端开发。其中有C语言编写的,适合嵌入式系统;同时也有android上的实现。

l Apache ActiveMQ,Apache Apollo这分布式开源服务器均支持MQTT协议,使用JAVA语言实现,支持多平台。Server端搭建起来相对来说比较简单,server支持集群,消息缓存等机制。

1. 推送原理分析

实际上,其他推送系统(包括GCM、XMPP方案)的原理都与此类似。

2. Mosquitto 、android客户端、PHP客户端交互测试

消息中转服务端准备

开源项目Mosquitto是一个实现了MQTT3.1协议的代理服务器,由MQTT协议创始人之一的Andy Stanford-Clark开发,它为我们提供了非常棒的轻量级数据交换的解决方案。

获取&安装

Mosquitto提供了Windows、Linux以及qnx系统的版本,安装文件可从http://mosquitto.org/files/binary/地址中获取(建议使用最新的1.1.x版本)。Windows系统下的安装过程非常简单,我们甚至可以把Mosquitto直接安装成为系统服务;但是,在实际应用中,我们更倾向于使用Linux系统的服务器,接下来我们就将重点介绍Linux版Mosquitto的安装方法。

在Linux系统上安装Mosquitto,本人建议大家使用源码安装模式,最新的源码可从http://mosquitto.org/files/source/地址中获取。解压之后,我们可以在源码目录里面找到主要的配置文件config.mk,其中包含了所有Mosquitto的安装选项,详细的参数说明如下:

[root@109 mosquitto-1.3.2]# cat config.mk

# =============================================================================

# User configuration section.

#

# Largely, these are options that are designed to make mosquitto run more

# easily in restrictive environments by removing features.

#

# Modify the variable below to enable/disable features.

#

# Can also be overriden at the command line, e.g.:

#

# make WITH_TLS=no

# =============================================================================

# Uncomment to compile the broker with tcpd/libwrap support.

#WITH_WRAP:=yes

# Comment out to disable SSL/TLS support in the broker and client.

# Disabling this will also mean that passwords must be stored in plain text. It

# is strongly recommended that you only disable WITH_TLS if you are not using

# password authentication at all.

#WITH_TLS:=yes

# Comment out to disable TLS/PSK support in the broker and client. Requires

# WITH_TLS=yes.

# This must be disabled if using openssl < 1.0.

#WITH_TLS_PSK:=yes

# Comment out to disable client client threading support.

#WITH_THREADING:=yes

# Uncomment to compile the broker with strict protocol support. This means that

# both the client library and the broker will be very strict about protocol

# compliance on incoming data. Neither of them will return an error on

# incorrect "remaining length" values if this is commented out. The old

# behaviour (prior to 0.12) is equivalent to compiling with

# WITH_STRICT_PROTOCOL defined and means that clients will be immediately

# disconnected from the broker on non-compliance.

#WITH_STRICT_PROTOCOL:=yes

# Comment out to remove bridge support from the broker. This allow the broker

# to connect to other brokers and subscribe/publish to topics. You probably

# want to leave this included unless you want to save a very small amount of

# memory size and CPU time.

#WITH_BRIDGE:=yes

# Comment out to remove persistent database support from the broker. This

# allows the broker to store retained messages and durable subscriptions to a

# file periodically and on shutdown. This is usually desirable (and is

# suggested by the MQTT spec), but it can be disabled if required.

#WITH_PERSISTENCE:=yes

# Comment out to remove memory tracking support from the broker. If disabled,

# mosquitto won't track heap memory usage nor export '$SYS/broker/heap/current

# size', but will use slightly less memory and CPU time.

#WITH_MEMORY_TRACKING:=yes

# Compile with database upgrading support? If disabled, mosquitto won't

# automatically upgrade old database versions.

# Not currently supported.

#WITH_DB_UPGRADE:=yes

# Comment out to remove publishing of the $SYS topic hierarchy containing

# information about the broker state.

WITH_SYS_TREE:=yes

# Build with Python module. Comment out if Python is not installed, or required

# Python modules are not available.

WITH_PYTHON:=yes

# Build with SRV lookup support.

#WITH_SRV:=yes

# =============================================================================

# End of user configuration

# =============================================================================

# Also bump lib/mosquitto.h, lib/python/setup.py, CMakeLists.txt,

# installer/mosquitto.nsi, installer/mosquitto-cygwin.nsi

VERSION=1.3.2

TIMESTAMP:=$(shell date "+%F %T%z")

# Client library SO version. Bump if incompatible API/ABI changes are made.

SOVERSION=1

# Man page generation requires xsltproc and docbook-xsl

XSLTPROC=xsltproc

# For html generation

DB_HTML_XSL=man/html.xsl

#MANCOUNTRIES=en_GB

。。。。。。

# 是否支持tcpd/libwrap功能.

#WITH_WRAP:=yes

# 是否开启SSL/TLS支持

#WITH_TLS:=yes

# 是否开启TLS/PSK支持

#WITH_TLS_PSK:=yes

# Comment out to disable client client threading support.

#WITH_THREADING:=yes

# 是否使用严格的协议版本(老版本兼容会有点问题)

#WITH_STRICT_PROTOCOL:=yes

# 是否开启桥接模式

#WITH_BRIDGE:=yes

# 是否开启持久化功能

#WITH_PERSISTENCE:=yes

# 是否监控运行状态

#WITH_MEMORY_TRACKING:=yes

这里需要注意的是,默认情况下Mosquitto的安装需要OpenSSL的支持;如果不需要SSL,则需要关闭config.mk里面的某些与SSL功能有关的选项(WITH_TLS、WITH_TLS_PSK)。接着,就是运行make install进行安装,完成之后会在系统命令行里发现mosquitto、mosquitto_passwd、mosquitto_pub和mosquitto_sub四个工具(截图如下),分别用于启动代理、管理密码、发布消息和订阅消息。

注:

在有些电脑上运行make install可能会出现如下错误,

install -d /usr/local/sbin

install -s --strip-program= mosquitto /usr/local/sbin/mosquitto

install: unrecognized option `--strip-program='

Try `install --help' for more information.

解决办法是:修改Makefile文件,删除--strip-program=$(STRIP)

install : all

$(INSTALL) -d ${DESTDIR}$(prefix)/sbin

$(INSTALL) -s --strip-program=$(STRIP) mosquitto ${DESTDIR}${prefix}/sbin/mosquitto

$(INSTALL) mosquitto_plugin.h ${DESTDIR}${prefix}/include/mosquitto_plugin.h

ifeq ($(WITH_TLS),yes)

$(INSTALL) -s --strip-program=$(STRIP) mosquitto_passwd ${DESTDIR}${prefix}/bin/mosquitto_passwd

Endif

配置&运行

安装完成之后,所有配置文件会被放置于/etc/mosquitto/目录下,其中最重要的就是Mosquitto的配置文件,即mosquitto.conf,以下是详细的配置参数说明。

1. # =================================================================
2. # General configuration
3. # =================================================================
4.
5. # 客户端心跳的间隔时间
6. #retry_interval 20
7.
8. # 系统状态的刷新时间
9. #sys_interval 10
10.
11. # 系统资源的回收时间,0表示尽快处理
12. #store_clean_interval 10
13.
14. # 服务进程的PID
15. #pid_file /var/run/mosquitto.pid
16.
17. # 服务进程的系统用户
18. #user mosquitto
19.
20. # 客户端心跳消息的最大并发数
21. #max_inflight_messages 10
22.
23. # 客户端心跳消息缓存队列
24. #max_queued_messages 100
25.
26. # 用于设置客户端长连接的过期时间,默认永不过期
27. #persistent_client_expiration
28.
29. # =================================================================
30. # Default listener
31. # =================================================================
32.
33. # 服务绑定的IP地址
34. #bind_address
35.
36. # 服务绑定的端口号
37. #port 1883
38.
39. # 允许的最大连接数,-1表示没有限制
40. #max_connections -1
41.
42. # cafile:CA证书文件
43. # capath:CA证书目录
44. # certfile:PEM证书文件
45. # keyfile:PEM密钥文件
46. #cafile
47. #capath
48. #certfile
49. #keyfile
50.
51. # 必须提供证书以保证数据安全性
52. #require_certificate false
53.
54. # 若require_certificate值为true,use_identity_as_username也必须为true
55. #use_identity_as_username false
56.
57. # 启用PSK(Pre-shared-key)支持
58. #psk_hint
59.
60. # SSL/TSL加密算法,可以使用“openssl ciphers”命令获取
61. # as the output of that command.
62. #ciphers
63.
64. # =================================================================
65. # Persistence
66. # =================================================================
67.
68. # 消息自动保存的间隔时间
69. #autosave_interval 1800
70.
71. # 消息自动保存功能的开关
72. #autosave_on_changes false
73.
74. # 持久化功能的开关
75. persistence true
76.
77. # 持久化DB文件
78. #persistence_file mosquitto.db
79.
80. # 持久化DB文件目录
81. #persistence_location /var/lib/mosquitto/
82.
83. # =================================================================
84. # Logging
85. # =================================================================
86.
87. # 4种日志模式:stdout、stderr、syslog、topic
88. # none 则表示不记日志,此配置可以提升些许性能
89. log_dest none
90.
91. # 选择日志的级别(可设置多项)
92. #log_type error
93. #log_type warning
94. #log_type notice
95. #log_type information
96.
97. # 是否记录客户端连接信息
98. #connection_messages true
99.
100. # 是否记录日志时间
101. #log_timestamp true
102.
103. # =================================================================
104. # Security
105. # =================================================================
106.
107. # 客户端ID的前缀限制,可用于保证安全性
108. #clientid_prefixes
109.
110. # 允许匿名用户
111. #allow_anonymous true
112.
113. # 用户/密码文件,默认格式:username:password
114. #password_file
115.
116. # PSK格式密码文件,默认格式:identity:key
117. #psk_file
118.
119. # pattern write sensor/%u/data
120. # ACL权限配置,常用语法如下:
121. # 用户限制:user <username>
122. # 话题限制:topic [read|write] <topic>
123. # 正则限制:pattern write sensor/%u/data
124. #acl_file
125.
126. # =================================================================
127. # Bridges
128. # =================================================================
129.
130. # 允许服务之间使用“桥接”模式(可用于分布式部署)
131. #connection <name>
132. #address <host>[:<port>]
133. #topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]
134.
135. # 设置桥接的客户端ID
136. #clientid
137.
138. # 桥接断开时,是否清除远程服务器中的消息
139. #cleansession false
140.
141. # 是否发布桥接的状态信息
142. #notifications true
143.
144. # 设置桥接模式下,消息将会发布到的话题地址
145. # $SYS/broker/connection/<clientid>/state
146. #notification_topic
147.
148. # 设置桥接的keepalive数值
149. #keepalive_interval 60
150.
151. # 桥接模式,目前有三种:automatic、lazy、once
152. #start_type automatic
153.
154. # 桥接模式automatic的超时时间
155. #restart_timeout 30
156.
157. # 桥接模式lazy的超时时间
158. #idle_timeout 60
159.
160. # 桥接客户端的用户名
161. #username
162.
163. # 桥接客户端的密码
164. #password
165.
166. # bridge_cafile:桥接客户端的CA证书文件
167. # bridge_capath:桥接客户端的CA证书目录
168. # bridge_certfile:桥接客户端的PEM证书文件
169. # bridge_keyfile:桥接客户端的PEM密钥文件
170. #bridge_cafile
171. #bridge_capath
172. #bridge_certfile
173. #bridge_keyfile
174.
175. # 自己的配置可以放到以下目录中
176. include_dir /etc/mosquitto/conf.d

# =================================================================

# General configuration

# =================================================================

# 客户端心跳的间隔时间

#retry_interval 20

# 系统状态的刷新时间

#sys_interval 10

# 系统资源的回收时间,0表示尽快处理

#store_clean_interval 10

# 服务进程的PID

#pid_file /var/run/mosquitto.pid

# 服务进程的系统用户

#user mosquitto

# 客户端心跳消息的最大并发数

#max_inflight_messages 10

# 客户端心跳消息缓存队列

#max_queued_messages 100

# 用于设置客户端长连接的过期时间,默认永不过期

#persistent_client_expiration

# =================================================================

# Default listener

# =================================================================

# 服务绑定的IP地址

#bind_address

# 服务绑定的端口号

#port 1883

# 允许的最大连接数,-1表示没有限制

#max_connections -1

# cafile:CA证书文件

# capath:CA证书目录

# certfile:PEM证书文件

# keyfile:PEM密钥文件

#cafile

#capath

#certfile

#keyfile

# 必须提供证书以保证数据安全性

#require_certificate false

# 若require_certificate值为true,use_identity_as_username也必须为true

#use_identity_as_username false

# 启用PSK(Pre-shared-key)支持

#psk_hint

# SSL/TSL加密算法,可以使用“openssl ciphers”命令获取

# as the output of that command.

#ciphers

# =================================================================

# Persistence

# =================================================================

# 消息自动保存的间隔时间

#autosave_interval 1800

# 消息自动保存功能的开关

#autosave_on_changes false

# 持久化功能的开关

persistence true

# 持久化DB文件

#persistence_file mosquitto.db

# 持久化DB文件目录

#persistence_location /var/lib/mosquitto/

# =================================================================

# Logging

# =================================================================

# 4种日志模式:stdout、stderr、syslog、topic

# none 则表示不记日志,此配置可以提升些许性能

log_dest none

# 选择日志的级别(可设置多项)

#log_type error

#log_type warning

#log_type notice

#log_type information

# 是否记录客户端连接信息

#connection_messages true

# 是否记录日志时间

#log_timestamp true

# =================================================================

# Security

# =================================================================

# 客户端ID的前缀限制,可用于保证安全性

#clientid_prefixes

# 允许匿名用户

#allow_anonymous true

# 用户/密码文件,默认格式:username:password

#password_file

# PSK格式密码文件,默认格式:identity:key

#psk_file

# pattern write sensor/%u/data

# ACL权限配置,常用语法如下:

# 用户限制:user <username>

# 话题限制:topic [read|write] <topic>

# 正则限制:pattern write sensor/%u/data

#acl_file

# =================================================================

# Bridges

# =================================================================

# 允许服务之间使用“桥接”模式(可用于分布式部署)

#connection <name>

#address <host>[:<port>]

#topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]

# 设置桥接的客户端ID

#clientid

# 桥接断开时,是否清除远程服务器中的消息

#cleansession false

# 是否发布桥接的状态信息

#notifications true

# 设置桥接模式下,消息将会发布到的话题地址

# $SYS/broker/connection/<clientid>/state

#notification_topic

# 设置桥接的keepalive数值

#keepalive_interval 60

# 桥接模式,目前有三种:automatic、lazy、once

#start_type automatic

# 桥接模式automatic的超时时间

#restart_timeout 30

# 桥接模式lazy的超时时间

#idle_timeout 60

# 桥接客户端的用户名

#username

# 桥接客户端的密码

#password

# bridge_cafile:桥接客户端的CA证书文件

# bridge_capath:桥接客户端的CA证书目录

# bridge_certfile:桥接客户端的PEM证书文件

# bridge_keyfile:桥接客户端的PEM密钥文件

#bridge_cafile

#bridge_capath

#bridge_certfile

#bridge_keyfile

# 自己的配置可以放到以下目录中

include_dir /etc/mosquitto/conf.d

最后,启动Mosquitto服务很简单,直接运行命令行“mosquitto -c /etc/mosquitto/mosquitto.conf -d”即可开启服务。接下来,就让我们尽情体验Mosquitto的强大功能吧!

另外,Mosquitto是个异步IO框架,经测试可以轻松处理20000个以上的客户端连接。当然,实际的最大承载量还和业务的复杂度还有比较大的关系。下图是本人在一台普通Linux机器上进行的压力测试结果,大家可以参考。

友情提醒:测试的时候不要忘记调整系统的最大连接数和栈大小,比如:Linux上可用ulimit -n20000 -s512命令设置你需要的系统参数。

消息接收客户端准备

a> 下载&解压AndroidPushNotificationsDemo项目(下载地址:https://github.com/tokudu/AndroidPushNotificationsDemo

b> 将该项目导入Eclipse中(File -> Export -> Existing Projects into Workspace)

c> 修改PushService.java中的MQTT_HOST常量为推送服务端的IP地址。如果server端口不是默认的1883的话,端口“MQTT_BROKER_PORT_NUM”也记得要一并修改。

d> 启动Android模拟器,并安装该项目。

注意:在新版本的Android SDK中可能会遇到以下错误。

... ...

08-23 02:28:44.184: W/dalvikvm(282): VFY: unable to find class referenced in signature (Lcom/ibm/mqtt/MqttPersistence;)

08-23 02:28:44.194: I/dalvikvm(282): Failed resolving Lcom/tokudu/demo/PushService$MQTTConnection; interface 35 'Lcom/ibm/mqtt/MqttSimpleCallback;'

08-23 02:28:44.194: W/dalvikvm(282): Link of class 'Lcom/tokudu/demo/PushService$MQTTConnection;' failed

08-23 02:28:44.194: E/dalvikvm(282): Could not find class 'com.tokudu.demo.PushService$MQTTConnection', referenced from method com.tokudu.demo.PushService.connect

08-23 02:28:44.194: W/dalvikvm(282): VFY: unable to resolve new-instance 42 (Lcom/tokudu/demo/PushService$MQTTConnection;) in Lcom/tokudu/demo/PushService;

... ...

08-23 02:28:44.404: E/AndroidRuntime(282): java.lang.VerifyError: com.tokudu.demo.PushService

08-23 02:28:44.404: E/AndroidRuntime(282): at com.tokudu.demo.PushActivity$1.onClick(PushActivity.java:32)

08-23 02:28:44.404: E/AndroidRuntime(282): at android.view.View.performClick(View.java:2408)

08-23 02:28:44.404: E/AndroidRuntime(282): at android.view.View$PerformClick.run(View.java:8816)

08-23 02:28:44.404: E/AndroidRuntime(282): at android.os.Handler.handleCallback(Handler.java:587)

08-23 02:28:44.404: E/AndroidRuntime(282): at android.os.Handler.dispatchMessage(Handler.java:92)

08-23 02:28:44.404: E/AndroidRuntime(282): at android.os.Looper.loop(Looper.java:123)

08-23 02:28:44.404: E/AndroidRuntime(282): at android.app.ActivityThread.main(ActivityThread.java:4627)

08-23 02:28:44.404: E/AndroidRuntime(282): at java.lang.reflect.Method.invokeNative(Native Method)

08-23 02:28:44.404: E/AndroidRuntime(282): at java.lang.reflect.Method.invoke(Method.java:521)

08-23 02:28:44.404: E/AndroidRuntime(282): at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:868)

08-23 02:28:44.404: E/AndroidRuntime(282): at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:626)

08-23 02:28:44.404: E/AndroidRuntime(282): at dalvik.system.NativeStart.main(Native Method)

... ...

原因是发布的时候没有加入wmqtt.jar包,解决办法如下:

1> 在项目根目录下创建libs目录,并把wmqtt.jar包移入该目录。

2> 重新配置项目的Java Build Path(右键菜单中的Properties选项中)。

3> 重新打包发布即可。

运行效果如下:
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: