《windows網絡編程技術》之 Winsock I/O方法(2)
重疊模式 之 基于事件通知的重疊模型
重疊模型的總體設計以WIN32重疊I/O機制為基礎。要想在一個套接字上使用重疊I/O模型,必須在創建套接字時指定WSA_FLAG_OVERLAPPED這個標志,如下:
s=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
如果使用的是socket函數,則默認設置為WSA_FLAG_OVERLAPPED。
使用重疊模式,就不得不熟悉WSAOVERLAPPED結構,下面是這個結構的定義:
// Overlapped structure handed to the overlapped Winsock calls.
typedef struct WSAOVERLAPPED
{
// The first four fields are used internally by the system; the
// application does not need to touch them.
DWORD Internal;
DWORD InternalHigh;
DWORD Offset;
DWORD OffsetHigh;
// Event object associated with the overlapped request (create it with
// WSACreateEvent and assign it here).
WSAEVENT hEvent;
} WSAOVERLAPPED, FAR * LPWSAOVERLAPPED;
其中前4個字段都由系統內部使用,應用程序不必關心。
WSAOVERLAPPED結構中的hEvent即可和一個事件對象進行關聯:用WSACreateEvent創建一個事件對象,將重疊結構的hEvent字段分配給新創建的事件對象。然后調用下列winsock函數(帶重疊結構類型的參數),
WSASend、WSASendTo、WSARecv、WSARecvFrom、WSAIoctl、AcceptEx、TransmitFile
在重疊I/O請求完成后,Winsock會更改重疊結構中事件對象的傳信狀態,從“未傳信”-->“已傳信”。由于WSACreateEvent創建的事件對象最開始處于未傳信狀態,并且是以人工重設模式創建的,所以在I/O請求完成后,我們的程序有責任將其狀態從“已傳信”重設為“未傳信”,即調用WSAResetEvent函數。
對重疊I/O請求完成后,我們可以調用WSAGetOverlappedResult函數來判斷重疊調用是否成功,定義如下:
// Retrieves the outcome of an overlapped operation.
BOOL WSAGetOverlappedResult(
SOCKET s,                      // socket the overlapped call was made on
LPWSAOVERLAPPED lpOverlapped,  // overlapped structure passed to that call
LPDWORD lpcbTransfer,          // receives the number of bytes actually sent or received
BOOL fWait,                    // whether to wait for the overlapped operation to complete
LPDWORD lpdwFlags              // receives the result flags
);
其中,lpcbTransfer對應一個雙字變量,負責接收一次重疊發送或接收操作實際傳輸的字節數。
fWait,用于決定是否應該等待一次重疊操作完成。
lpdwFlags,負責接收結果標志。
下面是一個完整的過程:
1.創建一個套接字,開始在指定的端口上監聽連接請求。
2.接收進入的連接請求。
3.為接受的套接字新建一個WSAOVERLAPPED結構,并為該結構分配一個事件對象句柄,同時把該對象句柄分配給一個事件數組,以便由WSAWaitForMultipleEvents函數使用。
4.在套接字上投遞一個異步WSARecv請求,指定參數為WSAOVERLAPPED結構。
5.使用步驟3中的事件數組,調用WSAWaitForMultipleEvents,等待與重疊調用關聯的事件對象進入“已傳信”狀態(即,等待事件觸發)。
6.WSAWaitForMultipleEvents函數完成后,針對事件數組,調用WSAResetEvent重設事件對象,并對完成的重疊請求進行處理。
7.使用WSAGetOverlappedResult函數,判斷重疊調用的返回狀態。
8.在套接字上投遞另一個重疊WSARecv請求。
9.重復5--8步驟。
代碼如下:
#include <winsock2.h>
#include <windows.h>
#include <stdio.h>
// TCP port the listening socket is bound to.
#define PORT 5150
// Size in bytes of each connection's I/O buffer.
#define DATA_BUFSIZE 8192
// Per-connection state shared between main() and the ProcessIO() worker.
typedef struct _SOCKET_INFORMATION {
CHAR Buffer[DATA_BUFSIZE];   // storage for received data that is then sent back
WSABUF DataBuf;              // buffer descriptor passed to WSARecv/WSASend; points into Buffer
SOCKET Socket;               // the accepted connection
WSAOVERLAPPED Overlapped;    // overlapped structure for the single outstanding request
DWORD BytesSEND;             // bytes of the current receive already sent back
DWORD BytesRECV;             // bytes obtained by the last WSARecv; 0 while a receive is outstanding
} SOCKET_INFORMATION, * LPSOCKET_INFORMATION;
// Worker thread routine that services completed overlapped requests.
DWORD WINAPI ProcessIO(LPVOID lpParameter);
// Number of entries in use in EventArray / SocketArray.
DWORD EventTotal = 0;
// Events waited on by the worker. Entry 0 is signalled by main() after it
// adds a connection; entry i (i >= 1) belongs to SocketArray[i].
WSAEVENT EventArray[WSA_MAXIMUM_WAIT_EVENTS];
// Connection state, indexed in parallel with EventArray (entry 0 is unused).
LPSOCKET_INFORMATION SocketArray[WSA_MAXIMUM_WAIT_EVENTS];
// Held while entries are added to or removed from the two arrays and
// EventTotal is changed.
CRITICAL_SECTION CriticalSection;
void main(void)
{
WSADATA wsaData;
SOCKET ListenSocket, AcceptSocket;
SOCKADDR_IN InternetAddr;
DWORD Flags;
DWORD ThreadId;
DWORD RecvBytes;
INT Ret;
InitializeCriticalSection(&CriticalSection);
if ((Ret = WSAStartup(0x0202,&wsaData)) != 0)
{
printf("WSAStartup failed with error %d\n", Ret);
WSACleanup();
return;
}
if ((ListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0,
WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET)
{
printf("Failed to get a socket %d\n", WSAGetLastError());
return;
}
InternetAddr.sin_family = AF_INET;
InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
InternetAddr.sin_port = htons(PORT);
if (bind(ListenSocket, (PSOCKADDR) &InternetAddr, sizeof(InternetAddr)) == SOCKET_ERROR)
{
printf("bind() failed with error %d\n", WSAGetLastError());
return;
}
if (listen(ListenSocket, 5))
{
printf("listen() failed with error %d\n", WSAGetLastError());
return;
}
// Setup the listening socket for connections.
if ((AcceptSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0,
WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET)
{
printf("Failed to get a socket %d\n", WSAGetLastError());
return;
}
if ((EventArray[0] = WSACreateEvent()) == WSA_INVALID_EVENT)
{
printf("WSACreateEvent failed with error %d\n", WSAGetLastError());
return;
}
// Create a thread to service overlapped requests
if (CreateThread(NULL, 0, ProcessIO, NULL, 0, &ThreadId) == NULL)
{
printf("CreateThread failed with error %d\n", GetLastError());
return;
}
EventTotal = 1;
while(TRUE)
{
// Accept inbound connections
if ((AcceptSocket = accept(ListenSocket, NULL, NULL)) == INVALID_SOCKET)
{
printf("accept failed with error %d\n", WSAGetLastError());
return;
}
EnterCriticalSection(&CriticalSection);
// Create a socket information structure to associate with the accepted socket.
if ((SocketArray[EventTotal] = (LPSOCKET_INFORMATION) GlobalAlloc(GPTR,
sizeof(SOCKET_INFORMATION))) == NULL)
{
printf("GlobalAlloc() failed with error %d\n", GetLastError());
return;
}
// Fill in the details of our accepted socket.
SocketArray[EventTotal]->Socket = AcceptSocket;
ZeroMemory(&(SocketArray[EventTotal]->Overlapped), sizeof(OVERLAPPED));
SocketArray[EventTotal]->BytesSEND = 0;
SocketArray[EventTotal]->BytesRECV = 0;
SocketArray[EventTotal]->DataBuf.len = DATA_BUFSIZE;
SocketArray[EventTotal]->DataBuf.buf = SocketArray[EventTotal]->Buffer;
if ((SocketArray[EventTotal]->Overlapped.hEvent = EventArray[EventTotal] =
WSACreateEvent()) == WSA_INVALID_EVENT)
{
printf("WSACreateEvent() failed with error %d\n", WSAGetLastError());
return;
}
// Post a WSARecv request to to begin receiving data on the socket
Flags = 0;
if (WSARecv(SocketArray[EventTotal]->Socket,
&(SocketArray[EventTotal]->DataBuf), 1, &RecvBytes, &Flags,
&(SocketArray[EventTotal]->Overlapped), NULL) == SOCKET_ERROR)
{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSARecv() failed with error %d\n", WSAGetLastError());
return;
}
}
EventTotal++;
LeaveCriticalSection(&CriticalSection);
//
// Signal the first event in the event array to tell the worker thread to
// service an additional event in the event array
//
if (WSASetEvent(EventArray[0]) == FALSE)
{
printf("WSASetEvent failed with error %d\n", WSAGetLastError());
return;
}
}
}
//
// Closes the connection stored in slot Slot (>= 1) and removes it from
// SocketArray/EventArray, shifting the later entries down by one.
//
static void RemoveConnection(DWORD Slot)
{
    LPSOCKET_INFORMATION SI;
    WSAEVENT Event;
    DWORD i;

    EnterCriticalSection(&CriticalSection);

    SI = SocketArray[Slot];
    Event = EventArray[Slot];

    // Stop at the last used entry: reading index EventTotal would go past
    // the used part of the arrays (and past their end when they are full).
    for (i = Slot; i + 1 < EventTotal; i++)
    {
        EventArray[i] = EventArray[i + 1];
        SocketArray[i] = SocketArray[i + 1];
    }
    EventTotal--;

    LeaveCriticalSection(&CriticalSection);

    printf("Closing socket %d\n", (int) SI->Socket);
    if (closesocket(SI->Socket) == SOCKET_ERROR)
    {
        printf("closesocket() failed with error %d\n", WSAGetLastError());
    }
    WSACloseEvent(Event);
    GlobalFree(SI);
}

//
// Worker thread: services completed overlapped WSARecv/WSASend requests.
//
// Waits on the events in EventArray. Event 0 only means that main() added a
// connection, so the wait is restarted with the new count. For any other
// event the matching connection is looked up and the data just received is
// sent back; once everything has been sent a new WSARecv() is posted. A
// connection is closed and removed when its request failed, when zero bytes
// were transferred, or when posting the next request fails.
//
// lpParameter is unused. Returns 0, and only if the wait itself fails.
//
DWORD WINAPI ProcessIO(LPVOID lpParameter)
{
    DWORD WaitResult;
    DWORD WaitCount;
    DWORD Slot;
    DWORD Flags;
    LPSOCKET_INFORMATION SI;
    DWORD BytesTransferred;
    DWORD RecvBytes, SendBytes;
    int Error;

    (void) lpParameter;

    // Process asynchronous WSASend, WSARecv requests.
    while (TRUE)
    {
        // Take a consistent snapshot of the count that main() updates.
        EnterCriticalSection(&CriticalSection);
        WaitCount = EventTotal;
        LeaveCriticalSection(&CriticalSection);

        WaitResult = WSAWaitForMultipleEvents(WaitCount, EventArray, FALSE,
            WSA_INFINITE, FALSE);
        if (WaitResult == WSA_WAIT_FAILED)
        {
            printf("WSAWaitForMultipleEvents failed %d\n", WSAGetLastError());
            return 0;
        }

        Slot = WaitResult - WSA_WAIT_EVENT_0;

        // The events are manual-reset, so clear the one that fired.
        WSAResetEvent(EventArray[Slot]);

        // Event zero means main() appended a connection: just wait again
        // with the updated count.
        if (Slot == 0)
        {
            continue;
        }

        SI = SocketArray[Slot];

        if (WSAGetOverlappedResult(SI->Socket, &(SI->Overlapped), &BytesTransferred,
            FALSE, &Flags) == FALSE || BytesTransferred == 0)
        {
            RemoveConnection(Slot);
            continue;
        }

        // Check to see if the BytesRECV field equals zero. If this is so, then
        // this means a WSARecv call just completed so update the BytesRECV field
        // with the BytesTransferred value from the completed WSARecv() call.
        if (SI->BytesRECV == 0)
        {
            SI->BytesRECV = BytesTransferred;
            SI->BytesSEND = 0;
        }
        else
        {
            SI->BytesSEND += BytesTransferred;
        }

        ZeroMemory(&(SI->Overlapped), sizeof(WSAOVERLAPPED));
        SI->Overlapped.hEvent = EventArray[Slot];

        if (SI->BytesRECV > SI->BytesSEND)
        {
            // Post another WSASend() request.
            // Since WSASend() is not guaranteed to send all of the bytes requested,
            // continue posting WSASend() calls until all received bytes are sent.
            SI->DataBuf.buf = SI->Buffer + SI->BytesSEND;
            SI->DataBuf.len = SI->BytesRECV - SI->BytesSEND;

            if (WSASend(SI->Socket, &(SI->DataBuf), 1, &SendBytes, 0,
                &(SI->Overlapped), NULL) == SOCKET_ERROR)
            {
                Error = WSAGetLastError();
                if (Error != WSA_IO_PENDING)
                {
                    // Only this connection is broken; keep serving the others.
                    printf("WSASend() failed with error %d\n", Error);
                    RemoveConnection(Slot);
                    continue;
                }
            }
        }
        else
        {
            SI->BytesRECV = 0;

            // Now that there are no more bytes to send post another WSARecv() request.
            Flags = 0;
            SI->DataBuf.len = DATA_BUFSIZE;
            SI->DataBuf.buf = SI->Buffer;

            if (WSARecv(SI->Socket, &(SI->DataBuf), 1, &RecvBytes, &Flags,
                &(SI->Overlapped), NULL) == SOCKET_ERROR)
            {
                Error = WSAGetLastError();
                if (Error != WSA_IO_PENDING)
                {
                    // Only this connection is broken; keep serving the others.
                    printf("WSARecv() failed with error %d\n", Error);
                    RemoveConnection(Slot);
                    continue;
                }
            }
        }
    }
}

浙公網安備 33010602011771號