Windows Socket I/O模型之 I/O完成端口
2011-02-02 14:23
330 查看
#
include
<
winsock2.
h>
#
include
<
ws2tcpip.
h>
#
include
<
mswsock.
h>
#
include
<
windows.
h>
#
include
"resolve.h"
#
include
"public.h"
#
define
DEFAULT_OVERLAPPED_COUNT 5
#
define
MAX_COMPLETION_THREAD_COUNT 32 // Maximum number of completion threads allowed
// Aliases for the project's list types (presumably declared in "public.h" -- TODO confirm).
// SocketObjHeader/SocketObj: single list used to chain SOCKET_OBJs.
typedef
SINGLE_LIST_HEADER SocketObjHeader;
typedef
SINGLE_LIST SocketObj;
// BufferObjHeader/BufferObj: double list used to chain BUFFER_OBJ_EXs.
typedef
DOUBLE_LIST_HEADER BufferObjHeader;
typedef
DOUBLE_LIST BufferObj;
// Number of overlapped operations posted per socket: AcceptEx calls on each
// listening socket and receives on each accepted connection.
int
gOverlappedCount =
DEFAULT_OVERLAPPED_COUNT;
typedef
struct
_BUFFER_OBJ_EX {
WSAOVERLAPPED ol;
// Overlapped structure
SOCKET
sclient;
// Used for AcceptEx client socket
char
*
buf;
// Buffer for send/recv/AcceptEx
int
buflen;
// Length of the buffer
int
operation;
// Type of operation submitted
#
define
OP_ACCEPT 0 // AcceptEx
#
define
OP_READ 1 // WSARecv/WSARecvFrom
#
define
OP_WRITE 2 // WSASend?WSASendTo
SOCKADDR_STORAGE addr;
// Remote address (UDP)
int
addrlen;
// Remote address length
ULONG IoOrder;
// Order in which this I/O was posted
BufferObj bufDList;
}
BUFFER_OBJ_EX;
// Per-socket context, used for listening sockets and accepted connections alike.
typedef struct _SOCKET_OBJ
{
    SOCKET s;                         // Socket handle
    int af;                           // Address family of the socket (AF_INET or AF_INET6)
    int bClosing;                     // Non-zero once the socket is being closed
    volatile LONG OutstandingOps;     // Overlapped operations still in flight
    BUFFER_OBJ_EX **PendingAccepts;   // Array of pending AcceptEx buffers (listening socket only)
    ULONG LastSendIssued;             // Sequence number of the last send issued
    ULONG IoCountIssued;              // Sequence number the next receive will be given
    BufferObjHeader OutOfOrderSends;  // Send buffers waiting for their turn, kept sorted by IoOrder

    // Microsoft-specific extension entry points (listening socket only)
    LPFN_ACCEPTEX lpfnAcceptEx;
    LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;

    CRITICAL_SECTION SockCritSec;     // Serializes access to this SOCKET_OBJ
    SocketObj next;                   // Link used to chain SOCKET_OBJs together
} SOCKET_OBJ;
// Allocates and initializes a SOCKET_OBJ for socket 's' of address family 'af'.
//
// The object comes zero-filled from the process heap; its critical section
// and out-of-order send list are initialized, and the receive sequence
// counter starts at 1 for TCP (0 otherwise). The process is terminated if
// the allocation fails, so the return value is never NULL.
SOCKET_OBJ *GetSocketObj(SOCKET s, int af)
{
    SOCKET_OBJ *sockobj = NULL;

    sockobj = (SOCKET_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(SOCKET_OBJ));
    if (sockobj == NULL)
    {
        // GetLastError() returns a DWORD, so print it as unsigned long.
        fprintf(stderr, "GetSocketObj: HeapAlloc failed: %lu\n", (unsigned long)GetLastError());
        ExitProcess(-1);
    }

    InitializeCriticalSection(&sockobj->SockCritSec);
    InitializeDoubleHead(&sockobj->OutOfOrderSends);
    sockobj->IoCountIssued = ((gProtocol == IPPROTO_TCP) ? 1 : 0);

    // Initialize the members
    sockobj->s = s;
    sockobj->af = af;

    return sockobj;
}
BUFFER_OBJ_EX *
GetBufferObjEx(
int
buflen)
{
BUFFER_OBJ_EX *
newobj=
NULL
;
// Allocate the object
newobj =
(
BUFFER_OBJ_EX *
)
HeapAlloc(
GetProcessHeap(
)
,
HEAP_ZERO_MEMORY,
sizeof
(
BUFFER_OBJ_EX)
)
;
if
(
newobj =
=
NULL
)
{
printf
(
"GetBufferObj: HeapAlloc failed: %d/n"
,
GetLastError(
)
)
;
ExitProcess(
-
1)
;
}
// Allocate the buffer
newobj-
>
buf =
(
char
*
)
HeapAlloc(
GetProcessHeap(
)
,
HEAP_ZERO_MEMORY,
sizeof
(
BYTE)
*
buflen)
;
if
(
newobj-
>
buf =
=
NULL
)
{
printf
(
"GetBufferObj: HeapAlloc failed: %d/n"
,
GetLastError(
)
)
;
ExitProcess(
-
1)
;
}
newobj-
>
buflen =
buflen;
newobj-
>
addrlen =
sizeof
(
newobj-
>
addr)
;
return
newobj;
}
// Releases a buffer object: first its data buffer, then the object itself.
// Both are returned to the process heap.
void FreeBufferObjEx(BUFFER_OBJ_EX *obj)
{
    HANDLE heap = GetProcessHeap();

    HeapFree(heap, 0, obj->buf);
    HeapFree(heap, 0, obj);
}
// Closes and frees a socket object, unless it still has overlapped
// operations outstanding, in which case nothing is done.
//
// The socket is closed only if it has not been closed already (i.e. 's' is
// not INVALID_SOCKET). The PendingAccepts array is not released here.
void FreeSocketObj(SOCKET_OBJ *obj)
{
    if (obj->OutstandingOps != 0)
    {
        // Still outstanding operations so just return
        return;
    }

    // Close the socket if it hasn't already been closed
    if (obj->s != INVALID_SOCKET)
    {
        closesocket(obj->s);
        obj->s = INVALID_SOCKET;
    }

    DeleteCriticalSection(&obj->SockCritSec);
    HeapFree(GetProcessHeap(), 0, obj);
}
// Queues 'send' on the socket's out-of-order send list, keeping the list
// sorted by ascending IoOrder. The new buffer is placed in front of the first
// queued buffer whose IoOrder is greater; when there is none, it is enqueued
// before the list head. The whole operation runs under the socket's
// critical section.
void InsertPendingSend(SOCKET_OBJ *sock, BUFFER_OBJ_EX *send)
{
    BufferObjHeader *pending = &sock->OutOfOrderSends;
    BufferObj *node = NULL;
    BUFFER_OBJ_EX *queued = NULL;

    EnterCriticalSection(&sock->SockCritSec);

    for (node = (BufferObj *)GotoNextDoubleList(pending, &pending->head);
         node != NULL;
         node = (BufferObj *)GotoNextDoubleList(pending, node))
    {
        queued = (BUFFER_OBJ_EX *)container_of(BUFFER_OBJ_EX, bufDList, node);
        if (queued->IoOrder > send->IoOrder)
        {
            break;
        }
    }

    // No later buffer found: insert relative to the head instead
    if (node == NULL)
    {
        node = &pending->head;
    }

    EnqueueDoubleListBefore(pending, node, &send->bufDList);

    LeaveCriticalSection(&sock->SockCritSec);
}
// Posts an overlapped receive on 'sock' using 'recvobj'.
//
// The receive is stamped with the socket's next I/O sequence number; the
// stamping and the WSARecv call happen together inside the socket's critical
// section. Only TCP is supported: any other protocol terminates the process.
//
// Returns NO_ERROR when the receive completed or is pending (the socket's
// outstanding-operation count is then incremented), or SOCKET_ERROR when
// WSARecv failed for any reason other than WSA_IO_PENDING. The caller keeps
// ownership of 'recvobj' on failure.
int PostRecv(SOCKET_OBJ *sock, BUFFER_OBJ_EX *recvobj)
{
    WSABUF wbuf;
    DWORD bytes, flags;
    int rc = NO_ERROR;
    int err = NO_ERROR;

    recvobj->operation = OP_READ;
    wbuf.buf = recvobj->buf;
    wbuf.len = recvobj->buflen;
    flags = 0;

    EnterCriticalSection(&sock->SockCritSec);

    // Assign the IO order to this receive. This must be performed within
    // the critical section. The operation of assigning the IO count and posting
    // the receive cannot be interrupted.
    recvobj->IoOrder = sock->IoCountIssued;
    sock->IoCountIssued++;

    if (gProtocol == IPPROTO_TCP)
    {
        rc = WSARecv(sock->s, &wbuf, 1, &bytes, &flags, &recvobj->ol, NULL);
        if (rc == SOCKET_ERROR)
        {
            // Capture the error right away, before any other API call can
            // overwrite the thread's last-error value.
            err = WSAGetLastError();
        }
    }
    else
    {
        ExitProcess(-1);
    }

    LeaveCriticalSection(&sock->SockCritSec);

    if ((rc == SOCKET_ERROR) && (err != WSA_IO_PENDING))
    {
        fprintf(stderr, "PostRecv: WSARecv* failed: %d\n", err);
        return SOCKET_ERROR;
    }

    // Increment outstanding overlapped operations
    InterlockedIncrement(&sock->OutstandingOps);

    return NO_ERROR;
}
// Debug counter: number of sends issued so far, printed by PostSend.
int sendnum = 0;

