#include "CSocket.h"
#include<stdlib.h>
#include<iostream>
#include<conio.h>
#include<time.h>
#include<Mswsock.h>
#include<set>
#include<string>
#include "SplitPackage.h"
#include "PackQueue.h"
#include "ClientQueue.h"
#include "SendBufferQueue.h"
#pragma comment(lib,"ws2_32.lib")
#pragma comment(lib,"Mswsock.lib")
using namespace std;
// Values stored in PER_IO_DATA::OperationType; the completion callback compares
// against them to decide which kind of overlapped request has finished.
#define RECV_POSTED 0x0001
#define SEND_POSTED 0x0002
#define ACCEPT_POSTED 0x0003
// Number of receive requests posted on every newly accepted socket.
#define RECV_POST_PER_SOCK 4
// NOTE(review): not referenced in the visible part of this file; presumably the
// queue of packages consumed by the worker threads declared below - confirm.
PackQueue pack_queue;
// Per-connection records are added here (AddClient) by the completion callback
// after received data or a disconnect package has been written to their buffer.
ClientQueue client_queue;
// NOTE(review): not referenced in the visible part of this file; presumably
// consumed by SendPackThread - confirm.
SendingBufQue client_send_queue;
// Set of per-connection records: inserted on accept, erased on disconnect.
set<LPPER_HANDLE_DATA> client_conn;
// Held by the completion callback around its find/erase on client_conn.
CRITICAL_SECTION m_csLock;
// NOTE(review): the three events below are not used in the visible part of this
// file; presumably they wake the corresponding worker threads - confirm.
HANDLE hPackHandleEvent;
HANDLE hSplitPackEvent;
HANDLE hSendPackEvent;
// Thread entry points; their definitions are outside the visible part of this file.
DWORD WINAPI SplitPackThread(
LPVOID lpParameter
);
DWORD WINAPI HandlePackThread(
LPVOID lpParameter
);
DWORD WINAPI SendPackThread(
LPVOID lpParameter
);
// Helpers that post an overlapped receive / send / accept on the socket in
// perHandleData using the I/O record perIodata. PostSend and PostRecv return false
// when the request could not be started.
// NOTE(review): PostAccept is defined outside the visible part; it is assumed to
// follow the same return convention - confirm.
bool PostRecv(LPPER_HANDLE_DATA perHandleData,LPPER_IO_DATA perIodata);
bool PostSend(LPPER_HANDLE_DATA perHandleData,LPPER_IO_DATA perIodata);
bool PostAccept(LPPER_HANDLE_DATA perHandleData,LPPER_IO_DATA perIodata);
VOID CALLBACK OverlappedCompletionRoutine(
PTP_CALLBACK_INSTANCE pInstance,
PVOID pvContect,
PVOID pOverlapped,
ULONG IoResult,
ULONG_PTR NumberOfBytesTransferred,
PTP_IO pIo)
{
LPOVERLAPPED Overlapped=(LPOVERLAPPED)pOverlapped;
LPPER_HANDLE_DATA PerHandleData=(LPPER_HANDLE_DATA)pvContect;
LPPER_IO_DATA PerIoData=NULL;
//DWORD SendBytes,RecvBytes;
DWORD Flags=0;
if( Overlapped != NULL ) PerIoData=CONTAINING_RECORD(Overlapped,PER_IO_DATA,Overlapped);
if( IoResult != NO_ERROR )
{
if( Overlapped == NULL )
{
int error_code = WSAGetLastError();
return; // The function does not store information in the variables pointed to by
// the lpNumberOfBytesTransferred and lpCompletionKey parameters.
}
else
{
DWORD dwTransfed,dwFlags;
SOCKET thisSock=PerHandleData->Socket;
BOOL ErrorResult=WSAGetOverlappedResult(thisSock, Overlapped,&dwTransfed,true,&dwFlags);
int error_code = WSAGetLastError();
if( WSAECONNRESET == error_code)
{
EnterCriticalSection( &m_csLock );
auto it=client_conn.find(PerHandleData);
if( it != client_conn.end() )
{
if( PerHandleData->per_io_buffer->WriteData(PerIoData->pack_serial,
DisConnPackInstance::GetLpInstance()->get_package_stream(),
DisConnPackInstance::GetLpInstance()->get_length()) )
{
client_conn.erase(it);
client_queue.AddClient(PerHandleData);
}
LeaveCriticalSection( &m_csLock );
FreePerIoData( PerIoData );
}
else
{
LeaveCriticalSection( &m_csLock );
FreePerIoData( PerIoData );
}
}
return; //The function stores information in the variables pointed to by
//lpNumberOfBytesTransferred, lpCompletionKey, and lpOverlapped.
}
}
if( NumberOfBytesTransferred==0 &&
( PerIoData->OperationType == RECV_POSTED ||
PerIoData->OperationType == SEND_POSTED ) )
{
EnterCriticalSection( &m_csLock );
auto it=client_conn.find(PerHandleData);
if( it != client_conn.end() )
{
if( PerHandleData->per_io_buffer->WriteData(PerIoData->pack_serial,
DisConnPackInstance::GetLpInstance()->get_package_stream(),
DisConnPackInstance::GetLpInstance()->get_length()) )
{
client_conn.erase(it);
client_queue.AddClient(PerHandleData);
}
LeaveCriticalSection( &m_csLock );
FreePerIoData( PerIoData );
}
else
{
LeaveCriticalSection( &m_csLock );
FreePerIoData( PerIoData );
}
return;
}
if( PerIoData->OperationType == RECV_POSTED )
{
//EnterCriticalSection( &m_csLock );
//post_fail_time=100;
//cout <<"write ";
bool can_alive=PerHandleData->per_io_buffer->WriteData(PerIoData->pack_serial,PerIoData->Buffer,NumberOfBytesTransferred);
//cout <<"done" <<endl;
//post_fail_time=200;
if( /*PerHandleData->bFree >= OUTQUEUE_SIGNATURE &&*/ can_alive )
{
//cout <<"come\n";
client_queue.AddClient(PerHandleData);
}
//cout <<PerIoData->pack_serial <<endl; // this prove that this program's IOBuffer write operation has a problem
//post_fail_time=0;
//LeaveCriticalSection( &m_csLock );
if( ! PostRecv(PerHandleData,PerIoData) ) //post 序号要加锁!!
return;
}
else if( PerIoData->OperationType == SEND_POSTED )
{
if( NumberOfBytesTransferred < PerIoData->DataBuf.len )
{
PerIoData->DataBuf.buf += NumberOfBytesTransferred;
PerIoData->DataBuf.len -= NumberOfBytesTransferred;
if( ! PostSend(PerHandleData, PerIoData) )
{
return;
}
}
else
{
PerHandleData->send_ctrl_block.bIOSendPendding=FALSE;
Package* pack=(Package*)PerIoData->Buffer;
delete pack;
FreePackPerIoData( PerIoData );
//cout <<"post send" <<endl; //a send has complete,send next
}
}
else if(PerIoData->OperationType == ACCEPT_POSTED )
{
cout <<"connected: " <<PerHandleData->Socket <<endl;
LPPER_HANDLE_DATA perHandleData=new PER_HANDLE_DATA();
client_conn.insert(perHandleData);
perHandleData->pack=new Package();
perHandleData->bFree=OUTQUEUE_SIGNATURE;
perHandleData->alive_cnt=0;
perHandleData->per_io_buffer=new PerIOBuffer();
perHandleData->pack_serial=0;
perHandleData->Socket=PerHandleData->Socket;
perHandleData->CompletionPort=PerHandleData->CompletionPort;
InitializeCriticalSection( &perHandleData->SockCritSec );
PTP_IO pio=CreateThreadpoolIo( (HANDLE)perHandleData->Socket, OverlappedCompletionRoutine, (PVOID)perHandleData, NULL);
perHandleData->pIo=pio;
CreateIoCompletionPort((HANDLE)perHandleData->Socket,perHandleData->CompletionPort,(DWORD)0,0);
LPPER_IO_DATA perIodata;
//perIodata=GetPerIoData(); //should get from object pool
//perIodata->DataBuf.buf=perIodata->Buffer;
//perIodata->DataBuf.len=5; //if send 0 bytes will close socket,see handle recv&send event
//if( ! PostSend(perHandleData,perIodata) )
//{
//}
for(int i=0;i<RECV_POST_PER_SOCK;i++)
{
perIodata=GetPerIoData(); //should get from object pool
perIodata->DataBuf.buf=perIodata->Buffer;
perIodata->DataBuf.len=MAX_BUF_SIZE;
if( ! PostRecv(perHandleData,perIodata) )
{
}
}
if( ! PostAccept(PerHandleData,PerIoData) )
{
return;
}
}
}
// Posts an overlapped send of perIodata->DataBuf on perHandleData->Socket.
//
// The OVERLAPPED in perIodata is cleared and the record is tagged SEND_POSTED.
// The thread-pool I/O notification is armed and WSASend is issued while holding
// the socket's critical section.
//
// Returns true when the send completed immediately or is pending. Returns false
// when WSASend failed with any other error; the armed notification is cancelled
// in that case.
bool PostSend(LPPER_HANDLE_DATA perHandleData,LPPER_IO_DATA perIodata)
{
    const DWORD sendFlags = 0;
    DWORD bytesSent;
    bool posted = true;

    ZeroMemory( &perIodata->Overlapped, sizeof(OVERLAPPED) );
    perIodata->OperationType = SEND_POSTED;

    EnterCriticalSection( &perHandleData->SockCritSec );

    // Must be called before every asynchronous request bound to this PTP_IO.
    StartThreadpoolIo( perHandleData->pIo );
    const int rc = WSASend( perHandleData->Socket, &perIodata->DataBuf, 1,
                            &bytesSent, sendFlags, &perIodata->Overlapped, NULL );
    if( rc != 0 && WSAGetLastError() != WSA_IO_PENDING )
    {
        // No completion will be delivered for a request that failed to start.
        CancelThreadpoolIo( perHandleData->pIo );
        posted = false;
    }

    LeaveCriticalSection( &perHandleData->SockCritSec );
    return posted;
}
bool PostRecv(LPPER_HANDLE_DATA perHandleData,LPPER_IO_DATA perIodata)
{
DWORD Flags=0,RecvBytes;
ZeroMemory( & (perIodata->Overlapped),sizeof(OVERLAPPED) );
//ZeroMemory(perIodata->Buffer,MAX_BUF_SIZE);
perIodata->OperationType=RECV_POSTED;
EnterCriticalSection( &perHandleData->SockCritSec );
StartThreadpoolIo(perHandleData->pIo);
perHandleData->pack_serial++;
perIodata->pack_serial=perHandleData->pack_serial;
if(WSARecv(perHandleData->Socket, & (perIodata->DataBuf), 1, &RecvBytes,
&Flags, & (perIodata->Overlapped),NULL))
{
int error_code=WSAGetLastError();
if( error_code != WSA_IO_PENDING )
{
CancelThreadpoolIo(perHandleData->pIo);
LeaveCriticalSection( &perHandleData->SockCritSec );
return false;
}
}
LeaveCriticalSection( &perHa
IOCP服务器收发、处理数据包的简单框架
5星 · 超过95%的资源 需积分: 44 76 浏览量
2014-02-23
19:28:17
上传
评论 6
收藏 116KB RAR 举报
mymodian
- 粉丝: 17
- 资源: 10
最新资源
- 前端vue框架,后端ssm+springboot框架,网站开发.zip
- 基于springboot的快速开发框架.zip
- 基于springboot的分布式权限管理系统,易读易懂、界面简洁美观.zip
- 基于SpringBoot Mybatis-Plus TypeScript的微服务多租户SaaS管理快速开发框架 .zip
- 论文复现:QA-GNN: Reasoning with Language Models and Knowledge
- ipp(intel-oneAPI)下载地址.txt
- 基于spring-boot dubbox搭建的java分布式系统的前端管理.zip
- VLC+Qt demoVLC+Qt demo
- 海彪&龙梅子 - 寂寞的人伤心的歌 (DJ版) [mqms2].ogg
- 530springboot + vue 旅游管理系统.zip(可运行源码+数据库文件+文档)
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
- 1
- 2
- 3
前往页