MQTT协议详解二
2016-12-25 14:26
197 查看
转载地址:http://www.xuebuyuan.com/1951013.html
下面我们开始一步步执行连接,订阅,发布和接收。
首先就是CONNECT,发送连接申请。
在连接中我们要做的是将客户端ID、消息标识、用户名、密码等能过Socket传送给服务器。
以上是变量的申明,但不一定全部都用上,比如用户名和密码就可选。
这个是发起连接的函数,安全是按照协议来组装代码,但是这是用PHP实现的时候要注意一下,因为PHP是弱类型的,不能像C里直接计算,否则会将非INT型的当做0,INT型的直接计算,所以要注意使用chr()和ord(),pack()和unpacka()这两对函数,在字符拼接时,一定要用chr()或pack()把ASCII转成Char型的,而在计算移位时,又要把其转换为ASCII。
二、对于整个推送的架构应该如下图所示:
上面是当Qos为1的推送,即对于推送的内容会保存至服务器中,然后返回PUBACK告知操作成功,然后便会向在线终端推送。当设置Retain值为1时,但终端上线,则会立即推送。
我们现在就按Qos来理解推送的三种机制(Qos仅在PUBLISH消息中有效)
当Qos为0时,是最基本的推送形式,它最大的特点就是无保障性的,因为当你将数据包发送给服务器时,服务器不会有任何的回应,故不知成功与否
但如果成功,刚会发送给相应的订阅者,而且不会保存任何的发送信息,发完就完全删除了,所以成功率为小于等于1。
当Qos为1时,则是有保障性的传输,当发送推送数据给服务器时,其会返回一个PUBACK的确认包给发送者,用于确定成功与否,当把Retain值设为1时,会保存这个状态,对于所有后来上线的都会直接推送最后一次推的内容。所以说至少会推送成功一次。
当Qos为2时,因为是我们最常用的了,就要通过协议的机制既要保证推送的成功性,也要保证推送的准确性。
下面给出其示例图:
可以说要保证推送的成功传送,其认证的过程是比较比杂的,首先在发送的时候将Qos标识为2,然后发送PUBLISH信息,服务器收到后会将信息保存然后发送PUBREC告知
已接收,但此时服务器并不会推送,而是等待下一个客户端的PUBREL
下面贴出代码:
三、订阅
接下去说说订阅的,订阅者总体来说比发送者处理上要简单,仅仅是四个很容易理解的消息,SUBSCRIBE,SUBACK,UNSUBSCRIBE和UNSUBACK。
分别是对订阅和取消的判断。
下面直接贴代码:
取消订阅和订阅的结构和方法其实是一样的
下面贴出取消订阅的代码:
最后就是PINGREQ消息,即在每个Keep Alive Time中发送Ping请求,来判断主机是否仍然在线。
做为回应则是PINGRESP消息,表示目前仍然在线。
下面贴出Ping请求和回应代码:
以上代码是各部分的实现,而要实际使用则要配合异步和事件机制,虽然PHP一直以来在线程方面比较弱,但是以现在PHP所支持的来看,已完全有能力实现异步和事件机制。
这也是我下一篇博文要介绍的,同时还会对MQTT协议有针对性的修改和补充,使之更好的结合数据库以及满足较高并发量的实现。
最后,能力有限,人艰不拆,欢迎指教。
下面我们开始一步步执行连接,订阅,发布和接收。
首先就是CONNECT,发送连接申请。
在连接中我们要做的是将客户端ID、消息标识、用户名、密码等能过Socket传送给服务器。
private $socket; private $msgid=1;//消息id public $keepalive=10;//默认心跳时间 public $timeincepting;//主机时间,有来断开连接 public $topics=array();//订阅主题 public $debug=false; public $host; //主机名 public $port;//端口名 public $clientid; //客户机id public $will;//是否保存客户机will标识的数组,包括Qos,Retain,Dup。 public $username; //保存用户名 public $password; //保存密码 public $operations=array( "MQTT_CONNECT"=>1,//请求连接 "MQTT_CONNACK"=>2,//请求应答 "MQTT_PUBLISH"=>3,//发布消息 "MQTT_PUBACK"=>4,//发布应答 "MQTT_PUBREC"=>5,//发布已接收,保证传递1 "MQTT_PUBREL"=>6,//发布释放,保证传递2 "MQTT_PUBCOMP"=>7,//发布完成,保证传递3 "MQTT_SUBSCRIBE"=>8,//订阅请求 "MQTT_SUBACK"=>9,//订阅应答 "MQTT_UNSUBSCRIBE"=>10,//取消订阅 "MQTT_UNSUBACK"=>11,//取消订阅应答 "MQTT_PINGREQ"=>12,//ping请求 "MQTT_PINGRESP"=>13,//ping响应 "MQTT_DISCONNECT"=>14//断开连接 );
以上是变量的申明,但不一定全部都用上,比如用户名和密码就可选。
function connect($clean=1,$will=NULL,$userarray=NULL){ $this->username=$userarray['username']; $this->password=$userarray['password']; $this->socket=fsockopen($this->host,$this->port,$errno,$errstr,60); if(!$this->socket){ $this->debug("socket_error $errno,$errstr <br/>"); return false; } stream_set_timeout($this->socket, 10);//用于读取流时的时间控制 // stream_set_blocking($this->socket, 0);//0表示非阻塞,1表示阻塞 $index=0; $payload=$this->create_connect_payload($this->clientid,$will,$userarray); $index+=$payload['index']; $vhead=$this->create_variable_connect_header($userarray,$will,$clean); $index+=$vhead['index']; $conhead=$this->create_fixed_header('MQTT_CONNECT',0,0,0); //发起连接 $index=$this->numencoding($index); //因为对于超过每个字节是8位,超过就要多加一位。 $conhead.=$index; fwrite($this->socket,$conhead,strlen($conhead));//传递固定头部长度2字节 fwrite($this->socket,$vhead['content'].$payload['content']);//传递变长头部和消息体 $resCode =$this->read_fixed_header($this->socket); //读取返回的头部 $resRemaing=$this->read_remaing_length($this->socket); if($resRemaing>0){ $body=unpack('Ccomp/Cretcode', fread($this->socket,$resRemaing)); if($resCode['msgtype'] == $this->operations['MQTT_CONNACK'] && $body['retcode'] ==0){ //0表示连接成功,大于1-5表示错误。 $this->debug("Connected to {$this->host}:{$this->port}<br/>"); }else{ $this->debug(sprintf("Connection failed! (Error: 0x%02x)", $body['retcode']));//读取错误,其错误代码见协议 } $this->timesinceping = time(); return true; } return false; }
这个是发起连接的函数,安全是按照协议来组装代码,但是这是用PHP实现的时候要注意一下,因为PHP是弱类型的,不能像C里直接计算,否则会将非INT型的当做0,INT型的直接计算,所以要注意使用chr()和ord(),pack()和unpacka()这两对函数,在字符拼接时,一定要用chr()或pack()把ASCII转成Char型的,而在计算移位时,又要把其转换为ASCII。
二、对于整个推送的架构应该如下图所示:
上面是当Qos为1的推送,即对于推送的内容会保存至服务器中,然后返回PUBACK告知操作成功,然后便会向在线终端推送。当设置Retain值为1时,但终端上线,则会立即推送。
我们现在就按Qos来理解推送的三种机制(Qos仅在PUBLISH消息中有效)
当Qos为0时,是最基本的推送形式,它最大的特点就是无保障性的,因为当你将数据包发送给服务器时,服务器不会有任何的回应,故不知成功与否
但如果成功,刚会发送给相应的订阅者,而且不会保存任何的发送信息,发完就完全删除了,所以成功率为小于等于1。
当Qos为1时,则是有保障性的传输,当发送推送数据给服务器时,其会返回一个PUBACK的确认包给发送者,用于确定成功与否,当把Retain值设为1时,会保存这个状态,对于所有后来上线的都会直接推送最后一次推的内容。所以说至少会推送成功一次。
当Qos为2时,因为是我们最常用的了,就要通过协议的机制既要保证推送的成功性,也要保证推送的准确性。
下面给出其示例图:
可以说要保证推送的成功传送,其认证的过程是比较比杂的,首先在发送的时候将Qos标识为2,然后发送PUBLISH信息,服务器收到后会将信息保存然后发送PUBREC告知
已接收,但此时服务器并不会推送,而是等待下一个客户端的PUBREL
下面贴出代码:
public function publish($topic,$message,$qos=2,$retain=0){ $pubhead=$this->create_fixed_header('MQTT_PUBLISH',1,$qos,$retain);//生成固定头部标识为PUBLISH,其消息结构也比较简单。 $index=0; $body=""; $body.=$this->strencoding($topic,$index); if($qos>0){ //如果Qos大于0,即要保证传输则要加上MessageID,自己定义 $body.=$this->create_message_id(++$this->msgid,$index); } $body.=$message; $index+=strlen($message); $pubhead.=$this->numencoding($index); //同样的要对消息体的长度按照消息规定进行设置 fwrite($this->socket,$pubhead,strlen($pubhead)); fwrite($this->socket,$body,$index); $resCode =$this->read_fixed_header($this->socket); $resRemaing=$this->read_remaing_length($this->socket); //接收返回信息 if($resRemaing>0){ if($resCode['msgtype']==$this->operations['MQTT_PUBACK']){ //如果Qos为1的话,那么返回的是PUBACK $logedId=$this->read_message_id($this->socket); if($logedId==$this->msgid){ $this->debug(" Qos 1 Message ".$logedId."send successfully<br/>"); } }else if($resCode['msgtype']==$this->operations['MQTT_PUBREC']){ //如果Qos为2的话,那么返回信息就是PUBREC,此时要发送同意推送信息PUBREL $this->debug("received message PUBREC"); $logedId=$this->read_message_id($this->socket); if($logedId==$this->msgid){ $this->debug("Qos 2 Message ".$this->msgid."send successfully<br/>"); $relHead=$this->create_fixed_header('MQTT_PUBREL',0,0,0); $index=0; $payload=$this->create_message_id($logedId,$index); $relHead.=$this->numencoding($index); fwrite($this->socket, $relHead,strlen($relHead)); fwrite($this->socket,$payload,strlen($payload)); $resCode=$this->read_fixed_header($this->socket); $resRemaing=$this->read_remaing_length($this->socket); if($resRemaing>0){ if($resCode['msgtype']==$this->operations['MQTT_PUBCOMP']){ //最后收到的是PUBCOMP信息,表示推送结束。 $clogedId=$this->read_message_id($this->socket); if($clogedId==$logedId){ $this->debug("the whole flow send successfully ,then the server can delete the message!<br/>"); }else{ } } } } } } }
三、订阅
接下去说说订阅的,订阅者总体来说比发送者处理上要简单,仅仅是四个很容易理解的消息,SUBSCRIBE,SUBACK,UNSUBSCRIBE和UNSUBACK。
分别是对订阅和取消的判断。
下面直接贴代码:
public function subscribe($topics,$qos=0,$dup=0){ $subHead=$this->create_fixed_header("MQTT_SUBSCRIBE",$dup,$qos,0);//在订阅中Retain标识是元效的 $index=0; $body=$this->create_message_id($this->msgid,$index); foreach($topics as $key=>$topic){ $body.=$this->strencoding($topic['topic'],$index); //这里补充一下,因为此支持一次订阅多个主题,所以topics是一个二维数组。第二维表示主题和qos。 $body.=chr($topic['qos']); $index++; } $subHead.=$this->numencoding($index); //老样子,第二个字节是下面字符的总长。 fwrite($this->socket,$subHead,strlen($subHead)); fwrite($this->socket,$body); $res=$this->read_fixed_header($this->socket); //获取返回值 $remaing=$this->read_remaing_length($this->socket); if($remaing>0){ if($res['msgtype']==$this->operations['MQTT_SUBACK']){ //解析SUBACK包的消息 $this->debug("get suback<br/>"); $logedId=$this->read_message_id($this->socket); if($logedId==$this->msgid){ $this->debug("subscribe successful<br/>"); $qosGrantCode=fread($this->socket,count($topics[0])); //在SUBACK的消息中我们可以确认我们订阅各个主题的Qos。 $this->debug(var_dump(unpack("C*",$qosGrantCode))); } } } }
取消订阅和订阅的结构和方法其实是一样的
下面贴出取消订阅的代码:
public function unsubscribe($untopics,$dup=0,$qos=1){ $unsHead=$this->create_fixed_header("MQTT_UNSUBSCRIBE",$dup,$qos,0); $index=0; $body=$this->create_message_id($this->msgid,$index); //变长头为MessageID foreach($untopics as $untopic){ $body.=$this->strencoding($untopic,$index); //这里的untopics为一维数组,保存退订的主题。 } $unsHead.=$this->numencoding($index); fwrite($this->socket,$unsHead,strlen($unsHead)); fwrite($this->socket,$body); $res=$this->read_fixed_header($this->socket); //读取头部 $remaing=$this->read_remaing_length($this->socket); if($remaing>0){ if($res['msgtype']==$this->operations['MQTT_UNSUBACK']){ //处理UNSUBACK信息。 $unlogedId=$this->read_message_id($this->socket); if($unlogedId==$this->msgid){ //匹配发送前后ID是否相同 $this->debug("unsubscribe successful"); } } } }
最后就是PINGREQ消息,即在每个Keep Alive Time中发送Ping请求,来判断主机是否仍然在线。
做为回应则是PINGRESP消息,表示目前仍然在线。
下面贴出Ping请求和回应代码:
public function pingreq(){ $reqHead=$this->create_fixed_header("MQTT_PINGREQ",0,0,0);//发送PINT REQUEST,其它标识位无意义,全为0 $reqHead.=chr(0); //只有固定头部,固其只有2个字节的消息长 fwrite($this->socket,$reqHead,strlen($reqHead)); $res=$this->read_fixed_header($this->socket); if($res!=NULL&&$res['msgtype']==$this->operations['MQTT_PINGRESP']){ //如果返回了PING RESPONSE则服务器仍在线 $this->debug("Yes I am alive!"); }else{ $this->debug("Server is outline!"); } }
以上代码是各部分的实现,而要实际使用则要配合异步和事件机制,虽然PHP一直以来在线程方面比较弱,但是以现在PHP所支持的来看,已完全有能力实现异步和事件机制。
这也是我下一篇博文要介绍的,同时还会对MQTT协议有针对性的修改和补充,使之更好的结合数据库以及满足较高并发量的实现。
最后,能力有限,人艰不拆,欢迎指教。
相关文章推荐
- MQTT协议详解,非常易懂
- MQTT协议详解二
- MQTT协议详解一
- MQTT协议详解一
- MQTT协议详解一
- MQTT协议详解一
- MQTT协议详解
- MQTT-协议详解(来源网络)
- ASP 中使用 HTTP 协议发送参数详解
- 《TCP/IP 详解 卷1:协议》的读书体会
- 路由器原理和路由协议、算法详解(3)
- 网络编辑基础:对HTTP协议的头信息详解
- Memcache的使用和协议分析详解
- Google SiteMap的作用及协议格式详解
- 网络编辑基础:对HTTP协议的头信息详解
- 路由器原理和路由协议、算法详解(4)
- SSH协议详解
- 网络编程基础:对HTTP协议的头信息详解
- PCP协议实现详解
- 实时传输协议详解