// Posts an overlapped send of 'sendobj' on 'sock'.
//
// The socket's LastSendIssued counter is advanced and WSASend is called
// together inside the socket's critical section. Only TCP is supported: any
// other protocol terminates the process.
//
// Returns NO_ERROR when the send completed or is pending (the socket's
// outstanding-operation count is then incremented), or SOCKET_ERROR when
// WSASend failed for any reason other than WSA_IO_PENDING. The caller keeps
// ownership of 'sendobj' on failure.
int PostSend(SOCKET_OBJ *sock, BUFFER_OBJ_EX *sendobj)
{
    WSABUF wbuf;
    DWORD bytes;
    int rc = NO_ERROR;
    int err = NO_ERROR;

    sendobj->operation = OP_WRITE;
    wbuf.buf = sendobj->buf;
    wbuf.len = sendobj->buflen;

    EnterCriticalSection(&sock->SockCritSec);

    // Incrementing the last send issued and issuing the send should not be
    // interruptible.
    sock->LastSendIssued++;

    if (gProtocol == IPPROTO_TCP)
    {
        printf("send %d\n", sendnum++);
        rc = WSASend(sock->s, &wbuf, 1, &bytes, 0, &sendobj->ol, NULL);
        if (rc == SOCKET_ERROR)
        {
            // Capture the error right away, before any other API call can
            // overwrite the thread's last-error value.
            err = WSAGetLastError();
        }
    }
    else
    {
        ExitProcess(-1);
    }

    LeaveCriticalSection(&sock->SockCritSec);

    if ((rc == SOCKET_ERROR) && (err != WSA_IO_PENDING))
    {
        fprintf(stderr, "PostSend: WSASend* failed: %d\n", err);
        return SOCKET_ERROR;
    }

    // Increment the outstanding operation count
    InterlockedIncrement(&sock->OutstandingOps);

    return NO_ERROR;
}
// Posts an overlapped AcceptEx on the listening socket 'sock'.
//
// A fresh TCP client socket is created and stored in acceptobj->sclient. The
// data portion of acceptobj->buf is its length minus room for two address
// blocks of sizeof(SOCKADDR_STORAGE) + 16 bytes each (local and remote).
//
// Returns NO_ERROR when the accept completed or is pending (the socket's
// outstanding-operation count is then incremented), -1 when the client
// socket could not be created, or SOCKET_ERROR when AcceptEx failed for any
// reason other than WSA_IO_PENDING.
int PostAccept(SOCKET_OBJ *sock, BUFFER_OBJ_EX *acceptobj)
{
    DWORD bytes;
    int rc;
    int err;

    acceptobj->operation = OP_ACCEPT;

    // Create the client socket for an incoming connection
    acceptobj->sclient = socket(sock->af, SOCK_STREAM, IPPROTO_TCP);
    if (acceptobj->sclient == INVALID_SOCKET)
    {
        fprintf(stderr, "PostAccept: socket failed: %d\n", WSAGetLastError());
        return -1;
    }

    rc = sock->lpfnAcceptEx(
            sock->s,
            acceptobj->sclient,
            acceptobj->buf,
            acceptobj->buflen - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
            sizeof(SOCKADDR_STORAGE) + 16,
            sizeof(SOCKADDR_STORAGE) + 16,
            &bytes,
            &acceptobj->ol
            );
    if (rc == FALSE)
    {
        // Read the error once so the value tested is the value reported.
        err = WSAGetLastError();
        if (err != WSA_IO_PENDING)
        {
            fprintf(stderr, "PostAccept: AcceptEx failed: %d\n", err);
            return SOCKET_ERROR;
        }
    }

    // Increment the outstanding overlapped count for this socket
    InterlockedIncrement(&sock->OutstandingOps);

    return NO_ERROR;
}
// Issues every queued send whose turn has come.
//
// Walks the socket's out-of-order send list from the front and posts each
// buffer whose IoOrder equals the socket's LastSendIssued counter (PostSend
// advances that counter), stopping at the first buffer that is not next in
// sequence. Runs under the socket's critical section.
//
// Each buffer is unlinked from the list BEFORE it is posted. If PostSend
// fails, the buffer is freed; because it is already unlinked, no freed node
// is left behind in the list for a later traversal to touch.
//
// Returns NO_ERROR, or SOCKET_ERROR if a send could not be posted.
int DoSends(SOCKET_OBJ *sock)
{
    BufferObj *ptr = NULL;
    BufferObj *next = NULL;
    BUFFER_OBJ_EX *sendobj = NULL;
    int ret = NO_ERROR;

    EnterCriticalSection(&sock->SockCritSec);

    ptr = (BufferObj *)GotoNextDoubleList(&sock->OutOfOrderSends, &sock->OutOfOrderSends.head);
    while (ptr)
    {
        sendobj = (BUFFER_OBJ_EX *)container_of(BUFFER_OBJ_EX, bufDList, ptr);
        if ((sendobj == NULL) || (sendobj->IoOrder != sock->LastSendIssued))
        {
            // The front of the queue is not the next send in sequence
            break;
        }

        // Find the successor first, then detach this node from the list
        next = (BufferObj *)GotoNextDoubleList(&sock->OutOfOrderSends, ptr);
        RemoveDoubleList(&sock->OutOfOrderSends, ptr);

        if (PostSend(sock, sendobj) != NO_ERROR)
        {
            FreeBufferObjEx(sendobj);
            ret = SOCKET_ERROR;
            break;
        }

        ptr = next;
    }

    LeaveCriticalSection(&sock->SockCritSec);

    return ret;
}
int
handleNum =
0;
int
handlecut =
0;
void
HandleIo(
SOCKET_OBJ *
sock,
BUFFER_OBJ_EX *
buf,
HANDLE CompPort,
DWORD BytesTransfered,
DWORD error
)
{
SOCKET_OBJ *
clientobj=
NULL
;
// New client object for accepted connections
BUFFER_OBJ_EX *
recvobj=
NULL
,
// Used to post new receives on accepted connections
*
sendobj=
NULL
;
// Used to post new sends for data received
BOOL
bCleanupSocket;
char
*
tmp;
int
i;
bCleanupSocket =
FALSE
;
if
(
(
error
!
=
NO_ERROR)
&
&
(
gProtocol =
=
IPPROTO_TCP
)
)
{
FreeBufferObjEx(
buf)
;
if
(
InterlockedDecrement(
&
sock-
>
OutstandingOps)
=
=
0)
{
FreeSocketObj(
sock)
;
}
return
;
}
EnterCriticalSection(
&
sock-
>
SockCritSec)
;
if
(
buf-
>
operation =
=
OP_ACCEPT)
{
HANDLE hrc;
SOCKADDR_STORAGE *
LocalSockaddr=
NULL
,
*
RemoteSockaddr=
NULL
;
int
LocalSockaddrLen,
RemoteSockaddrLen;
sock-
>
lpfnGetAcceptExSockaddrs(
buf-
>
buf,
buf-
>
buflen -
(
(
sizeof
(
SOCKADDR_STORAGE)
+
16)
*
2)
,
sizeof
(
SOCKADDR_STORAGE)
+
16,
sizeof
(
SOCKADDR_STORAGE)
+
16,
(
SOCKADDR
*
*
)
&
LocalSockaddr,
&
LocalSockaddrLen,
(
SOCKADDR
*
*
)
&
RemoteSockaddr,
&
RemoteSockaddrLen)
;
clientobj =
GetSocketObj(
buf-
>
sclient,
sock-
>
af)
;
hrc =
CreateIoCompletionPort(
(
HANDLE)
buf-
>
sclient,
CompPort,
(
ULONG_PTR)
clientobj,
0)
;
if
(
hrc =
=
NULL
)
{
fprintf
(
stderr
,
"CompletionThread: CreateIoCompletionPort failed: %d/n"
,
GetLastError(
)
)
;
return
;
}
sendobj =
GetBufferObjEx(
BytesTransfered)
;
memcpy
(
sendobj-
>
buf,
buf-
>
buf,
BytesTransfered)
;
if
(
PostSend(
clientobj,
sendobj)
=
=
NO_ERROR)
{
// Now post some receives on this new connection
for
(
i =
0;
i <
gOverlappedCount;
i+
+
)
{
recvobj =
GetBufferObjEx(
gBufferSize)
;
if
(
PostRecv(
clientobj,
recvobj)
!
=
NO_ERROR)
{
FreeBufferObjEx(
recvobj)
;
error
=
SOCKET_ERROR;
break
;
}
}
}
else
{
FreeBufferObjEx(
sendobj)
;
error
=
SOCKET_ERROR;
}
PostAccept(
sock,
buf)
;
if
(
error
!
=
NO_ERROR)
{
if
(
clientobj-
>
OutstandingOps =
=
0)
{
closesocket(
clientobj-
>
s)
;
clientobj-
>
s =
INVALID_SOCKET;
FreeSocketObj(
clientobj)
;
}
else
{
clientobj-
>
bClosing =
TRUE
;
}
error
=
NO_ERROR;
}
}
else
if
(
(
buf-
>
operation =
=
OP_READ)
&
&
(
error
=
=
NO_ERROR)
)
{
if
(
(
BytesTransfered >
0)
)
{
printf
(
"HandleIo: %d/n"
,
handleNum+
+
)
;
// Create a buffer to send
sendobj =
GetBufferObjEx(
gBufferSize)
;
printf
(
"handlecut %d./n"
,
handlecut+
+
)
;
// Swap the buffers (i.e. buffer we just received becomes the send buffer)
tmp =
sendobj-
>
buf;
sendobj-
>
buflen =
BytesTransfered;
sendobj-
>
buf =
buf-
>
buf;
sendobj-
>
IoOrder =
buf-
>
IoOrder;
buf-
>
buf =
tmp;
buf-
>
buflen =
gBufferSize;
InsertPendingSend(
sock,
sendobj)
;
if
(
DoSends(
sock)
!
=
NO_ERROR)
{
error
=
SOCKET_ERROR;
printf
(
"SOCKET_ERROR./n"
)
;
}
else
{
// Post another receive
if
(
PostRecv(
sock,
buf)
!
=
NO_ERROR)
{
// In the event the recv fails, clean up the connection
FreeBufferObjEx(
buf)
;
error
=
SOCKET_ERROR;
printf
(
"SOCKET_ERROR./n"
)
;
}
}
}
else
{
sock-
>
bClosing =
TRUE
;
// Free the receive buffer
FreeBufferObjEx(
buf)
;
if
(
DoSends(
sock)
!
=
NO_ERROR)
{
error
=
SOCKET_ERROR;
printf
(
"SOCKET_ERROR./n"
)
;
}
// If this was the last outstanding operation on socket, clean it up
if
(
(
sock-
>
OutstandingOps =
=
0)
&
&
(
sock-
>
OutOfOrderSends.
count
=
=
0)
)
{
bCleanupSocket =
TRUE
;
}
}
}
else
if
(
buf-
>
operation =
=
OP_WRITE)
{
FreeBufferObjEx(
buf)
;
if
(
DoSends(
sock)
!
=
NO_ERROR)
{
error
=
SOCKET_ERROR;
}
}
if
(
error
!
=
NO_ERROR)
{
sock-
>
bClosing =
TRUE
;
}
if
(
(
InterlockedDecrement(
&
sock-
>
OutstandingOps)
=
=
0)
&
&
(
sock-
>
bClosing)
&
&
(
sock-
>
OutOfOrderSends.
count
=
=
0)
)
{
bCleanupSocket =
TRUE
;
}
else
{
if
(
DoSends(
sock)
!
=
NO_ERROR)
{
bCleanupSocket =
TRUE
;
}
}
LeaveCriticalSection(
&
sock-
>
SockCritSec)
;
if
(
bCleanupSocket)
{
closesocket(
sock-
>
s)
;
sock-
>
s =
INVALID_SOCKET;
FreeSocketObj(
sock)
;
}
}
// Debug counter: number of completion packets dequeued so far.
int iocpNum = 0;

