#include "stdafx.h"
#include "myiocp.h"
#include "Winsock2.h"
/// Macro definitions
// Size in bytes of the receive buffer embedded in every PER_IO_OPERATION_DATA;
// also the length handed to WSARecv for each posted receive.
#define DATA_BUFSIZE (8192)
// Value passed to CMyQueue::InitQueue for each accepted client's data queue.
// NOTE(review): presumably the maximum number of nodes per queue - confirm against CMyQueue.
#define MAX_DATA_NODE (10000*32)
// Number of per-connection data queues allocated in m_queData. The queues are
// indexed directly by socket value, so this is also the exclusive upper bound
// that GetDataFromClient accepts for a socket number.
#define MAX_SOCKET (65536)
// Ask the MSVC linker to pull in the Winsock 2 import library.
#pragma comment(lib, "ws2_32")
/// Structure definitions
// Per-operation context for one outstanding overlapped receive.
// One instance is allocated per connection when it is accepted and is reused
// for every subsequent WSARecv on that connection.
typedef struct
{
// Must remain the first member: the worker thread obtains the LPOVERLAPPED
// returned by GetQueuedCompletionStatus and treats it as a pointer to this
// whole structure.
OVERLAPPED Overlapped;
// Scatter/gather descriptor passed to WSARecv; set to point at Buffer.
WSABUF DataBuf;
// Storage that received bytes are written into.
CHAR Buffer[DATA_BUFSIZE];
} PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;
// Per-connection context. A pointer to it is registered as the completion key
// when the client socket is associated with the completion port, and is
// handed back to the worker thread with every completion for that socket.
typedef struct
{
// The accepted client socket this context belongs to.
SOCKET Socket;
} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;
/// Constructs an idle object: no listening socket yet, the connection queue
/// initialised, one data queue slot per possible socket value allocated, and
/// Winsock started for this instance.
MyIcopSocket::MyIcopSocket()
{
    // A zero socket means "server not running"; the worker loops test this.
    this->m_nSocket = 0;

    // Connection registry, then the per-socket data queues.
    this->m_queCon.InitQueue(MAX_SOCKET, FALSE);
    this->m_queData = new CMyQueue[MAX_SOCKET];

    // The start-up result is intentionally not examined here.
    (void)this->InitWinsock();
}
/// Tears the object down: closes the listening socket if one is open, releases
/// this instance's Winsock reference and frees the per-socket data queues.
MyIcopSocket::~MyIcopSocket()
{
    const bool bListening = (m_nSocket > 0);
    if (bListening)
    {
        closesocket(m_nSocket);
        // Zeroing the member is what makes the worker loops stop iterating.
        m_nSocket = 0;
    }

    WSACleanup();

    delete [] m_queData;
}
/// Starts Winsock, requesting version 2.2.
/// @return TRUE when WSAStartup succeeds, FALSE otherwise.
BOOL MyIcopSocket::InitWinsock()
{
    WSADATA wsaInfo;
    const int nStartupResult = WSAStartup(MAKEWORD(2, 2), &wsaInfo);
    return (nStartupResult == 0) ? TRUE : FALSE;
}
int MyIcopSocket::BindServerOverlapped(int nPort,const char *szSvrIpAddress)
{
// 创建socket
SOCKET sServer = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
// 绑定端口
struct sockaddr_in servAddr;
servAddr.sin_family = AF_INET;
servAddr.sin_port = htons(nPort);
if (szSvrIpAddress == NULL)
{
servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
}
else
{
servAddr.sin_addr.s_addr = inet_addr(szSvrIpAddress);
}
if(bind(sServer, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0)
{
return 0;
}
// 设置监听队列为200
if(listen(sServer, 2000) != 0)
{
return 0;
}
return sServer;
}
/// Completion-port worker thread body.
///
/// Dequeues receive completions, appends the received bytes to the data queue
/// of the originating socket and re-arms the receive on that socket.
///
/// @param lpParam  Pointer to the owning MyIcopSocket.
/// @return Always 0.
///
/// The loop ends when m_nSocket becomes 0, when the completion port itself
/// fails, or when the shutdown packet posted by ProcessWaitAccept (NULL key,
/// NULL overlapped) is dequeued.
DWORD MyIcopSocket::ProcessIO(LPVOID lpParam)
{
    MyIcopSocket* lp_this = (MyIcopSocket*)lpParam;
    HANDLE CompletionPort = lp_this->CompletionPort;

    while (lp_this->m_nSocket)
    {
        DWORD BytesTransferred = 0;
        ULONG_PTR CompletionKey = 0;   // pointer-sized, so the context pointer survives on 64-bit
        LPOVERLAPPED pOverlapped = NULL;

        const BOOL bOk = GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,
                                                   &CompletionKey, &pOverlapped, INFINITE);

        if (pOverlapped == NULL)
        {
            // No I/O packet is attached, so there is no per-connection context
            // to touch. Either the port failed (bOk == FALSE) or this is the
            // shutdown packet. Forward the shutdown packet so the remaining
            // workers wake up as well, then leave.
            if (bOk)
            {
                PostQueuedCompletionStatus(CompletionPort, 0, 0, NULL);
            }
            return 0;
        }

        LPPER_HANDLE_DATA PerHandleData = reinterpret_cast<LPPER_HANDLE_DATA>(CompletionKey);
        // Overlapped is the first member of PER_IO_OPERATION_DATA.
        LPPER_IO_OPERATION_DATA PerIoData = reinterpret_cast<LPPER_IO_OPERATION_DATA>(pOverlapped);

        if (!bOk)
        {
            // The receive on this connection failed (for example the peer
            // reset the connection). Drop this one connection and keep the
            // worker alive for the others.
            closesocket(PerHandleData->Socket);
            lp_this->m_queCon.DeleteQueue(PerHandleData->Socket);
            lp_this->m_queData[PerHandleData->Socket].FreeQueue();
            delete PerIoData;
            delete PerHandleData;
            continue;
        }

        // Zero bytes on a successful receive: the client closed the connection.
        // NOTE(review): unlike the failure path, the queues are left untouched
        // here, as in the original code - confirm whether that is intentional.
        if (BytesTransferred == 0)
        {
            closesocket(PerHandleData->Socket);
            delete PerIoData;
            delete PerHandleData;
            continue;
        }

        // Hand the received bytes to the consumer side.
        lp_this->m_queData[PerHandleData->Socket].Enqueue(PerIoData->Buffer,BytesTransferred);

        // Re-arm the receive on this socket, reusing the same I/O context.
        DWORD Flags = 0;
        DWORD dwRecv = 0;
        ZeroMemory(PerIoData, sizeof(PER_IO_OPERATION_DATA));
        PerIoData->DataBuf.buf = PerIoData->Buffer;
        PerIoData->DataBuf.len = DATA_BUFSIZE;
        const int nRecv = WSARecv(PerHandleData->Socket, &PerIoData->DataBuf, 1, &dwRecv,
                                  &Flags, &PerIoData->Overlapped, NULL);
        if (nRecv == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
        {
            // The receive could not be queued, so no completion will ever
            // arrive for this context. Release it now, keeping the data that
            // was just queued readable.
            closesocket(PerHandleData->Socket);
            delete PerIoData;
            delete PerHandleData;
        }
    }
    return 0;
}
/// Accept thread body.
///
/// Accepts clients on the listening socket, registers each one in the
/// connection queue, associates it with the completion port and posts the
/// first overlapped receive. When m_nSocket becomes 0 the loop ends, a
/// shutdown packet is posted to the completion port and the listening socket
/// is closed.
///
/// @param lpParam  Pointer to the owning MyIcopSocket.
/// @return Always 0.
DWORD MyIcopSocket::ProcessWaitAccept(LPVOID lpParam)
{
    MyIcopSocket* lp_this = (MyIcopSocket*)lpParam;
    waitParameter *wp = (waitParameter *)&lp_this->wp;
    SOCKET sListen = wp->nSocket;
    HANDLE CompletionPort = wp->CompletionPort;

    while (lp_this->m_nSocket)
    {
        // accept() overwrites the length, so it must be reset for every call.
        struct sockaddr_in clientAddr;
        memset(&clientAddr, 0x00, sizeof(clientAddr));
        int addrLen = sizeof(clientAddr);

        // Wait for a client to connect.
        SOCKET sClient = accept(sListen, (struct sockaddr *)&clientAddr, &addrLen);
        if (sClient == INVALID_SOCKET)
        {
            // SOCKET is unsigned, so failure has to be detected with
            // INVALID_SOCKET rather than a "<= 0" comparison.
            continue;
        }
        if (sClient == 0 || sClient >= static_cast<SOCKET>(MAX_SOCKET))
        {
            // The socket value is used as an index into m_queData and 0 means
            // "any socket" to GetDataFromClient; such a connection cannot be
            // served.
            closesocket(sClient);
            continue;
        }

        char *szClientIp = inet_ntoa(clientAddr.sin_addr);

        // Associate the client with the completion port. The context pointer
        // is the completion key and must be passed at full pointer width.
        LPPER_HANDLE_DATA PerHandleData = new PER_HANDLE_DATA();
        PerHandleData->Socket = sClient;
        if (CreateIoCompletionPort((HANDLE)sClient, CompletionPort, (ULONG_PTR)PerHandleData, 0) == NULL)
        {
            closesocket(sClient);
            delete PerHandleData;
            continue;
        }

        // Register the connection and prepare its data queue. No receive has
        // been posted yet, so no worker can touch these queues before this.
        lp_this->m_queCon.Enqueue(szClientIp,strlen(szClientIp)+1,sClient);
        lp_this->m_queData[sClient].InitQueue(MAX_DATA_NODE);

        // Build the overlapped context used for every receive on this socket.
        LPPER_IO_OPERATION_DATA PerIoData = new PER_IO_OPERATION_DATA();
        ZeroMemory(PerIoData, sizeof(PER_IO_OPERATION_DATA));
        PerIoData->DataBuf.buf = PerIoData->Buffer;
        PerIoData->DataBuf.len = DATA_BUFSIZE;

        // Post the first receive.
        DWORD Flags = 0;
        DWORD dwRecv = 0;
        const int nRecv = WSARecv(sClient, &PerIoData->DataBuf, 1, &dwRecv, &Flags,
                                  &PerIoData->Overlapped, NULL);
        if (nRecv == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
        {
            // Nothing was queued, so no worker will ever see this connection.
            // Undo the registration and release everything here.
            lp_this->m_queCon.DeleteQueue(sClient);
            lp_this->m_queData[sClient].FreeQueue();
            closesocket(sClient);
            delete PerIoData;
            delete PerHandleData;
        }
    }

    // Wake a worker with an empty packet so it can observe the shutdown.
    DWORD dwByteTrans=0;
    PostQueuedCompletionStatus(CompletionPort, dwByteTrans, 0, 0);
    closesocket(sListen);
    return 0;
}
/// Starts the server: creates the listening socket and the completion port,
/// launches two worker threads per processor and one accept thread.
///
/// @param nPort           TCP port to listen on.
/// @param szSvrIpAddress  Local IPv4 address to bind to, or NULL for all
///                        interfaces.
/// @return TRUE when the listener, the completion port and the accept thread
///         were all created; FALSE otherwise, in which case the listening
///         socket is closed again and m_nSocket is 0.
BOOL MyIcopSocket::RunServer(int nPort,const char *szSvrIpAddress)
{
    // Create the listening socket.
    m_nSocket = BindServerOverlapped(nPort,szSvrIpAddress);
    if (m_nSocket == 0)
    {
        return FALSE;
    }

    CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (CompletionPort == NULL)
    {
        // Without a port nothing can be served; do not leave the listener open.
        closesocket(m_nSocket);
        m_nSocket = 0;
        return FALSE;
    }

    // Size the worker pool from the processor count.
    SYSTEM_INFO SystemInfo;
    GetSystemInfo(&SystemInfo);

    // Start the completion-port worker threads. The handles are not needed
    // afterwards, so they are closed immediately.
    const DWORD dwWorkers = SystemInfo.dwNumberOfProcessors * 2;
    for (DWORD i = 0; i < dwWorkers; i++)
    {
        HANDLE hWorker = CreateThread(NULL, 0, ProcessIO, this, 0, NULL);
        if (hWorker)
        {
            CloseHandle(hWorker);
        }
    }

    // Parameters read by the accept thread.
    wp.CompletionPort = CompletionPort;
    wp.nSocket = m_nSocket;

    // Start the accept thread.
    HANDLE hAccept = CreateThread(NULL, 0, ProcessWaitAccept, this, 0, NULL);
    if (hAccept == NULL)
    {
        // No client can ever connect. Stop the workers by zeroing m_nSocket
        // and closing the port, which makes their pending waits fail.
        closesocket(m_nSocket);
        m_nSocket = 0;
        CloseHandle(CompletionPort);
        CompletionPort = NULL;
        return FALSE;
    }
    CloseHandle(hAccept);

    return TRUE;
}
/// Fetches the next queued block of received data.
///
/// @param nSocket    Socket to read from. A value outside (0, MAX_SOCKET)
///                   means "any socket": all data queues are polled once,
///                   round-robin, starting after the slot served last time.
/// @param szData     Destination buffer; the whole node is copied into it.
///                   NOTE(review): no capacity is passed in, so the caller
///                   must supply a buffer large enough for one node
///                   (presumably DATA_BUFSIZE bytes - confirm).
/// @param outSocket  In "any socket" mode receives the socket whose data was
///                   returned; may be NULL. Not written in single-socket mode.
/// @return Number of bytes copied, or 0 when nothing is queued or szData is
///         NULL.
///
/// NOTE(review): the round-robin cursor is a function-local static, so it is
/// shared by all instances and not protected against concurrent callers.
/// NOTE(review): the node returned by Dequeue() is not released here;
/// ownership is assumed to stay with CMyQueue - confirm.
int MyIcopSocket::GetDataFromClient(int nSocket,char *szData,int *outSocket)
{
    if (szData == NULL)
    {
        // Checked before dequeuing so that no data is consumed and lost.
        return 0;
    }

    if (nSocket > 0 && nSocket < (MAX_SOCKET))
    {
        // Single-socket mode.
        NODE *pNode = m_queData[nSocket].Dequeue();
        if (pNode == NULL)
        {
            return 0;
        }
        memcpy(szData,pNode->data,pNode->data_len);
        return pNode->data_len;
    }

    // "Any socket" mode: visit every slot at most once per call.
    static int s_nCursor = -1;
    for (int nScanned = 0; nScanned < MAX_SOCKET; ++nScanned)
    {
        s_nCursor++;
        if (s_nCursor >= MAX_SOCKET)
        {
            s_nCursor = 0;
        }

        NODE *pNode = m_queData[s_nCursor].Dequeue();
        if (pNode != NULL)
        {
            memcpy(szData,pNode->data,pNode->data_len);
            if (outSocket != NULL)
            {
                *outSocket = s_nCursor;
            }
            return pNode->data_len;
        }
    }
    return 0;
}
void MyIcopSocket::CloseSocket(int nSocket)
{
closesocket(nSocket);
}
/************************************************************************/
/* IOCP client-side API wrappers */
/************************************************************************/
int MyIcopSocket::ClientConnect(int nPort,const char *szIpAddress)
{
SOCKET s,ss;
int len;
struct sockaddr_in server;
InitWinsock();
memset((void *)&server,0x00,sizeof(server));
s = socket(AF_INET, SOCK_STREAM, 0);
server.sin_family = AF_INET;
server.sin_addr.s_addr = inet_addr(szIpAddress);
server.sin_port = htons(nPort);
len = sizeof(server);
ss=connect(s, (struct sockaddr *)&server, len);
if (SOCKET_ERROR ==ss)
{
closesocket(s);
return 0;
}
return s;
}
/// Sends a buffer on a socket, calling send() repeatedly until everything has
/// been written or send() reports an error or zero bytes.
///
/// @param nCommSocket  Socket to write to.
/// @param nLen         Number of bytes to send from szBuf.
/// @param szBuf        Data to send.
/// @return Number of bytes actually sent; less than nLen when send() stopped
///         early.
int MyIcopSocket::IocpWrite(int nCommSocket,int nLen,const char *szBuf)
{
    int nSent = 0;
    do
    {
        const int nChunk = send(nCommSocket, szBuf + nSent, nLen - nSent, 0);
        if (nChunk <= 0)
        {
            // Error or nothing written: report what went out so far.
            break;
        }
        nSent += nChunk;
    } while (nSent < nLen);
    return nSent;
}
/// Sets the receive timeout (SO_RCVTIMEO) on a socket. The result of
/// setsockopt is not reported.
///
/// @param nCommSocket  Socket to configure.
/// @param nTimeout     Timeout value handed to setsockopt unchanged
///                     (presumably milliseconds, as Winsock expects - confirm).
void MyIcopSocket::SetTimeOut(int nCommSocket,int nTimeout)
{
    char *pOptionValue = (char *)&nTimeout;
    const int cbOptionValue = static_cast<int>(sizeof(nTimeout));
    setsockopt(nCommSocket, SOL_SOCKET, SO_RCVTIMEO, pOptionValue, cbOptionValue);
}
int MyIcopSocket::IocpRead(int nCommSocket,int nLen,char *szBuf)
{
int readLen=0;
int tmplen=0;
// if (nTimeout)
// {
/
- 1
- 2
前往页