#include "stdafx.h"
#include "SocketServer.h"
#include "IOCompletionPort.h"
#include "Win32Exception.h"
#include "Utils.h"
#include "SystemInfo.h"
#include "Socket.h"
#include <vector>
#pragma comment(lib, "ws2_32.lib")
using std::vector;
namespace NvNetwork{
///////////////////////////////////////////////////////////////////////////////
// Static helper methods
///////////////////////////////////////////////////////////////////////////////
static size_t CalculateNumberOfThreads(
size_t numThreads);
///////////////////////////////////////////////////////////////////////////////
// CSocketServer
///////////////////////////////////////////////////////////////////////////////
CSocketServer::CSocketServer(
unsigned long addressToListenOn,
unsigned short portToListenOn,
size_t maxFreeSockets,
size_t maxFreeBuffers,
size_t bufferSize /* = 1024 */,
size_t numThreads /* = 0 */,
bool useSequenceNumbers /* = true */,
bool postZeroByteReads /* = false */)
: CIOBuffer::Allocator(bufferSize, maxFreeBuffers),
m_numThreads(CalculateNumberOfThreads(numThreads)),
m_listeningSocket(INVALID_SOCKET),
m_iocp(0),
m_address(addressToListenOn),
m_port(portToListenOn),
m_maxFreeSockets(maxFreeSockets),
m_useSequenceNumbers(useSequenceNumbers),
m_postZeroByteReads(postZeroByteReads)
{
}
CSocketServer::~CSocketServer()
{
}
void CSocketServer::ReleaseSockets()
{
CCriticalSection::Owner lock(m_listManipulationSection);
while (m_activeList.Head())
{
ReleaseSocket(m_activeList.Head());
}
while (m_freeList.Head())
{
DestroySocket(m_freeList.PopNode());
}
if (m_freeList.Count() + m_freeList.Count() != 0)
{
//lint -e{1933} call to unqualified virtual function
OnError(_T("CSocketServer::ReleaseSockets() - Leaked sockets"));
}
}
void CSocketServer::StartAcceptingConnections()
{
if (m_listeningSocket == INVALID_SOCKET)
{
//lint -e{1933} call to unqualified virtual function
OnStartAcceptingConnections();
//lint -e{1933} call to unqualified virtual function
m_listeningSocket = CreateListeningSocket(m_address, m_port);
m_acceptConnectionsEvent.Set();
}
}
void CSocketServer::StopAcceptingConnections()
{
if (m_listeningSocket != INVALID_SOCKET)
{
m_acceptConnectionsEvent.Reset();
if (0 != ::closesocket(m_listeningSocket))
{
//lint -e{1933} call to unqualified virtual function
OnError(_T("CSocketServer::StopAcceptingConnections() - closesocket - ") + GetLastErrorMessage(::WSAGetLastError()));
}
m_listeningSocket = INVALID_SOCKET;
//lint -e{1933} call to unqualified virtual function
OnStopAcceptingConnections();
}
}
void CSocketServer::InitiateShutdown()
{
// signal that the dispatch thread should shut down all worker threads and then exit
StopAcceptingConnections();
{
CCriticalSection::Owner lock(m_listManipulationSection);
Socket *pSocket = m_activeList.Head();
while (pSocket)
{
Socket *pNext = SocketList::Next(pSocket);
pSocket->AbortiveClose();
pSocket = pNext;
}
}
m_shutdownEvent.Set();
//lint -e{1933} call to unqualified virtual function
OnShutdownInitiated();
}
void CSocketServer::WaitForShutdownToComplete()
{
// if we havent already started a shut down, do so...
InitiateShutdown();
Wait();
ReleaseSockets();
Flush();
}
SOCKET CSocketServer::CreateListeningSocket(
unsigned long address,
unsigned short port)
{
SOCKET s = ::WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (s == INVALID_SOCKET)
{
throw CWin32Exception(_T("CSocket::CreateListeningSocket()"), ::WSAGetLastError());
}
CSocket listeningSocket(s);
CSocket::InternetAddress localAddress(address, port);
listeningSocket.Bind(localAddress);
listeningSocket.Listen(5);
return listeningSocket.Detatch();
}
CSocketServer::WorkerThread *CSocketServer::CreateWorkerThread(
CIOCompletionPort &iocp)
{
return new WorkerThread(*this, iocp);
}
int CSocketServer::Run()
{
try
{
vector<WorkerThread *> workers;
workers.reserve(m_numThreads);
for (size_t i = 0; i < m_numThreads; ++i)
{
//lint -e{1933} call to unqualified virtual function
WorkerThread *pThread = CreateWorkerThread(m_iocp);
workers.push_back(pThread);
pThread->Start();
//lint -e{429} custodial pointer neither freed nor returned
}
HANDLE handlesToWaitFor[2];
handlesToWaitFor[0] = m_shutdownEvent.GetEvent();
handlesToWaitFor[1] = m_acceptConnectionsEvent.GetEvent();
while (!m_shutdownEvent.Wait(0))
{
DWORD waitResult = ::WaitForMultipleObjects(2, handlesToWaitFor, false, INFINITE);
if (waitResult == WAIT_OBJECT_0)
{
// Time to shutdown
break;
}
else if (waitResult == WAIT_OBJECT_0 + 1)
{
// accept connections
while (!m_shutdownEvent.Wait(0) && m_acceptConnectionsEvent.Wait(0))
{
CIOBuffer *pAddress = Allocate();
int addressSize = (int)pAddress->GetSize();
//lint -e{826} suspicious pointer conversion
SOCKET acceptedSocket = ::WSAAccept(
m_listeningSocket,
reinterpret_cast<sockaddr*>(const_cast<BYTE*>(pAddress->GetBuffer())),
&addressSize,
0,
0);
pAddress->Use(addressSize);
if (acceptedSocket != INVALID_SOCKET)
{
Socket *pSocket = AllocateSocket(acceptedSocket);
//lint -e{1933} call to unqualified virtual function
OnConnectionEstablished(pSocket, pAddress);
pSocket->Release();
}
else if (m_acceptConnectionsEvent.Wait(0))
{
//lint -e{1933} call to unqualified virtual function
OnError(_T("CSocketServer::Run() - WSAAccept:") + GetLastErrorMessage(::WSAGetLastError()));
}
pAddress->Release();
}
}
else
{
//lint -e{1933} call to unqualified virtual function
OnError(_T("CSocketServer::Run() - WaitForMultipleObjects: ") + GetLastErrorMessage(::GetLastError()));
}
}
for (int i = 0; i < m_numThreads; ++i)
{
workers[i]->InitiateShutdown();
}
for (int i = 0; i < m_numThreads; ++i)
{
workers[i]->WaitForShutdownToComplete();
delete workers[i];
workers[i] = 0;
}
}
catch(const CException &e)
{
//lint -e{1933} call to unqualified virtual function
OnError(_T("CSocketServer::Run() - Exception: ") + e.GetWhere() + _T(" - ") + e.GetMessage());
}
catch(...)
{
//lint -e{1933} call to unqualified virtual function
OnError(_T("CSocketServer::Run() - Unexpected exception"));
}
//lint -e{1933} call to unqualified virtual function
OnShutdownComplete();
return 0;
}
CSocketServer::Socket *CSocketServer::AllocateSocket(
SOCKET theSocket)
{
CCriticalSection::Owner lock(m_listManipulationSection);
Socket *pSocket = 0;
if (!m_freeList.Empty())
{
pSocket = m_freeList.PopNode();
pSocket->Attach(theSocket);
pSocket->AddRef();
}
else
{
pSocket = new Socket(*this, theSocket, m_useSe
没有合适的资源?快使用搜索试试~ 我知道了~
socket wrapper
共27个文件
h:15个
cpp:12个
需积分: 10 8 下载量 105 浏览量
2008-05-25
18:16:09
上传
评论
收藏 24KB RAR 举报
温馨提示
对winsock的封装
资源推荐
资源详情
资源评论
收起资源包目录
tanwabin.rar (27个子文件)
tanwabin
IOBuffer.h 4KB
Event.h 694B
ManualResetEvent.h 447B
CriticalSection.h 683B
NodeList.h 1KB
Utils.h 2KB
SocketServer.cpp 34KB
Utils.cpp 7KB
Win32Exception.h 289B
Event.cpp 3KB
IOCompletionPort.cpp 2KB
IOBuffer.cpp 9KB
Exception.h 552B
Thread.h 512B
CriticalSection.cpp 970B
SocketServer.h 12KB
Socket.cpp 3KB
Socket.h 2KB
tstring.h 179B
Exception.cpp 515B
Win32Exception.cpp 326B
OpaqueUserData.h 1KB
IOCompletionPort.h 875B
Thread.cpp 2KB
NodeList.cpp 2KB
SystemInfo.h 186B
ManualResetEvent.cpp 372B
共 27 条
- 1
资源评论
downloadsomething
- 粉丝: 0
- 资源: 3
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功