// Worker thread servicing the I/O completion port passed in 'lpParam'.
//
// Waits indefinitely for completion packets. For each packet the buffer
// object is recovered from the OVERLAPPED pointer; if the operation failed,
// WSAGetOverlappedResult is called and, if it fails too, the Winsock error
// is taken from WSAGetLastError. The packet is then handed to HandleIo.
//
// If no packet was dequeued (lpOverlapped is NULL) there is no buffer or
// socket object to work with; the error is reported, the loop ends and the
// process exits.
DWORD WINAPI CompletionThread(LPVOID lpParam)
{
    SOCKET_OBJ *sockobj = NULL;       // Per socket object for completed I/O
    BUFFER_OBJ_EX *bufobj = NULL;     // Per I/O object for completed I/O
    OVERLAPPED *lpOverlapped = NULL;  // Pointer to overlapped structure for completed I/O
    HANDLE CompletionPort;            // Completion port handle
    DWORD BytesTransfered,            // Number of bytes transferred
          Flags,                      // Flags for completed I/O
          qerr;                       // Error from GetQueuedCompletionStatus
    int rc,
        error;

    CompletionPort = (HANDLE)lpParam;

    for (;;)
    {
        error = NO_ERROR;
        lpOverlapped = NULL;

        rc = GetQueuedCompletionStatus(
                CompletionPort,
                &BytesTransfered,
                (PULONG_PTR)&sockobj,
                &lpOverlapped,
                INFINITE);
        // Read the error before printf gets a chance to disturb it
        qerr = (rc == FALSE) ? GetLastError() : NO_ERROR;

        printf("iocp %d.\n", iocpNum++);

        if (lpOverlapped == NULL)
        {
            // Nothing was dequeued, so sockobj and the buffer are not valid
            fprintf(stderr, "CompletionThread: GetQueuedCompletionStatus failed: %lu\n",
                    (unsigned long)qerr);
            break;
        }

        bufobj = CONTAINING_RECORD(lpOverlapped, BUFFER_OBJ_EX, ol);

        if (rc == FALSE)
        {
            printf("false.\n");
            rc = WSAGetOverlappedResult(
                    sockobj->s,
                    &bufobj->ol,
                    &BytesTransfered,
                    FALSE,
                    &Flags);
            if (rc == FALSE)
            {
                error = WSAGetLastError();
                printf("WSAGetOverlappedResult failed %d.\n", error);
            }
        }

        HandleIo(sockobj, bufobj, CompletionPort, BytesTransfered, error);
    }

    ExitProcess(0);
    return 0;
}
// Program entry point: sets up the IOCP echo server and then runs a watchdog.
//
// Steps: load Winsock, create the completion port, start the completion
// thread, resolve the local addresses and, for each one, create a socket,
// associate it with the port and bind it. For TCP, the AcceptEx and
// GetAcceptExSockaddrs extension functions are loaded, the socket is put in
// listening state and gOverlappedCount accepts are posted on it.
//
// The main thread then waits on the completion thread in 5 second slices.
// Every 12th timeout it checks SO_CONNECT_TIME on each pending accept socket
// and closes those that have been connected for more than 300 seconds.
//
// Returns 0 after the wait loop ends, -1 on any setup or watchdog failure.
int _tmain(int argc, _TCHAR* argv[])
{
    WSADATA wsd;
    SYSTEM_INFO sysinfo;
    SOCKET_OBJ *sockobj = NULL;
    SocketObjHeader ListenSockets;
    HANDLE CompletionPort,
           CompThreads[MAX_COMPLETION_THREAD_COUNT],
           hrc;
    int threadcount = 1,   // Completion threads started and waited on
        interval,
        rc,
        i;
    struct addrinfo *res = NULL,
                    *ptr = NULL;

    if (WSAStartup(MAKEWORD(2, 2), &wsd) != 0)
    {
        fprintf(stderr, "unable to load Winsock!\n");
        return -1;
    }

    CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (CompletionPort == NULL)
    {
        fprintf(stderr, "CreateIoCompletionPort failed.\n");
        return -1;
    }

    GetSystemInfo(&sysinfo);
    if (sysinfo.dwNumberOfProcessors > MAX_COMPLETION_THREAD_COUNT)
    {
        sysinfo.dwNumberOfProcessors = MAX_COMPLETION_THREAD_COUNT;
    }

    // A single completion thread is used here; the processor count clamped
    // above is not used to size the pool.
    for (i = 0; i < threadcount; ++i)
    {
        CompThreads[i] = CreateThread(NULL, 0, CompletionThread, (LPVOID)CompletionPort, 0, NULL);
        if (CompThreads[i] == NULL)
        {
            fprintf(stderr, "CreateThread failed.\n");
            return -1;
        }
    }

    res = ResolveAddress(gSrvAddr, gPort, gAddressFamily, gSocketType, gProtocol);
    if (res == NULL)
    {
        fprintf(stderr, "ResolveAddress failed to return any addresses!\n");
        return -1;
    }

    InitializeSingleHead(&ListenSockets);

    ptr = res;
    while (ptr)
    {
        sockobj = GetSocketObj(INVALID_SOCKET, ptr->ai_family);

        sockobj->s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
        if (sockobj->s == INVALID_SOCKET)
        {
            fprintf(stderr, "socket failed: %d\n", WSAGetLastError());
            return -1;
        }

        hrc = CreateIoCompletionPort((HANDLE)sockobj->s, CompletionPort, (ULONG_PTR)sockobj, 0);
        if (hrc == NULL)
        {
            fprintf(stderr, "CreateIoCompletionPort failed: %lu\n", (unsigned long)GetLastError());
            return -1;
        }

        rc = bind(sockobj->s, ptr->ai_addr, (int)ptr->ai_addrlen);
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
            return -1;
        }

        if (gProtocol == IPPROTO_TCP)
        {
            BUFFER_OBJ_EX *acceptobj = NULL;
            GUID guidAcceptEx = WSAID_ACCEPTEX,
                 guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
            DWORD bytes;

            // Need to load the Winsock extension functions from each provider
            // -- e.g. AF_INET and AF_INET6.
            rc = WSAIoctl(
                    sockobj->s,
                    SIO_GET_EXTENSION_FUNCTION_POINTER,
                    &guidAcceptEx,
                    sizeof(guidAcceptEx),
                    &sockobj->lpfnAcceptEx,
                    sizeof(sockobj->lpfnAcceptEx),
                    &bytes,
                    NULL,
                    NULL);
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n",
                        WSAGetLastError());
                return -1;
            }

            rc = WSAIoctl(
                    sockobj->s,
                    SIO_GET_EXTENSION_FUNCTION_POINTER,
                    &guidGetAcceptExSockaddrs,
                    sizeof(guidGetAcceptExSockaddrs),
                    &sockobj->lpfnGetAcceptExSockaddrs,
                    sizeof(sockobj->lpfnGetAcceptExSockaddrs),
                    &bytes,
                    NULL,
                    NULL);
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n",
                        WSAGetLastError());
                return -1;
            }

            rc = listen(sockobj->s, 100);
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "listen failed: %d\n", WSAGetLastError());
                return -1;
            }

            // One slot per accept that is kept posted on this listening socket
            sockobj->PendingAccepts = (BUFFER_OBJ_EX **)HeapAlloc(
                    GetProcessHeap(),
                    HEAP_ZERO_MEMORY,
                    (sizeof(BUFFER_OBJ_EX *) * gOverlappedCount));
            if (sockobj->PendingAccepts == NULL)
            {
                fprintf(stderr, "HeapAlloc failed: %lu\n", (unsigned long)GetLastError());
                ExitProcess(-1);
            }

            for (i = 0; i < gOverlappedCount; i++)
            {
                sockobj->PendingAccepts[i] = acceptobj = GetBufferObjEx(gBufferSize);
                PostAccept(sockobj, acceptobj);
            }

            EnqueueSingleList(&ListenSockets, &(sockobj->next));
        }

        ptr = ptr->ai_next;
    }

    freeaddrinfo(res);

    interval = 0;
    for (;;)
    {
        rc = WSAWaitForMultipleEvents(threadcount, CompThreads, TRUE, 5000, FALSE);
        if (rc == WAIT_FAILED)
        {
            fprintf(stderr, "WSAWaitForMultipleEvents failed: %d\n", WSAGetLastError());
            break;
        }
        else if (rc == WAIT_TIMEOUT)
        {
            interval++;
            if (interval == 12)
            {
                SocketObj *listenptr = NULL;
                int optval,
                    optlen;

                // Walk the list of outstanding accepts
                listenptr = (SocketObj *)GotoNextSingleList(&ListenSockets, ListenSockets.head);
                while (listenptr)
                {
                    sockobj = (SOCKET_OBJ *)container_of(SOCKET_OBJ, next, listenptr);
                    for (i = 0; i < gOverlappedCount; i++)
                    {
                        optlen = sizeof(optval);
                        rc = getsockopt(
                                sockobj->PendingAccepts[i]->sclient,
                                SOL_SOCKET,
                                SO_CONNECT_TIME,
                                (char *)&optval,
                                &optlen);
                        if (rc == SOCKET_ERROR)
                        {
                            fprintf(stderr, "getsockopt: SO_CONNECT_TIME failed: %d\n",
                                    WSAGetLastError());
                            return -1;
                        }

                        // Close the socket once its connect time exceeds 300
                        // seconds; 0xFFFFFFFF is skipped.
                        if ((optval != 0xFFFFFFFF) && (optval > 300))
                        {
                            closesocket(sockobj->PendingAccepts[i]->sclient);
                        }
                    }
                    listenptr = (SocketObj *)GotoNextSingleList(&ListenSockets, listenptr);
                }
                interval = 0;
            }
        }
    }

    WSACleanup();
    return 0;
}
版权声明:
原创作品,允许转载,转载时请务必以超链接形式标明文章原始出处、作者信息和本声明。否则将追究法律责任。
include
<
winsock2.
h>
#
include
<
ws2tcpip.
h>
#
include
<
mswsock.
h>
#
include
<
windows.
h>
#
include
"resolve.h"
#
include
"public.h"
#
define
DEFAULT_OVERLAPPED_COUNT 5
#
define
MAX_COMPLETION_THREAD_COUNT 32 // Maximum number of completion threads allowed
// Aliases for the project's list types (presumably declared in "public.h" -- TODO confirm).
// SocketObjHeader/SocketObj: single list used to chain SOCKET_OBJs.
typedef
SINGLE_LIST_HEADER SocketObjHeader;
typedef
SINGLE_LIST SocketObj;
// BufferObjHeader/BufferObj: double list used to chain BUFFER_OBJ_EXs.
typedef
DOUBLE_LIST_HEADER BufferObjHeader;
typedef
DOUBLE_LIST BufferObj;
// Number of overlapped operations posted per socket: AcceptEx calls on each
// listening socket and receives on each accepted connection.
int
gOverlappedCount =
DEFAULT_OVERLAPPED_COUNT;
typedef
struct
_BUFFER_OBJ_EX {
WSAOVERLAPPED ol;
// Overlapped structure
SOCKET
sclient;
// Used for AcceptEx client socket
char
*
buf;
// Buffer for send/recv/AcceptEx
int
buflen;
// Length of the buffer
int
operation;
// Type of operation submitted
#
define
OP_ACCEPT 0 // AcceptEx
#
define
OP_READ 1 // WSARecv/WSARecvFrom
#
define
OP_WRITE 2 // WSASend?WSASendTo
SOCKADDR_STORAGE addr;
// Remote address (UDP)
int
addrlen;
// Remote address length
ULONG IoOrder;
// Order in which this I/O was posted
BufferObj bufDList;
}
BUFFER_OBJ_EX;
// Per-socket context, used for listening sockets and accepted connections alike.
typedef struct _SOCKET_OBJ
{
    SOCKET s;                         // Socket handle
    int af;                           // Address family of the socket (AF_INET or AF_INET6)
    int bClosing;                     // Non-zero once the socket is being closed
    volatile LONG OutstandingOps;     // Overlapped operations still in flight
    BUFFER_OBJ_EX **PendingAccepts;   // Array of pending AcceptEx buffers (listening socket only)
    ULONG LastSendIssued;             // Sequence number of the last send issued
    ULONG IoCountIssued;              // Sequence number the next receive will be given
    BufferObjHeader OutOfOrderSends;  // Send buffers waiting for their turn, kept sorted by IoOrder

    // Microsoft-specific extension entry points (listening socket only)
    LPFN_ACCEPTEX lpfnAcceptEx;
    LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;

    CRITICAL_SECTION SockCritSec;     // Serializes access to this SOCKET_OBJ
    SocketObj next;                   // Link used to chain SOCKET_OBJs together
} SOCKET_OBJ;
// Allocates and initializes a SOCKET_OBJ for socket 's' of address family 'af'.
//
// The object comes zero-filled from the process heap; its critical section
// and out-of-order send list are initialized, and the receive sequence
// counter starts at 1 for TCP (0 otherwise). The process is terminated if
// the allocation fails, so the return value is never NULL.
SOCKET_OBJ *GetSocketObj(SOCKET s, int af)
{
    SOCKET_OBJ *sockobj = NULL;

    sockobj = (SOCKET_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(SOCKET_OBJ));
    if (sockobj == NULL)
    {
        // GetLastError() returns a DWORD, so print it as unsigned long.
        fprintf(stderr, "GetSocketObj: HeapAlloc failed: %lu\n", (unsigned long)GetLastError());
        ExitProcess(-1);
    }

    InitializeCriticalSection(&sockobj->SockCritSec);
    InitializeDoubleHead(&sockobj->OutOfOrderSends);
    sockobj->IoCountIssued = ((gProtocol == IPPROTO_TCP) ? 1 : 0);

    // Initialize the members
    sockobj->s = s;
    sockobj->af = af;

    return sockobj;
}
BUFFER_OBJ_EX *
GetBufferObjEx(
int
buflen)
{
BUFFER_OBJ_EX *
newobj=
NULL
;
// Allocate the object
newobj =
(
BUFFER_OBJ_EX *
)
HeapAlloc(
GetProcessHeap(
)
,
HEAP_ZERO_MEMORY,
sizeof
(
BUFFER_OBJ_EX)
)
;
if
(
newobj =
=
NULL
)
{
printf
(
"GetBufferObj: HeapAlloc failed: %d/n"
,
GetLastError(
)
)
;
ExitProcess(
-
1)
;
}
// Allocate the buffer
newobj-
>
buf =
(
char
*
)
HeapAlloc(
GetProcessHeap(
)
,
HEAP_ZERO_MEMORY,
sizeof
(
BYTE)
*
buflen)
;
if
(
newobj-
>
buf =
=
NULL
)
{
printf
(
"GetBufferObj: HeapAlloc failed: %d/n"
,
GetLastError(
)
)
;
ExitProcess(
-
1)
;
}
newobj-
>
buflen =
buflen;
newobj-
>
addrlen =
sizeof
(
newobj-
>
addr)
;
return
newobj;
}
// Releases a buffer object: first its data buffer, then the object itself.
// Both are returned to the process heap.
void FreeBufferObjEx(BUFFER_OBJ_EX *obj)
{
    HANDLE heap = GetProcessHeap();

    HeapFree(heap, 0, obj->buf);
    HeapFree(heap, 0, obj);
}
// Closes and frees a socket object, unless it still has overlapped
// operations outstanding, in which case nothing is done.
//
// The socket is closed only if it has not been closed already (i.e. 's' is
// not INVALID_SOCKET). The PendingAccepts array is not released here.
void FreeSocketObj(SOCKET_OBJ *obj)
{
    if (obj->OutstandingOps != 0)
    {
        // Still outstanding operations so just return
        return;
    }

    // Close the socket if it hasn't already been closed
    if (obj->s != INVALID_SOCKET)
    {
        closesocket(obj->s);
        obj->s = INVALID_SOCKET;
    }

    DeleteCriticalSection(&obj->SockCritSec);
    HeapFree(GetProcessHeap(), 0, obj);
}
// Queues 'send' on the socket's out-of-order send list, keeping the list
// sorted by ascending IoOrder. The new buffer is placed in front of the first
// queued buffer whose IoOrder is greater; when there is none, it is enqueued
// before the list head. The whole operation runs under the socket's
// critical section.
void InsertPendingSend(SOCKET_OBJ *sock, BUFFER_OBJ_EX *send)
{
    BufferObjHeader *pending = &sock->OutOfOrderSends;
    BufferObj *node = NULL;
    BUFFER_OBJ_EX *queued = NULL;

    EnterCriticalSection(&sock->SockCritSec);

    for (node = (BufferObj *)GotoNextDoubleList(pending, &pending->head);
         node != NULL;
         node = (BufferObj *)GotoNextDoubleList(pending, node))
    {
        queued = (BUFFER_OBJ_EX *)container_of(BUFFER_OBJ_EX, bufDList, node);
        if (queued->IoOrder > send->IoOrder)
        {
            break;
        }
    }

    // No later buffer found: insert relative to the head instead
    if (node == NULL)
    {
        node = &pending->head;
    }

    EnqueueDoubleListBefore(pending, node, &send->bufDList);

    LeaveCriticalSection(&sock->SockCritSec);
}
// Posts an overlapped receive on 'sock' using 'recvobj'.
//
// The receive is stamped with the socket's next I/O sequence number; the
// stamping and the WSARecv call happen together inside the socket's critical
// section. Only TCP is supported: any other protocol terminates the process.
//
// Returns NO_ERROR when the receive completed or is pending (the socket's
// outstanding-operation count is then incremented), or SOCKET_ERROR when
// WSARecv failed for any reason other than WSA_IO_PENDING. The caller keeps
// ownership of 'recvobj' on failure.
int PostRecv(SOCKET_OBJ *sock, BUFFER_OBJ_EX *recvobj)
{
    WSABUF wbuf;
    DWORD bytes, flags;
    int rc = NO_ERROR;
    int err = NO_ERROR;

    recvobj->operation = OP_READ;
    wbuf.buf = recvobj->buf;
    wbuf.len = recvobj->buflen;
    flags = 0;

    EnterCriticalSection(&sock->SockCritSec);

    // Assign the IO order to this receive. This must be performed within
    // the critical section. The operation of assigning the IO count and posting
    // the receive cannot be interrupted.
    recvobj->IoOrder = sock->IoCountIssued;
    sock->IoCountIssued++;

    if (gProtocol == IPPROTO_TCP)
    {
        rc = WSARecv(sock->s, &wbuf, 1, &bytes, &flags, &recvobj->ol, NULL);
        if (rc == SOCKET_ERROR)
        {
            // Capture the error right away, before any other API call can
            // overwrite the thread's last-error value.
            err = WSAGetLastError();
        }
    }
    else
    {
        ExitProcess(-1);
    }

    LeaveCriticalSection(&sock->SockCritSec);

    if ((rc == SOCKET_ERROR) && (err != WSA_IO_PENDING))
    {
        fprintf(stderr, "PostRecv: WSARecv* failed: %d\n", err);
        return SOCKET_ERROR;
    }

    // Increment outstanding overlapped operations
    InterlockedIncrement(&sock->OutstandingOps);

    return NO_ERROR;
}
// Debug counter: number of sends issued so far, printed by PostSend.
int sendnum = 0;

// Posts an overlapped send of 'sendobj' on 'sock'.
//
// The socket's LastSendIssued counter is advanced and WSASend is called
// together inside the socket's critical section. Only TCP is supported: any
// other protocol terminates the process.
//
// Returns NO_ERROR when the send completed or is pending (the socket's
// outstanding-operation count is then incremented), or SOCKET_ERROR when
// WSASend failed for any reason other than WSA_IO_PENDING. The caller keeps
// ownership of 'sendobj' on failure.
int PostSend(SOCKET_OBJ *sock, BUFFER_OBJ_EX *sendobj)
{
    WSABUF wbuf;
    DWORD bytes;
    int rc = NO_ERROR;
    int err = NO_ERROR;

    sendobj->operation = OP_WRITE;
    wbuf.buf = sendobj->buf;
    wbuf.len = sendobj->buflen;

    EnterCriticalSection(&sock->SockCritSec);

    // Incrementing the last send issued and issuing the send should not be
    // interruptible.
    sock->LastSendIssued++;

    if (gProtocol == IPPROTO_TCP)
    {
        printf("send %d\n", sendnum++);
        rc = WSASend(sock->s, &wbuf, 1, &bytes, 0, &sendobj->ol, NULL);
        if (rc == SOCKET_ERROR)
        {
            // Capture the error right away, before any other API call can
            // overwrite the thread's last-error value.
            err = WSAGetLastError();
        }
    }
    else
    {
        ExitProcess(-1);
    }

    LeaveCriticalSection(&sock->SockCritSec);

    if ((rc == SOCKET_ERROR) && (err != WSA_IO_PENDING))
    {
        fprintf(stderr, "PostSend: WSASend* failed: %d\n", err);
        return SOCKET_ERROR;
    }

    // Increment the outstanding operation count
    InterlockedIncrement(&sock->OutstandingOps);

    return NO_ERROR;
}
// Posts an overlapped AcceptEx on the listening socket 'sock'.
//
// A fresh TCP client socket is created and stored in acceptobj->sclient. The
// data portion of acceptobj->buf is its length minus room for two address
// blocks of sizeof(SOCKADDR_STORAGE) + 16 bytes each (local and remote).
//
// Returns NO_ERROR when the accept completed or is pending (the socket's
// outstanding-operation count is then incremented), -1 when the client
// socket could not be created, or SOCKET_ERROR when AcceptEx failed for any
// reason other than WSA_IO_PENDING.
int PostAccept(SOCKET_OBJ *sock, BUFFER_OBJ_EX *acceptobj)
{
    DWORD bytes;
    int rc;
    int err;

    acceptobj->operation = OP_ACCEPT;

    // Create the client socket for an incoming connection
    acceptobj->sclient = socket(sock->af, SOCK_STREAM, IPPROTO_TCP);
    if (acceptobj->sclient == INVALID_SOCKET)
    {
        fprintf(stderr, "PostAccept: socket failed: %d\n", WSAGetLastError());
        return -1;
    }

    rc = sock->lpfnAcceptEx(
            sock->s,
            acceptobj->sclient,
            acceptobj->buf,
            acceptobj->buflen - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
            sizeof(SOCKADDR_STORAGE) + 16,
            sizeof(SOCKADDR_STORAGE) + 16,
            &bytes,
            &acceptobj->ol
            );
    if (rc == FALSE)
    {
        // Read the error once so the value tested is the value reported.
        err = WSAGetLastError();
        if (err != WSA_IO_PENDING)
        {
            fprintf(stderr, "PostAccept: AcceptEx failed: %d\n", err);
            return SOCKET_ERROR;
        }
    }

    // Increment the outstanding overlapped count for this socket
    InterlockedIncrement(&sock->OutstandingOps);

    return NO_ERROR;
}
// Issues every queued send whose turn has come.
//
// Walks the socket's out-of-order send list from the front and posts each
// buffer whose IoOrder equals the socket's LastSendIssued counter (PostSend
// advances that counter), stopping at the first buffer that is not next in
// sequence. Runs under the socket's critical section.
//
// Each buffer is unlinked from the list BEFORE it is posted. If PostSend
// fails, the buffer is freed; because it is already unlinked, no freed node
// is left behind in the list for a later traversal to touch.
//
// Returns NO_ERROR, or SOCKET_ERROR if a send could not be posted.
int DoSends(SOCKET_OBJ *sock)
{
    BufferObj *ptr = NULL;
    BufferObj *next = NULL;
    BUFFER_OBJ_EX *sendobj = NULL;
    int ret = NO_ERROR;

    EnterCriticalSection(&sock->SockCritSec);

    ptr = (BufferObj *)GotoNextDoubleList(&sock->OutOfOrderSends, &sock->OutOfOrderSends.head);
    while (ptr)
    {
        sendobj = (BUFFER_OBJ_EX *)container_of(BUFFER_OBJ_EX, bufDList, ptr);
        if ((sendobj == NULL) || (sendobj->IoOrder != sock->LastSendIssued))
        {
            // The front of the queue is not the next send in sequence
            break;
        }

        // Find the successor first, then detach this node from the list
        next = (BufferObj *)GotoNextDoubleList(&sock->OutOfOrderSends, ptr);
        RemoveDoubleList(&sock->OutOfOrderSends, ptr);

        if (PostSend(sock, sendobj) != NO_ERROR)
        {
            FreeBufferObjEx(sendobj);
            ret = SOCKET_ERROR;
            break;
        }

        ptr = next;
    }

    LeaveCriticalSection(&sock->SockCritSec);

    return ret;
}
int
handleNum =
0;
int
handlecut =
0;
void
HandleIo(
SOCKET_OBJ *
sock,
BUFFER_OBJ_EX *
buf,
HANDLE CompPort,
DWORD BytesTransfered,
DWORD error
)
{
SOCKET_OBJ *
clientobj=
NULL
;
// New client object for accepted connections
BUFFER_OBJ_EX *
recvobj=
NULL
,
// Used to post new receives on accepted connections
*
sendobj=
NULL
;
// Used to post new sends for data received
BOOL
bCleanupSocket;
char
*
tmp;
int
i;
bCleanupSocket =
FALSE
;
if
(
(
error
!
=
NO_ERROR)
&
&
(
gProtocol =
=
IPPROTO_TCP
)
)
{
FreeBufferObjEx(
buf)
;
if
(
InterlockedDecrement(
&
sock-
>
OutstandingOps)
=
=
0)
{
FreeSocketObj(
sock)
;
}
return
;
}
EnterCriticalSection(
&
sock-
>
SockCritSec)
;
if
(
buf-
>
operation =
=
OP_ACCEPT)
{
HANDLE hrc;
SOCKADDR_STORAGE *
LocalSockaddr=
NULL
,
*
RemoteSockaddr=
NULL
;
int
LocalSockaddrLen,
RemoteSockaddrLen;
sock-
>
lpfnGetAcceptExSockaddrs(
buf-
>
buf,
buf-
>
buflen -
(
(
sizeof
(
SOCKADDR_STORAGE)
+
16)
*
2)
,
sizeof
(
SOCKADDR_STORAGE)
+
16,
sizeof
(
SOCKADDR_STORAGE)
+
16,
(
SOCKADDR
*
*
)
&
LocalSockaddr,
&
LocalSockaddrLen,
(
SOCKADDR
*
*
)
&
RemoteSockaddr,
&
RemoteSockaddrLen)
;
clientobj =
GetSocketObj(
buf-
>
sclient,
sock-
>
af)
;
hrc =
CreateIoCompletionPort(
(
HANDLE)
buf-
>
sclient,
CompPort,
(
ULONG_PTR)
clientobj,
0)
;
if
(
hrc =
=
NULL
)
{
fprintf
(
stderr
,
"CompletionThread: CreateIoCompletionPort failed: %d/n"
,
GetLastError(
)
)
;
return
;
}
sendobj =
GetBufferObjEx(
BytesTransfered)
;
memcpy
(
sendobj-
>
buf,
buf-
>
buf,
BytesTransfered)
;
if
(
PostSend(
clientobj,
sendobj)
=
=
NO_ERROR)
{
// Now post some receives on this new connection
for
(
i =
0;
i <
gOverlappedCount;
i+
+
)
{
recvobj =
GetBufferObjEx(
gBufferSize)
;
if
(
PostRecv(
clientobj,
recvobj)
!
=
NO_ERROR)
{
FreeBufferObjEx(
recvobj)
;
error
=
SOCKET_ERROR;
break
;
}
}
}
else
{
FreeBufferObjEx(
sendobj)
;
error
=
SOCKET_ERROR;
}
PostAccept(
sock,
buf)
;
if
(
error
!
=
NO_ERROR)
{
if
(
clientobj-
>
OutstandingOps =
=
0)
{
closesocket(
clientobj-
>
s)
;
clientobj-
>
s =
INVALID_SOCKET;
FreeSocketObj(
clientobj)
;
}
else
{
clientobj-
>
bClosing =
TRUE
;
}
error
=
NO_ERROR;
}
}
else
if
(
(
buf-
>
operation =
=
OP_READ)
&
&
(
error
=
=
NO_ERROR)
)
{
if
(
(
BytesTransfered >
0)
)
{
printf
(
"HandleIo: %d/n"
,
handleNum+
+
)
;
// Create a buffer to send
sendobj =
GetBufferObjEx(
gBufferSize)
;
printf
(
"handlecut %d./n"
,
handlecut+
+
)
;
// Swap the buffers (i.e. buffer we just received becomes the send buffer)
tmp =
sendobj-
>
buf;
sendobj-
>
buflen =
BytesTransfered;
sendobj-
>
buf =
buf-
>
buf;
sendobj-
>
IoOrder =
buf-
>
IoOrder;
buf-
>
buf =
tmp;
buf-
>
buflen =
gBufferSize;
InsertPendingSend(
sock,
sendobj)
;
if
(
DoSends(
sock)
!
=
NO_ERROR)
{
error
=
SOCKET_ERROR;
printf
(
"SOCKET_ERROR./n"
)
;
}
else
{
// Post another receive
if
(
PostRecv(
sock,
buf)
!
=
NO_ERROR)
{
// In the event the recv fails, clean up the connection
FreeBufferObjEx(
buf)
;
error
=
SOCKET_ERROR;
printf
(
"SOCKET_ERROR./n"
)
;
}
}
}
else
{
sock-
>
bClosing =
TRUE
;
// Free the receive buffer
FreeBufferObjEx(
buf)
;
if
(
DoSends(
sock)
!
=
NO_ERROR)
{
error
=
SOCKET_ERROR;
printf
(
"SOCKET_ERROR./n"
)
;
}
// If this was the last outstanding operation on socket, clean it up
if
(
(
sock-
>
OutstandingOps =
=
0)
&
&
(
sock-
>
OutOfOrderSends.
count
=
=
0)
)
{
bCleanupSocket =
TRUE
;
}
}
}
else
if
(
buf-
>
operation =
=
OP_WRITE)
{
FreeBufferObjEx(
buf)
;
if
(
DoSends(
sock)
!
=
NO_ERROR)
{
error
=
SOCKET_ERROR;
}
}
if
(
error
!
=
NO_ERROR)
{
sock-
>
bClosing =
TRUE
;
}
if
(
(
InterlockedDecrement(
&
sock-
>
OutstandingOps)
=
=
0)
&
&
(
sock-
>
bClosing)
&
&
(
sock-
>
OutOfOrderSends.
count
=
=
0)
)
{
bCleanupSocket =
TRUE
;
}
else
{
if
(
DoSends(
sock)
!
=
NO_ERROR)
{
bCleanupSocket =
TRUE
;
}
}
LeaveCriticalSection(
&
sock-
>
SockCritSec)
;
if
(
bCleanupSocket)
{
closesocket(
sock-
>
s)
;
sock-
>
s =
INVALID_SOCKET;
FreeSocketObj(
sock)
;
}
}
// Debug counter: number of completion packets dequeued so far.
int iocpNum = 0;

