/*******************************************************************************
模块名 : CIOCP,CIOCPClient,CIOCPServer
文件名 : CIOCP.CPP
相关文件 : CIOCP.h,MemPool.h
文件实现功能 : 完成端口基类,派生的客户端基类,派生的服务端基类
作者 : <黄宇羽>
版本 : 1.0
备注 :
--------------------------------------------------------------------------------
修改记录 :
日 期 版本 修改人
2008/11/21 1.0 <黄宇羽>
修改内容 :
创建
--------------------------------------------------------------------------------
日 期 版本 修改人
2009/06/09 1.1 <黄宇羽>
修改内容 :
修正socket重复利用,反复连接断连的稳定性
--------------------------------------------------------------------------------
日 期 版本 修改人
2009/06/09 1.2 <黄宇羽>
修改内容 :
加入客户端完成端口的异步连接,仅支持XP及win2003或以上
使用CONNECTEX,尚有BUG,当连接的目标主机没有打开,则陷入
永远等待.但是对于主机有开,但是端口没开,按照TCP标准,
仍然会回复一个RST信号,所以不会陷入永远等待.
--------------------------------------------------------------------------------
日 期 版本 修改人
2009/07/25 2.0 <黄宇羽>
修改内容 :
大规模修改客户端的断连及通知机制,把原客户端的销毁时存在完成端口正调用事件的异常
去掉,及断连检测更准确.取消原事件通知机制,唯一问题是销毁的时候若投递没有完全取
消的话销毁有可能需要最坏情况会延时1ms
--------------------------------------------------------------------------------
日 期 版本 修改人
2009/07/29 2.1 <黄宇羽>
修改内容 :
大规模修改调整各类的函数和基类的归属关系
*******************************************************************************/
#include "CIocpFile.h"
#include "MemPool.h"
//#ifdef _WIN32_WINNT
//#include <mswsock.h>
//#pragma comment(lib, "Mswsock.lib ")
//#endif
#pragma comment( lib, "ws2_32.lib" )
#define OP_IOREAD 0
#define OP_IOWRITE 1
#define OP_IOEXIT 2
#define OP_EXITTHREAD 3
#define IOCPSOURCE_ALLOC 1 //描述包的来源,来源于分配
#define IOCPSOURCE_LOCALE 2 //描述包的来源,来源于临时变量
/*全局初始化*/
/*******************************************************************************
该数值内存池为全局内存池的最大值。连接投递,接收投递,及发送投递都使用该内存池。
该内存池不建议开太大。太大会造成当发送不出去时绑定在核心内存的数据过多造成系统的
通信崩溃,建议控制在100,000以内,经验值为80,000
*******************************************************************************/
#define MAXMEMPOOL 1000
#define MAXBUFSIZE 4096
#define MAXSOCKET 65535
#ifndef SIO_KEEPALIVE_VALS
#define SIO_KEEPALIVE_VALS _WSAIOW(IOC_VENDOR,4)
#endif
#pragma pack(push, 1)
//该变量在某些编译器下可能会存在全局变量的命名冲突,小数情况懒得改
static HANDLE global_hIocpfile = NULL;// 完成端口的句柄
#pragma pack(pop)
typedef struct
{
OVERLAPPED ol;
SOCKET s;
int OpCode;//指示该缓冲用来作为的读写
LPCIOCPFile piocp;//指向使用的类CLIENT类指针
DWORD dwRevTotalLen;//已接收在缓冲的总指针数
DWORD dwActBufLen;//实际缓冲的原始长度
DWORD dwTargetLen;//目标长度
char *ActBuf;//实际缓冲的原始指针
WSABUF wbuf;//用来投递的缓冲区
DWORD dwBytes, dwFlags;//记录每次接受到的数据数
BYTE bsource;
char buf[MAXBUFSIZE];//内置使用的缓冲
}PER_IO_OPERATION_DATA,*LPPER_IO_OPERATION_DATA;
CMemPool <PER_IO_OPERATION_DATA> *global_mempool;
void LogOutLastError(char *fmr,...)
{
va_list ap;
va_start(ap, fmr);
vprintf(fmr, ap);
va_end(ap);
}
class CMutexLockex
{
private:
LPCRITICAL_SECTION m_Lock;
public:
CMutexLockex(LPCRITICAL_SECTION lpCS)
{
m_Lock = lpCS;
EnterCriticalSection(m_Lock);
}
~CMutexLockex()
{
LeaveCriticalSection(m_Lock);
}
};
class CIncCountLockex
{
public:
CIncCountLockex(long *lpCS)
{
InterlockedIncrement(lpCS);
}
};
class CDecCountLockex
{
private:
long *m_Lock;
public:
CDecCountLockex(long *lpCS)
{
m_Lock = lpCS;
}
~CDecCountLockex()
{
InterlockedDecrement(m_Lock);
//SetEvent(m_Lock->LockSemaphore);
}
};
class CWaitCountLockex
{
private:
public:
CWaitCountLockex(long *lpCS)
{
//EnterCriticalSection(m_Lock);
while((*lpCS)!=0) Sleep(1);
}
};
class E7E39ADDBE814C888A271764B70EC363
{
private:
CRITICAL_SECTION m_Lock;
//该锁是用于关闭某一个客户端时候的。防止人为关闭与自然关闭同时发生资源访问冲突
DWORD m_wthreadID;
int m_workthcount;
HANDLE *g_hwThread; // 工作线程句柄
bool m_workThread;
bool InitSock()
{
int ret;
WSADATA wsd;
ret = WSAStartup(MAKEWORD(2,2), &wsd);
if(ret!=0)
return false;
if(LOBYTE(wsd.wVersion)!= 2||HIBYTE(wsd.wVersion)!=2)
{
WSACleanup();
return false ;
}
InitializeCriticalSection(&m_Lock);
return true ;
}
static DWORD __stdcall WorkThread(LPVOID Param);
static void CloseSocket(PER_IO_OPERATION_DATA *olp,void* re);
static void FreeOlp(PER_IO_OPERATION_DATA *olp);
static bool PostRecv(PER_IO_OPERATION_DATA *olp);
static bool PostSend(PER_IO_OPERATION_DATA *olp);
public:
void StopWorkThread();
int StartWorkThread();
E7E39ADDBE814C888A271764B70EC363()
{
if(!InitSock())
return;
global_mempool = new CMemPool <PER_IO_OPERATION_DATA>(MAXMEMPOOL);
}
~E7E39ADDBE814C888A271764B70EC363()
{
StopWorkThread();
delete global_mempool;
DeleteCriticalSection( &m_Lock );
}
};
static E7E39ADDBE814C888A271764B70EC363 B18880962B494E4898AB548D2CD9C609;
void E7E39ADDBE814C888A271764B70EC363::FreeOlp(PER_IO_OPERATION_DATA *olp)
{
olp->s = INVALID_SOCKET;
global_mempool->Free(olp);
}
void E7E39ADDBE814C888A271764B70EC363::CloseSocket(PER_IO_OPERATION_DATA *olp,void* re)
{
// 客户端断开连接
if((olp->OpCode==OP_IOREAD))
{
LPCIOCPFile lpiocp = olp->piocp;
if(lpiocp!=NULL)
{
//// try{lpiocp->TryCloseInIocp();}catch(...){}
}
}
FreeOlp(olp);
}
bool E7E39ADDBE814C888A271764B70EC363::PostRecv(PER_IO_OPERATION_DATA *olp)
{
bool ret;
LPCIOCPFile lpiocp = olp->piocp;
ret = false;
if(lpiocp!=NULL)
{
try{ret = lpiocp->PostRecv(olp);}catch(...){}
}
return ret;
}
bool E7E39ADDBE814C888A271764B70EC363::PostSend(PER_IO_OPERATION_DATA *olp)
{
int getret;
bool ret;
int geterror;
DWORD dwNumberOfBytesSend = 0;
olp->dwFlags = 0;
ret = true;
getret = WSASend(olp->s,&olp->wbuf,1,&dwNumberOfBytesSend,olp->dwFlags, &olp->ol,NULL);
if(getret == SOCKET_ERROR)
{
if((geterror=WSAGetLastError())!=ERROR_IO_PENDING)
{
LogOutLastError("IOCPThread::PostSend WSASend Error:%d %d %d\n",geterror,olp->s);
ret = false;
}
}
return ret;
}
DWORD __stdcall E7E39ADDBE814C888A271764B70EC363::WorkThread(LPVOID Param)
{
E7E39ADDBE814C888A271764B70EC363 * pthis
= (E7E39ADDBE814C888A271764B70EC363 *)Param;
void * re;
OVERLAPPED * pOverlap;
DWORD berByte;
int ret;
DWORD dwoffset;
bool sckclosed;
while(global_hIocpfile)
{
berByte = 0;
ret = GetQueuedCompletionStatus(global_hIocpfile, &berByte, (LPDWORD)&re, (LPOVERLAPPED *)&pOverlap, INFINITE);
PER_IO_OPERATION_DATA *olp = (PER_IO_OPERATION_DATA *)pOverlap;
if (re == NULL)
continue;
if(olp == NULL)
continue;
sckclosed = false;
if ((olp->OpCode!=OP_EXITTHREAD)&&((!ret)||(berByte == 0)||(olp->s==INVALID_SOCKET)))
{
sckclosed = true;
}
if(!sckclosed)
{
switch(olp->OpCode)
{
case OP_IOREAD: