您的位置:首页 > 编程语言 > Delphi

delphi之IOCP学习(一)

2016-06-22 10:36 417 查看
困扰已久的网络通信(IOCP:完成端口),今天终于揭开她的神秘面纱了,之前百度N久还是未能理解IOCP,网络上好多博文都没有贴出源码,初学者很难正在理解IOCP并自己写出通信例子 ,经过努力,今天自己终于做出了简单的测试程序,下面贴出源码,水平有限,难免有错,希望不要误人子弟。

1、Svr主窗体

unit Umain;

interface

uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, UIOCPSvr;

type
TForm1 = class(TForm)
Button1: TButton;
mmoRev: TMemo;
procedure Button1Click(Sender: TObject);
private
IOCPSvr: TIOCPSvr;
{ Private declarations }
public
{ Public declarations }

end;

var
Form1: TForm1;

implementation

{$R *.dfm}

procedure TForm1.Button1Click(Sender: TObject);
begin
IOCPSvr := TIOCPSvr.Create(Self);
IOCPSvr.Host := '192.168.1.86';
IOCPSvr.Port := 8988;
IOCPSvr.open;
end;

end.


   2、IOCP 服务端实现代码

unit UIOCPSvr;

interface

uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, JwaWinsock2;

const
DATA_BUFSIZE = 1024;

type
LPVOID = Pointer;
{* 完成端口操作定义 *}
TIocpOperate = (ioNone, ioCon, ioRead, ioWrite, ioStream, ioExit);
PIocpRecord = ^TIocpRecord;
TIocpRecord = record
Overlapped: TOverlapped; //完成端口重叠结构
WsaBuf: TWsaBuf; //完成端口的缓冲区定义
IocpOperate: TIOCPOperate; //当前操作类型
end;

type
TThreadRev = class(TThread)
private
pData: Pointer;
protected
procedure Execute; override;
public
constructor Create(CreateSuspended: Boolean; adata: Pointer);
destructor Destroy; override;
end;

TThreadCon = class(TThread)
private
PSocket: TSocket;
lvIOPort: THandle;
protected
procedure Execute; override;
public
constructor Create(CreateSuspended: Boolean; var aSocket: TSocket; var aIOport: THandle);
destructor Destroy; override;
end;

TIOCPSvr = class(TComponent)
private
FHost: string;
FPort: Integer;
ThreadCon: TThreadCon;
ThreadRev: TThreadRev;
protected
public
constructor Create(AOwner: TComponent); override;
destructor Destroy; override;
procedure open;
published
property Port: Integer read FPort write FPort;
property Host: string read FHost write FHost;
end;

procedure SendData(astr: string; FSocket: TSocket); //发生数据
function PIocpAllocate(ALen: Cardinal): PIocpRecord;  //分配内存
procedure PIocpRelease(var AValue: PIocpRecord); //释放内存

implementation

uses Umain;

function PIocpAllocate(ALen: Cardinal): PIocpRecord;
begin
New(Result);
Result.Overlapped.Internal := 0;
Result.Overlapped.InternalHigh := 0;
Result.Overlapped.Offset := 0;
Result.Overlapped.OffsetHigh := 0;
Result.Overlapped.hEvent := 0;
Result.IocpOperate := ioNone;
Result.WsaBuf.buf := GetMemory(ALen);
Result.WsaBuf.len := ALen;
end;

procedure PIocpRelease(var AValue: PIocpRecord);
begin
FreeMemory(AValue.WsaBuf.buf);
AValue.WsaBuf.buf := nil;
Dispose(AValue);
end;

procedure SendData(astr: string; FSocket: TSocket);
var
IocpRec: PIocpRecord;
iErrCode: Integer;
dSend, dFlag: DWORD;
FOutputBuf: TMemoryStream;
begin

FOutputBuf := TMemoryStream.Create;
FOutputBuf.WriteBuffer(astr[1], Length(astr));

New(IocpRec);
IocpRec.Overlapped.Internal := 0;
IocpRec.Overlapped.InternalHigh := 0;
IocpRec.Overlapped.Offset := 0;
IocpRec.Overlapped.OffsetHigh := 0;
IocpRec.Overlapped.hEvent := 0;
IocpRec.WsaBuf.buf := GetMemory(Length(astr));
IocpRec.WsaBuf.len := Length(astr);

IocpRec.IocpOperate := ioWrite;
System.Move(PAnsiChar(FOutputBuf.Memory)[0], IocpRec.WsaBuf.buf^, FOutputBuf.Size);
dFlag := 0;
if WSASend(FSocket, @IocpRec.WsaBuf, 1, dSend, dFlag, @IocpRec.Overlapped, nil) = SOCKET_ERROR then
begin
iErrCode := WSAGetLastError;
if iErrCode <> ERROR_IO_PENDING then
begin
// FIocpServer.DoError('WSASend', GetLastWsaErrorStr);
//ProcessNetError(iErrCode);
end;
end;
FreeAndNil(FOutputBuf);
end;

{ TIOCPSvr }

constructor TIOCPSvr.Create(AOwner: TComponent);
begin
inherited;

end;

destructor TIOCPSvr.Destroy;
begin
ThreadCon.Terminate;
if ThreadCon.Suspended then
ThreadCon.Resume;

FreeAndNil(ThreadCon);
inherited;
end;

procedure TIOCPSvr.open;
var
WSData: TWSAData;
lvIOPort: THandle;
lvAddr: TSockAddr;
sSocket: TSocket;
begin

//加载初始化SOCKET。使用的是2.2版为了后面方便加入心跳。
WSAStartup($0202, WSData);

// 创建一个完成端口(内核对象),新建一个IOCP
lvIOPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

//创建一个工作线程,调试用
ThreadRev := TThreadRev.Create(False, Pointer(lvIOPort));

//创建一个套接字,将此套接字和一个端口绑定并监听此端口。
sSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
if sSocket = SOCKET_ERROR then
begin
closesocket(sSocket);
WSACleanup();
end;
lvAddr.sin_family := AF_INET;
lvAddr.sin_port := htons(Port);
lvAddr.sin_addr.s_addr := htonl(INADDR_ANY);
if bind(sSocket, @lvAddr, sizeof(lvAddr)) = SOCKET_ERROR then
begin
closesocket(sSocket);
end;
listen(sSocket, 20);

//连接线程,当有客户端请求建立连接在该现场中处理
ThreadCon := TThreadCon.Create(False, sSocket, lvIOPort);

//下面循环进行循环获取客户端的请求。这注释部分放到 ThreadCon线程中处理了
//  while (TRUE) do
//  begin
//     //当客户端有连接请求的时候,WSAAccept函数会新创建一个套接字cSocket。这个套接字就是和客户端通信的时候使用的套接字。
//    cSocket := WSAAccept(sSocket, nil, nil, nil, 0);
//
//     //判断cSocket套接字创建是否成功,如果不成功则退出。
//    if (cSocket = SOCKET_ERROR) then
//    begin
//      closesocket(sSocket);
//      exit;
//    end;
//
//     //将套接字、完成端口绑定在一起。
//    lvPerIOPort := CreateIoCompletionPort(cSocket, lvIOPort, cSocket, 0);
//    if (lvPerIOPort = 0) then
//    begin
//      Exit;
//    end;
//
//     //初始化数据包
//    PerIoData := PIocpAllocate(DATA_BUFSIZE);
//    PerIoData.IocpOperate := ioCon;
//     //通知工作线程,有新的套接字连接<第三个参数>
//    PostQueuedCompletionStatus(lvIOPort, 0, cSocket, POverlapped(PerIOData));
//  end;

end;

{ TThreadCon }

constructor TThreadCon.Create(CreateSuspended: Boolean; var aSocket: TSocket; var aIOport: THandle);
begin
inherited create(CreateSuspended);
PSocket := aSocket;
lvIOPort := aIOport;
end;

destructor TThreadCon.Destroy;
begin

inherited;
end;

procedure TThreadCon.Execute;
var
cSocket: TSocket;
lvPerIOPort: Integer;
PerIoData: PIocpRecord;
begin
inherited;
while not Terminated do
begin

//当客户端有连接请求的时候,WSAAccept函数会新创建一个套接字cSocket。这个套接字就是和客户端通信的时候使用的套接字。
cSocket := WSAAccept(PSocket, nil, nil, nil, 0);

//判断cSocket套接字创建是否成功,如果不成功则退出。
if (cSocket = SOCKET_ERROR) then
begin
closesocket(PSocket);
exit;
end;

//将套接字、完成端口绑定在一起。
lvPerIOPort := CreateIoCompletionPort(cSocket, lvIOPort, cSocket, 0);
if (lvPerIOPort = 0) then
begin
Exit;
end;

//初始化数据包
PerIoData := PIocpAllocate(DATA_BUFSIZE);
PerIoData.IocpOperate := ioCon;
//通知工作线程,有新的套接字连接<第三个参数>
PostQueuedCompletionStatus(lvIOPort, 0, cSocket, POverlapped(PerIOData));
end;

end;

{ TThreadRev }

constructor TThreadRev.Create(CreateSuspended: Boolean; adata: Pointer);
begin
inherited Create(CreateSuspended);
pData := adata;
end;

destructor TThreadRev.Destroy;
begin

inherited;
end;

procedure TThreadRev.Execute;
var
CompletionPort: THANDLE;
BytesTransferred: Cardinal;
PerIoData: PIocpRecord;
cSocket: TSocket;
Flags: Cardinal;
lvResultStatus: BOOL;
temp: string;
begin
inherited;
CompletionPort := THandle(pData);

//得到创建线程是传递过来的IOCP
while not Terminated do
begin
//工作者线程会停止到GetQueuedCompletionStatus函数处,直到接受到数据为止
lvResultStatus := GetQueuedCompletionStatus(CompletionPort, BytesTransferred, cSocket, POverlapped(PerIoData), INFINITE);

{//CompletionPort:新建IOCP CreateIoCompletionPort()函数返回的端口    // BytesTransferred 收到数据的长度
// cSocket 个人理解就是通信sock句柄   //PerIoData 数据结构
//INFINITE 超时时间,这里是一直等待的意思,GetQueuedCompletionStatus 可以参考百度百科}

if (lvResultStatus = False) then
begin
//当客户端连接断开或者客户端调用closesocket函数的时候,函数GetQueuedCompletionStatus会返回错误。如果我们加入心跳后,在这里就可以来判断套接字是否依然在连接。
if cSocket <> 0 then
begin
closesocket(cSocket);
end;
if PerIoData <> nil then
begin
PIocpRelease(PerIoData);
end;
continue;
end;

if PerIoData = nil then
begin
closesocket(cSocket);
Break;
end
else if (PerIoData <> nil) then
begin

if PerIoData.IocpOperate = ioCon then //连接请求
begin

PIocpRelease(PerIoData);
end
else if PerIoData.IocpOperate = ioRead then
begin
////可以在这里处理数据……
temp:= Copy(string(PerIoData.WsaBuf.buf),1,BytesTransferred); //获取接收到的数据 这里只处理了字符串
Form1.mmoRev.Lines.Add(format('收到客户端:%d 消息:%s',[cSocket,temp]));
// temp := 'hello world !' + #13#10;  //indy TCP 需要#13#10 才能收到信息
SendData(temp, cSocket); //接受什么数据原样返回
PIocpRelease(PerIoData);//释放内存
end;
Flags := 0;
/////进入投递收取动作
PerIoData := PIocpAllocate(DATA_BUFSIZE);
PerIoData.IocpOperate := ioRead;

/////异步收取数据
WSARecv(cSocket, @PerIoData.WsaBuf, 1, PerIoData.WsaBuf.len, Flags, @PerIoData.Overlapped, nil);
if (WSAGetLastError() <> ERROR_IO_PENDING) then
begin
closesocket(cSocket);
if PerIoData <> nil then
begin
PIocpRelease(PerIoData);
end;
Continue;
end;
end;
end;

end;

end.


3、indy TCP 客户端

unit Unit1;

interface

uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, IdTCPServer, IdBaseComponent, IdComponent, IdTCPConnection,
IdTCPClient, StdCtrls, Sockets;

type
TForm1 = class(TForm)
IdTCPClient1: TIdTCPClient;
btnCon: TButton;
mmo1: TMemo;
btnSend: TButton;
btnRev: TButton;
edtSend: TEdit;
edtHost: TEdit;
edtPort: TEdit;
procedure IdTCPClient1Connected(Sender: TObject);
procedure btnConClick(Sender: TObject);
procedure btnSendClick(Sender: TObject);
procedure btnRevClick(Sender: TObject);
private
{ Private declarations }
public
{ Public declarations }
end;

var
Form1: TForm1;

implementation

{$R *.dfm}

procedure TForm1.IdTCPClient1Connected(Sender: TObject);
begin
mmo1.Lines.Add('用户连接上');
end;

procedure TForm1.btnConClick(Sender: TObject);
begin

IdTCPClient1.Host:=edtHost.Text;
IdTCPClient1.Port:=StrToInt(edtPort.Text) ;
IdTCPClient1.Connect();
btnCon.Enabled:=False;
btnSend.Enabled:=True;
end;

procedure TForm1.btnSendClick(Sender: TObject);
begin
IdTCPClient1.WriteLn(edtSend.Text);
btnSend.Enabled:=False;
btnRev.Enabled:=True;
end;

procedure TForm1.btnRevClick(Sender: TObject);
begin
mmo1.Lines.Add( IdTCPClient1.ReadLn(#13#10,-1,-1));
btnRev.Enabled:=False;
btnSend.Enabled:=True;
end;

end.


源码下载地址:

CSDN下载地址:http://download.csdn.net/detail/marszzx/9556196

欢迎大家一起学习,共同进步 。QQ :359985051
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: