DELPHI高性能大容量SOCKET并发(四):粘包、分包、解包
2012-09-05 22:51
736 查看
粘包
使用TCP长连接就会引入粘包的问题,粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。粘包可能由发送方造成,也可能由接收方造成。TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据,造成多个数据包的粘连。如果接收进程不及时接收数据,已收到的数据就放在系统接收缓冲区,用户进程读取数据时就可能同时读到多个数据包。
粘包一般的解决办法是制定通讯协议,由协议来规定如何分包解包。
分包
在IOCPDemo例子程序中,我们分包的逻辑是先发一个长度,然后紧接着是数据包内容,这样就可以把每个包分开。
应用层数据包格式如下:
IOCPSocket分包处理主要代码,我们收到的数据都是在TSocketHandle.ProcessIOComplete方法中处理:
解包
由于我们应用层数据包既可以传命令也可以传数据,因而针对每个包我们进行解包,分出命令和数据分别处理,因而每个Socket服务对象都需要解包,我们解包的逻辑是放在TBaseSocket.DecodePacket中,命令和数据的包格式为:
这里和第一版公布的代码不同,这版的代码对命令进行了编码,采用UTF-8编码,代码如下:
更详细代码见示例代码的IOCPSocket单元。
V1版下载地址:http://download.csdn.net/detail/sqldebug_fan/4510076,需要资源10分,有稳定性问题,可以作为研究稳定性用;
V2版下载地址:http://download.csdn.net/detail/sqldebug_fan/5560185,不需要资源分,解决了稳定性问题和提高性能;免责声明:此代码只是为了演示IOCP编程,仅用于学习和研究,切勿用于商业用途。水平有限,错误在所难免,欢迎指正和指导。邮箱地址:fansheng_hx@163.com。
使用TCP长连接就会引入粘包的问题,粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。粘包可能由发送方造成,也可能由接收方造成。TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据,造成多个数据包的粘连。如果接收进程不及时接收数据,已收到的数据就放在系统接收缓冲区,用户进程读取数据时就可能同时读到多个数据包。
粘包一般的解决办法是制定通讯协议,由协议来规定如何分包解包。
分包
在IOCPDemo例子程序中,我们分包的逻辑是先发一个长度,然后紧接着是数据包内容,这样就可以把每个包分开。
应用层数据包格式如下:
应用层数据包格式 | |
数据包长度Len:Cardinal(4字节无符号整数) | 数据包内容,长度为Len |
procedure TSocketHandle.ProcessIOComplete(AIocpRecord: PIocpRecord; const ACount: Cardinal); begin case AIocpRecord.IocpOperate of ioNone: Exit; ioRead: //收到数据 begin FActiveTime := Now; ReceiveData(AIocpRecord.WsaBuf.buf, ACount); if FConnected then PreRecv(AIocpRecord); //投递请求 end; ioWrite: //发送数据完成,需要释放AIocpRecord的指针 begin FActiveTime := Now; FSendOverlapped.Release(AIocpRecord); end; ioStream: begin FActiveTime := Now; FSendOverlapped.Release(AIocpRecord); WriteStream; //继续发送流 end; end; end;如果是收到数据,则调用ReceiveData函数,ReceiveData主要功能是把数据的写入流中,然后调用Process分包。FInputBuf是一个内存流(FInputBuf: TMemoryStream),内存流的每次写入会造成一次内存分配,如果要获得更高的效率,可以替换为内存池等更好的内存管理方式。还有一种更好的解决方案是规定每次发包的大小,如每个包最大不超过64K,哪么缓冲区的最大大小可以设置为128K(缓存两个数据包),这样就可以每次创建对象时一次分配好,减少内存分配次数,提高效率。(内存的分配和释放比内存的读写效率要低)
procedure TSocketHandle.ReceiveData(AData: PAnsiChar; const ALen: Cardinal); begin FInputBuf.Write(AData^, ALen); Process; end;Process则根据收到的数据进行分包逻辑,如果不够一个包,则继续等待接收数据,如果够一个或多个包,则循环调用Execute函数进行处理,代码如下:
procedure TSocketHandle.Process; var AData, ALast, NewBuf: PByte; iLenOffset, iOffset, iReserveLen: Integer; function ReadLen: Integer; var wLen: Word; cLen: Cardinal; begin FInputBuf.Position := iOffset; if FLenType = ltWord then begin FInputBuf.Read(wLen, SizeOf(wLen)); //wLen := ntohs(wLen); Result := wLen; end else begin FInputBuf.Read(cLen, SizeOf(cLen)); //cLen := ntohl(cLen); Result := cLen; end; end; begin case FLenType of ltWord, ltCardinal: begin if FLenType = ltWord then iLenOffset := 2 else iLenOffset := 4; iReserveLen := 0; FPacketLen := 0; iOffset := 0; if FPacketLen <= 0 then begin if FInputBuf.Size < iLenOffset then Exit; FInputBuf.Position := 0; //移动到最前面 FPacketLen := ReadLen; iOffset := iLenOffset; iReserveLen := FInputBuf.Size - iOffset; if FPacketLen > iReserveLen then //不够一个包的长度 begin FInputBuf.Position := FInputBuf.Size; //移动到最后,以便接收后续数据 FPacketLen := 0; Exit; end; end; while (FPacketLen > 0) and (iReserveLen >= FPacketLen) do //如果数据够长,则处理 begin //多个包循环处理 AData := Pointer(Longint(FInputBuf.Memory) + iOffset); //取得当前的指针 Execute(AData, FPacketLen); iOffset := iOffset + FPacketLen; //移到下一个点 FPacketLen := 0; iReserveLen := FInputBuf.Size - iOffset; if iReserveLen > iLenOffset then //剩下的数据 begin FPacketLen := ReadLen; iOffset := iOffset + iLenOffset; iReserveLen := FInputBuf.Size - iOffset; if FPacketLen > iReserveLen then //不够一个包的长度,需要把长度回退 begin iOffset := iOffset - iLenOffset; iReserveLen := FInputBuf.Size - iOffset; FPacketLen := 0; end; end else //不够长度字节数 FPacketLen := 0; end; if iReserveLen > 0 then //把剩下的自己缓存起来 begin ALast := Pointer(Longint(FInputBuf.Memory) + iOffset); GetMem(NewBuf, iReserveLen); try CopyMemory(NewBuf, ALast, iReserveLen); FInputBuf.Clear; FInputBuf.Write(NewBuf^, iReserveLen); finally FreeMemory(NewBuf); end; end else begin FInputBuf.Clear; end; end; else begin FInputBuf.Position := 0; AData := Pointer(Longint(FInputBuf.Memory)); //取得当前的指针 Execute(AData, FInputBuf.Size); FInputBuf.Clear; end; end; end;
解包
由于我们应用层数据包既可以传命令也可以传数据,因而针对每个包我们进行解包,分出命令和数据分别处理,因而每个Socket服务对象都需要解包,我们解包的逻辑是放在TBaseSocket.DecodePacket中,命令和数据的包格式为:
命令长度Len:Cardinal(4字节无符号整数) | 命令 | 数据 |
function TBaseSocket.DecodePacket(APacketData: PByte; const ALen: Integer): Boolean; var CommandLen: Integer; UTF8Command: UTF8String; begin if ALen > 4 then //命令长度为4字节,因而长度必须大于4 begin CopyMemory(@CommandLen, APacketData, SizeOf(Cardinal)); //获取命令长度 Inc(APacketData, SizeOf(Cardinal)); SetLength(UTF8Command, CommandLen); CopyMemory(PUTF8String(UTF8Command), APacketData, CommandLen); //读取命令 Inc(APacketData, CommandLen); FRequestData := APacketData; //数据 FRequestDataLen := ALen - SizeOf(Cardinal) - CommandLen; //数据长度 FRequest.Text := Utf8ToAnsi(UTF8Command); //把UTF8转为Ansi Result := True; end else Result := False; end;具体每个协议可以集成Execute方法,调用DecodePacket进行解包,然后根据命令进行协议逻辑处理,例如TSQLSocket主要代码如下:
{* SQL查询SOCKET基类 *} TSQLSocket = class(TBaseSocket) private {* 开始事务创建TADOConnection,关闭事务时释放 *} FBeginTrans: Boolean; FADOConn: TADOConnection; protected {* 处理数据接口 *} procedure Execute(AData: PByte; const ALen: Cardinal); override; {* 返回SQL语句执行结果 *} procedure DoCmdSQLOpen; {* 执行SQL语句 *} procedure DoCmdSQLExec; {* 开始事务 *} procedure DoCmdBeginTrans; {* 提交事务 *} procedure DoCmdCommitTrans; {* 回滚事务 *} procedure DoCmdRollbackTrans; public procedure DoCreate; override; destructor Destroy; override; {* 获取SQL语句 *} function GetSQL: string; property BeginTrans: Boolean read FBeginTrans; end;Exceute是调用DecodePacket进行解包,然后获取命令分别调用不同的命令处理逻辑,代码如下:
procedure TSQLSocket.Execute(AData: PByte; const ALen: Cardinal); var sErr: string; begin inherited; FRequest.Clear; FResponse.Clear; try AddResponseHeader; if ALen = 0 then begin DoFailure(CIPackLenError); DoSendResult; Exit; end; if DecodePacket(AData, ALen) then begin FResponse.Clear; AddResponseHeader; case StrToSQLCommand(Command) of scLogin: begin DoCmdLogin; DoSendResult; end; scActive: begin DoSuccess; DoSendResult; end; scSQLOpen: begin DoCmdSQLOpen; end; scSQLExec: begin DoCmdSQLExec; DoSendResult; end; scBeginTrans: begin DoCmdBeginTrans; DoSendResult; end; scCommitTrans: begin DoCmdCommitTrans; DoSendResult; end; scRollbackTrans: begin DoCmdRollbackTrans; DoSendResult; end; else DoFailure(CINoExistCommand, 'Unknow Command'); DoSendResult; end; end else begin DoFailure(CIPackFormatError, 'Packet Must Include \r\n\r\n'); DoSendResult; end; except on E: Exception do //发生未知错误,断开连接 begin sErr := RemoteAddress + ':' + IntToStr(RemotePort) + CSComma + 'Unknow Error: ' + E.Message; WriteLogMsg(ltError, sErr); Disconnect; end; end; end;
更详细代码见示例代码的IOCPSocket单元。
V1版下载地址:http://download.csdn.net/detail/sqldebug_fan/4510076,需要资源10分,有稳定性问题,可以作为研究稳定性用;
V2版下载地址:http://download.csdn.net/detail/sqldebug_fan/5560185,不需要资源分,解决了稳定性问题和提高性能;免责声明:此代码只是为了演示IOCP编程,仅用于学习和研究,切勿用于商业用途。水平有限,错误在所难免,欢迎指正和指导。邮箱地址:fansheng_hx@163.com。
相关文章推荐
- DELPHI高性能大容量SOCKET并发(四):粘包、分包、解包
- DELPHI高性能大容量SOCKET并发(四):粘包、分包、解包
- DELPHI高性能大容量SOCKET并发(四):粘包、分包、解包
- C#高性能大容量SOCKET并发(五):粘包、分包、解包
- 高性能大容量SOCKET并发(四):粘包、分包、解包
- C#高性能大容量SOCKET并发(五):粘包、分包、解包
- C#高性能大容量SOCKET并发(五):粘包、分包、解包
- C#高性能大容量SOCKET并发(五):粘包、分包、解包
- C#高性能大容量SOCKET并发(五):粘包、分包、解包
- DELPHI高性能大容量SOCKET并发(三):接收、发送、缓存
- DELPHI高性能大容量SOCKET并发(九):稳定性问题解决
- DELPHI高性能大容量SOCKET并发(一):IOCP完成端口例子介绍
- DELPHI高性能大容量SOCKET并发(十):IOCP完成端口性能优化
- DELPHI高性能大容量SOCKET并发(五):锁和对象分离
- DELPHI高性能大容量SOCKET并发(二):IOCP完成端口控件封装
- DELPHI高性能大容量SOCKET并发(六):协议字符集
- DELPHI高性能大容量SOCKET并发(八):断点续传(上传也可以续传)
- DELPHI高性能大容量SOCKET并发(七):通讯协议
- DELPHI高性能大容量SOCKET并发(九):稳定性问题解决
- DELPHI高性能大容量SOCKET并发:IOCP完成端口例子介绍