// Worker thread servicing the I/O completion port passed in 'lpParam'.
//
// Waits indefinitely for completion packets. For each packet the buffer
// object is recovered from the OVERLAPPED pointer; if the operation failed,
// WSAGetOverlappedResult is called and, if it fails too, the Winsock error
// is taken from WSAGetLastError. The packet is then handed to HandleIo.
//
// If no packet was dequeued (lpOverlapped is NULL) there is no buffer or
// socket object to work with; the error is reported, the loop ends and the
// process exits.
DWORD WINAPI CompletionThread(LPVOID lpParam)
{
    SOCKET_OBJ *sockobj = NULL;       // Per socket object for completed I/O
    BUFFER_OBJ_EX *bufobj = NULL;     // Per I/O object for completed I/O
    OVERLAPPED *lpOverlapped = NULL;  // Pointer to overlapped structure for completed I/O
    HANDLE CompletionPort;            // Completion port handle
    DWORD BytesTransfered,            // Number of bytes transferred
          Flags,                      // Flags for completed I/O
          qerr;                       // Error from GetQueuedCompletionStatus
    int rc,
        error;

    CompletionPort = (HANDLE)lpParam;

    for (;;)
    {
        error = NO_ERROR;
        lpOverlapped = NULL;

        rc = GetQueuedCompletionStatus(
                CompletionPort,
                &BytesTransfered,
                (PULONG_PTR)&sockobj,
                &lpOverlapped,
                INFINITE);
        // Read the error before printf gets a chance to disturb it
        qerr = (rc == FALSE) ? GetLastError() : NO_ERROR;

        printf("iocp %d.\n", iocpNum++);

        if (lpOverlapped == NULL)
        {
            // Nothing was dequeued, so sockobj and the buffer are not valid
            fprintf(stderr, "CompletionThread: GetQueuedCompletionStatus failed: %lu\n",
                    (unsigned long)qerr);
            break;
        }

        bufobj = CONTAINING_RECORD(lpOverlapped, BUFFER_OBJ_EX, ol);

        if (rc == FALSE)
        {
            printf("false.\n");
            rc = WSAGetOverlappedResult(
                    sockobj->s,
                    &bufobj->ol,
                    &BytesTransfered,
                    FALSE,
                    &Flags);
            if (rc == FALSE)
            {
                error = WSAGetLastError();
                printf("WSAGetOverlappedResult failed %d.\n", error);
            }
        }

        HandleIo(sockobj, bufobj, CompletionPort, BytesTransfered, error);
    }

    ExitProcess(0);
    return 0;
}
int
_tmain(
int
argc,
_TCHAR*
argv[
]
)
{
WSADATA wsd;
SYSTEM_INFO sysinfo;
SOCKET_OBJ *
sockobj=
NULL
;
SocketObjHeader ListenSockets;
HANDLE CompletionPort,
CompThreads[
MAX_COMPLETION_THREAD_COUNT]
,
hrc;
int
endpointcount=
0,
interval,
rc,
i;
struct
addrinfo
*
res=
NULL
,
*
ptr=
NULL
;
if
(
WSAStartup(
MAKEWORD(
2,
2)
,
&
wsd)
!
=
0)
{
fprintf
(
stderr
,
"unable to load Winsock!/n"
)
;
return
-
1;
}
CompletionPort =
CreateIoCompletionPort(
INVALID_HANDLE_VALUE,
NULL
,
NULL
,
0)
;
if
(
CompletionPort =
=
NULL
)
{
fprintf
(
stderr
,
"CreateIoCompletionPort failed./n"
)
;
return
0;
}
GetSystemInfo(
&
sysinfo)
;
if
(
sysinfo.
dwNumberOfProcessors >
MAX_COMPLETION_THREAD_COUNT)
{
sysinfo.
dwNumberOfProcessors =
MAX_COMPLETION_THREAD_COUNT;
}
//for (i = 0; i < sysinfo.dwNumberOfProcessors; ++i) {
for
(
i =
0;
i <
1;
+
+
i)
{
CompThreads[
i]
=
CreateThread(
NULL
,
0,
CompletionThread,
(
LPVOID)
CompletionPort,
0,
NULL
)
;
if
(
CompThreads[
i]
=
=
NULL
)
{
fprintf
(
stderr
,
"CreateThread failed./n"
)
;
return
-
1;
}
}
res =
ResolveAddress(
gSrvAddr,
gPort,
gAddressFamily,
gSocketType,
gProtocol)
;
if
(
res =
=
NULL
)
{
fprintf
(
stderr
,
"ResolveAddress failed to return any addresses!/n"
)
;
return
-
1;
}
InitializeSingleHead(
&
ListenSockets)
;
ptr =
res;
while
(
ptr)
{
sockobj =
GetSocketObj(
INVALID_SOCKET,
ptr-
>
ai_family)
;
sockobj-
>
s =
socket
(
ptr-
>
ai_family,
ptr-
>
ai_socktype,
ptr-
>
ai_protocol)
;
if
(
sockobj-
>
s =
=
INVALID_SOCKET)
{
fprintf
(
stderr
,
"socket failed: %d/n"
,
WSAGetLastError(
)
)
;
return
-
1;
}
hrc =
CreateIoCompletionPort(
(
HANDLE)
sockobj-
>
s,
CompletionPort,
(
ULONG_PTR)
sockobj,
0)
;
if
(
hrc =
=
NULL
)
{
fprintf
(
stderr
,
"CreateIoCompletionPort failed: %d/n"
,
GetLastError(
)
)
;
return
-
1;
}
rc =
bind
(
sockobj-
>
s,
ptr-
>
ai_addr,
ptr-
>
ai_addrlen)
;
if
(
rc =
=
SOCKET_ERROR)
{
fprintf
(
stderr
,
"bind failed: %d/n"
,
WSAGetLastError(
)
)
;
return
-
1;
}
if
(
gProtocol =
=
IPPROTO_TCP
)
{
BUFFER_OBJ_EX *
acceptobj=
NULL
;
GUID guidAcceptEx =
WSAID_ACCEPTEX,
guidGetAcceptExSockaddrs =
WSAID_GETACCEPTEXSOCKADDRS;
DWORD bytes;
// Need to load the Winsock extension functions from each provider
// -- e.g. AF_INET and AF_INET6.
rc =
WSAIoctl(
sockobj-
>
s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&
guidAcceptEx,
sizeof
(
guidAcceptEx)
,
&
sockobj-
>
lpfnAcceptEx,
sizeof
(
sockobj-
>
lpfnAcceptEx)
,
&
bytes,
NULL
,
NULL
)
;
if
(
rc =
=
SOCKET_ERROR)
{
fprintf
(
stderr
,
"WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d/n"
,
WSAGetLastError(
)
)
;
return
-
1;
}
rc =
WSAIoctl(
sockobj-
>
s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&
guidGetAcceptExSockaddrs,
sizeof
(
guidGetAcceptExSockaddrs)
,
&
sockobj-
>
lpfnGetAcceptExSockaddrs,
sizeof
(
sockobj-
>
lpfnGetAcceptExSockaddrs)
,
&
bytes,
NULL
,
NULL
)
;
if
(
rc =
=
SOCKET_ERROR)
{
fprintf
(
stderr
,
"WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER faled: %d/n"
,
WSAGetLastError(
)
)
;
return
-
1;
}
rc =
listen
(
sockobj-
>
s,
100)
;
if
(
rc =
=
SOCKET_ERROR)
{
fprintf
(
stderr
,
"listen failed: %d/n"
,
WSAGetLastError(
)
)
;
return
-
1;
}
sockobj-
>
PendingAccepts =
(
BUFFER_OBJ_EX *
*
)
HeapAlloc(
GetProcessHeap(
)
,
HEAP_ZERO_MEMORY,
(
sizeof
(
BUFFER_OBJ_EX *
)
*
gOverlappedCount)
)
;
if
(
sockobj-
>
PendingAccepts =
=
NULL
)
{
fprintf
(
stderr
,
"HeapAlloc failed: %d/n"
,
GetLastError(
)
)
;
ExitProcess(
-
1)
;
}
for
(
i=
0;
i <
gOverlappedCount ;
i+
+
)
{
sockobj-
>
PendingAccepts[
i]
=
acceptobj =
GetBufferObjEx(
gBufferSize)
;
PostAccept(
sockobj,
acceptobj)
;
}
EnqueueSingleList(
&
ListenSockets,
&
(
sockobj-
>
next)
)
;
}
ptr =
ptr-
>
ai_next;
}
freeaddrinfo
(
res)
;
interval =
0;
while
(
true
)
{
rc =
WSAWaitForMultipleEvents(
1,
CompThreads,
TRUE
,
5000,
FALSE
)
;
if
(
rc =
=
WAIT_FAILED)
{
fprintf
(
stderr
,
"WSAWaitForMultipleEvents failed: %d/n"
,
WSAGetLastError(
)
)
;
break
;
}
else
if
(
rc =
=
WAIT_TIMEOUT)
{
interval+
+
;
if
(
interval =
=
12)
{
SocketObj *
listenptr=
NULL
;
int
optval,
optlen;
// Walk the list of outstanding accepts
listenptr =
(
SocketObj *
)
GotoNextSingleList(
&
ListenSockets,
ListenSockets.
head)
;
while
(
listenptr)
{
sockobj =
(
SOCKET_OBJ *
)
container_of(
SOCKET_OBJ,
next,
listenptr)
;
for
(
i=
0;
i <
gOverlappedCount ;
i+
+
)
{
optlen =
sizeof
(
optval)
;
rc =
getsockopt
(
sockobj-
>
PendingAccepts[
i]
-
>
sclient,
SOL_SOCKET,
SO_CONNECT_TIME,
(
char
*
)
&
optval,
&
optlen
)
;
if
(
rc =
=
SOCKET_ERROR)
{
fprintf
(
stderr
,
"getsockopt: SO_CONNECT_TIME failed: %d/n"
,
WSAGetLastError(
)
)
;
return
-
1;
}
if
(
(
optval !
=
0xFFFFFFFF)
&
&
(
optval >
300)
)
{
closesocket(
sockobj-
>
PendingAccepts[
i]
-
>
sclient)
;
}
}
listenptr =
(
SocketObj *
)
GotoNextSingleList(
&
ListenSockets,
listenptr)
;
}
interval =
0;
}
}
}
WSACleanup(
)
;
return
0;
}
版权声明:
原创作品,允许转载,转载时请务必以超链接形式标明文章原始出处、作者信息和本声明。否则将追究法律责任。
相关文章推荐
- Windows socket之IO完成端口(IOCP)模型开发(原作者:csdn——ithzhang)
- Windows socket之IO完成端口(IOCP)模型开发
- Windows socket之IO完成端口(IOCP)模型开发
- Windows socket之IO完成端口(IOCP)模型开发
- Windows socket之IO完成端口(IOCP)模型开发
- WinSock2 I/O六模型之六( I/O完成端口)
- Windows Socket五种I/O模型——代码全攻略(2)
- windows socket----select模型
- Windows Socket五种I/O模型——序言
- Windows Socket五种I/O模型——代码全攻略
- Windows Socket五种I/O模型详细介绍(精)
- Socket I/O模型之完成端口(completion port)
- Windows Socket五种I/O模型——代码全攻略
- Windows Socket五种I/O模型
- Windows Socket五种I/O模型
- Windows Socket五种I/O模型——代码全攻略
- Windows Socket五种I/O模型
- Windows Socket五种I/O模型——select模型
- Windows Socket I/O模型全接触
- Windows Socket五种I/O模型