// CCTPReceivedData - Buffer for storing received data and information about it
// CCTPErrorInfo - Buffer for storing error information
// CCTPNet - Class, which implements CTP
// Implementation file
//
// (c) Lev Naumov, CAMEL Laboratory
// E-mail: camellab@mail.ru
// For more information see http://camel.ifmo.ru or
// http://www.codeproject.com/internet/ctp.asp
/////////////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "NetBasic.h"
#include "DebugLog.h"
#include "CTPNet.h"
// Macro definitions for convenient log building.
//
// LOG(log,cs,mess): when the stream pointer log is non-null, takes cs through
// a CSingleLock and writes the current time stamp, a space, mess and "\n" to
// *log, then flushes the stream. mess is pasted into a << chain, so it may
// itself consist of several operands joined by <<.
// The expansion is a bare "if (log) { ... }" statement which declares a local
// buffer named ts; do not use it directly in front of an "else".
#define LOG(log,cs,mess) \
if (log) { \
char ts[22]; \
CSingleLock lock(&cs,TRUE); \
(((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess<<"\n").flush();\
}
// LOGA(log,cs,mess,ip): the same as LOG, but before the message is written
// ip.GetString() fills a local 16-character buffer, so the string
// representation of ip-address ip can be referenced inside mess as "saddr".
// The buffer is filled before cs is taken.
#define LOGA(log,cs,mess,ip) \
if (log) { \
char ts[22],saddr[16]; \
ip.GetString(saddr); \
CSingleLock lock(&cs,TRUE); \
(((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess<<"\n").flush();\
}
// LOGHA(log,cs,mess1,mess2,head,ip): the same as LOGA, but the output is the
// time stamp, a space, mess1, the description of header head (written by
// head->ToStream(*log), so head must be a pointer), mess2 and "\n".
// The string representation of ip-address ip can be referenced inside mess1
// and mess2 as "saddr".
#define LOGHA(log,cs,mess1,mess2,head,ip) \
if (log) { \
char ts[22],saddr[16]; \
ip.GetString(saddr); \
CSingleLock lock(&cs,TRUE); \
((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess1;\
head->ToStream(*log); \
(((ostream&)*(log))<<mess2<<"\n").flush();\
}
// Creates a received-data record.
//   command - command identifier stored in the record
//   size    - number of bytes of data; also stored in the record
//   from    - address of the sender, converted to IPAddr
//   buf     - data to copy into the record's own buffer; may be null, in
//             which case the buffer is allocated but left unfilled
CCTPReceivedData::CCTPReceivedData(unsigned __int16 command, unsigned __int64 size, unsigned long from, char* buf)
{
    // The record always owns a private buffer; its length is the size narrowed
    // to unsigned int
    const unsigned int bytes=(unsigned int)size;
    pBuf=new char[bytes];
    if (buf) {
        memcpy(pBuf,buf,bytes);
    }

    // Remember the description of the data
    this->from=IPAddr(from);
    this->size=size;
    this->command=command;
}
// Creates an error record holding the given type, command, error code and
// address, and fills the timestamp member with the moment of construction.
CCTPErrorInfo::CCTPErrorInfo(unsigned char type,unsigned __int16 command,int code,IPAddr addr)
{
    // Where and with which command the error happened
    this->addr=addr;
    this->command=command;

    // What happened
    this->type=type;
    this->code=code;

    // When it happened
    GetTimeStamp(timestamp);
}
// Writes the current local date and time to s in the form
// "mm/dd/yy hh:mm:ss.mmm" and returns s.
// The text takes 21 characters plus the terminating zero, so s must point to
// a buffer of at least 22 characters.
char* CCTPErrorInfo::GetTimeStamp(char* s)
{
    // Read the clock exactly once, so that the date, the time of day and the
    // milliseconds always describe the same instant (separate queries could
    // straddle a second or a midnight boundary)
    _timeb timebuffer;
    _ftime(&timebuffer);

    // Date and time of day; the placeholder keeps the layout if the
    // conversion to local time is not possible
    char stamp[18]="00/00/00 00:00:00";
    const tm* local=localtime(&timebuffer.time);
    if (local) {
        strftime(stamp,sizeof(stamp),"%m/%d/%y %H:%M:%S",local);
    }

    // Create string
    sprintf(s,"%s.%03d",stamp,timebuffer.millitm);
    return s;
}
// Writes a human-readable description of the header to out:
//   {command: C, id: I, size: S[, num: N(A)][, opt: O]}
// If the m_iConfirm bit is set in command, the first field is written as
// "confirm: C" with that bit removed. The "num" part appears only when amount
// is greater than 1, the "opt" part only when options is non-zero.
void CCTPNet::Header::ToStream(ostream& out)
{
    if (command&CCTPNet::m_iConfirm) {
        // Confirmation: show the command being confirmed, without the flag bit
        out<<"{confirm: "<<(command^CCTPNet::m_iConfirm);
    } else {
        out<<"{command: "<<command;
    }
    out<<", id: "<<(unsigned int)id<<", size: "<<(unsigned int)size;
    if (amount>1) {
        out<<", num: "<<(unsigned int)number<<"("<<(unsigned int)amount<<")";
    }
    if (options) {
        // Cast like the other fields: a char-sized value would otherwise be
        // written to the stream as a raw character instead of a number
        out<<", opt: "<<(unsigned int)options;
    }
    out<<"}";
}
// Marks entry i of CI as confirmed.
// Returns true if, after that, each of the first uCount entries is confirmed,
// false otherwise.
// An index outside [0, uCount) is ignored (nothing is marked) instead of
// being written past the entries that the check below looks at.
bool CCTPNet::SntCommandInfo::Confirm(unsigned int i)
{
    // NOTE(review): uCount is treated as the number of entries in CI, exactly
    // as the completeness scan below does - confirm against the declaration
    if (i<uCount) {
        CI[i].bConfirmed=true;
    }
    for (unsigned int j=0; j<uCount; j++) {
        if (!CI[j].bConfirmed) return false;
    }
    return true;
}
// Marks part i as received.
// Returns true if, after that, each of the first uCount parts is marked as
// received, false otherwise.
// An index outside [0, uCount) is ignored (nothing is marked) instead of
// being written past the parts that the check below looks at.
bool CCTPNet::LargeCommandInfo::GotPart(unsigned int i)
{
    // NOTE(review): uCount is treated as the number of elements in received,
    // exactly as the completeness scan below does - confirm against the
    // declaration
    if (i<uCount) {
        received[i]=true;
    }
    for (unsigned int j=0; j<uCount; j++) {
        if (!received[j]) return false;
    }
    return true;
}
// Option set OptPing: the combination of the DelAfterError, NoResend and
// UniqueCommand option flags
const unsigned __int8 CCTPNet::OptPing=CCTPNet::Options::DelAfterError|CCTPNet::Options::NoResend|CCTPNet::Options::UniqueCommand;
// Bit of the command field which marks a header as a confirmation
// (Header::ToStream prints such headers as "confirm" with this bit removed)
const unsigned __int16 CCTPNet::m_iConfirm=0x8000;
// Constructs the object, creates its sockets and starts its worker threads.
//   receiver       - stored as the default receiver (m_DefReceiver)
//   port           - port the receiving socket is bound to
//   servers        - number of server threads to start
//   times          - timing settings; copied when non-null, otherwise m_Times
//                    keeps its default state
//   log            - stream for the log; may be null, then nothing is logged
//   packetdatasize - data size of one packet; the internal buffer holds this
//                    many bytes plus GetHeaderSize()
//   maxdeliverers  - stored as the limit m_uMaxDeliverers
// The object starts in the suspended state. The result of CreateSockets() is
// not checked here: the threads are started and the start is logged in any
// case.
CCTPNet::CCTPNet(NetReceiver* receiver,unsigned short port,unsigned short servers,Times* times,ostream* log,unsigned __int16 packetdatasize,unsigned short maxdeliverers)
{
    // Tuning
    m_DefReceiver=receiver;
    m_uPort=port;
    if (times) m_Times=*times;
    m_uPacketDataSize=packetdatasize;
    m_pLog=log;
    // Initialize random generator
    srand((unsigned)time(NULL));
    // Initialization
    m_bSuspended=true;
    m_SntCommands.clear();
    m_Sessions.clear();
    m_LargeCommands.clear();
    m_Receivers.clear();
    m_pBuffer=new char[m_uPacketDataSize+GetHeaderSize()];
    m_Deliveries.clear();
    m_pDeliverTrds.clear();
    m_uMaxDeliverers=maxdeliverers;
    m_uBusy=0;
    CreateSockets();
    // The kill flag has to be reset before any thread is started
    m_bKill=false;
    // Start threads and store handles
    for (unsigned int i=0;i<servers;i++) {
        m_pServerTrds.push_back(AfxBeginThread(CTPServerFunction,this));
    }
    m_pDelManTrd=AfxBeginThread(CTPDelManFunction,this);
    // LOG terminates the line itself, so the message carries no "\n" of its
    // own (it used to leave an empty line in the log)
    LOG(m_pLog,m_csLog,"CTP started on port "<<port<<" with "<<servers<<" servers");
}
// Stops all threads and releases the resources of the object.
// Sets the kill flag and then polls, sleeping m_Times.uSleepOnDestroy between
// checks, until no server thread, no deliverer thread and no delivery manager
// thread is registered any more (presumably the threads unregister themselves
// when they notice the flag - confirm against the thread functions).
// Threads still registered after m_Times.uPeriodDestroy are terminated
// forcedly. Finally sockets, pending data and the packet buffer are freed.
CCTPNet::~CCTPNet()
{
    LOG(m_pLog,m_csLog,"CTP is shuting down");
    // Terminate threads
    m_bKill=true;
    const DWORD started=GetTickCount();
    while (!m_pDeliverTrds.empty() || !m_pServerTrds.empty() || m_pDelManTrd) {
        Sleep(m_Times.uSleepOnDestroy);
        // Kill servers, deliverers and delivery manager if they are busy too long
        if (GetTickCount()-started>m_Times.uPeriodDestroy) {
            CSingleLock locks(&m_csServerTrds);
            LOCK(locks);
            // Each loop declares its own, differently named iterator, so the
            // code is valid under both the old and the standard for-scope rules
            for (vector<CWinThread*>::iterator its=m_pServerTrds.begin(); its!=m_pServerTrds.end(); ++its) {
                LOG(m_pLog,m_csLog,"Server thread with handle "<<(*its)->m_hThread<<" was stopped forcedly");
                TerminateThread((*its)->m_hThread,0);
            }
            m_pServerTrds.clear();
            UNLOCK(locks);
            CSingleLock lockd(&m_csDeliverTrds);
            LOCK(lockd);
            for (vector<CWinThread*>::iterator itd=m_pDeliverTrds.begin(); itd!=m_pDeliverTrds.end(); ++itd) {
                LOG(m_pLog,m_csLog,"Deliverer thread with handle "<<(*itd)->m_hThread<<" was stopped forcedly");
                TerminateThread((*itd)->m_hThread,0);
            }
            m_pDeliverTrds.clear();
            UNLOCK(lockd);
            if (m_pDelManTrd) {
                LOG(m_pLog,m_csLog,"Delivery manager thread was stopped forcedly");
                TerminateThread(m_pDelManTrd->m_hThread,0);
                // A killed thread cannot reset the pointer itself; without
                // this the loop condition would stay true forever
                m_pDelManTrd=NULL;
            }
        }
    }
    // Free resources
    closesocket(m_SendSocket);
    closesocket(m_RecvSocket);
    FreeSntCommands();
    FreeSessions();
    FreeLargeCommands();
    FreeDeliveries();
    m_Receivers.clear();
    delete[] m_pBuffer;
    LOG(m_pLog,m_csLog,"CTP stopped");
}
bool CCTPNet::CreateSockets()
{
LOG(m_pLog,m_csLog,"Creating sockets");
CSingleLock lock(&m_csDeliveries);
// Sockets creation
m_SendSocket=socket(AF_INET, SOCK_DGRAM, 0);
m_RecvSocket=socket(AF_INET, SOCK_DGRAM, 0);
if (m_SendSocket==INVALID_SOCKET || m_RecvSocket==INVALID_SOCKET) {
LOCK(lock);
m_Deliveries.push_back(Delivery(m_DefReceiver,new CCTPErrorInfo(0,0,WSAGetLastError(),IPAddr())));
UNLOCK(lock);
LOG(m_pLog,m_csLog,"Failed to create sockets");
m_bSuspended=true;
return false;
}
// Sockets are to support broadcasting
BOOL broadcast=TRUE;
setsockopt(m_SendSocket,SOL_SOCKET,SO_BROADCAST,(char*)&broadcast,sizeof(broadcast));
setsockopt(m_RecvSocket,SOL_SOCKET,SO_BROADCAST,(char*)&broadcast,sizeof(broadcast));
// Binding local address with receiving socket
m_Local.sin_family=AF_INET;
m_Local.sin_port=htons(m_uPort);
m_Local.sin_addr.s_addr=htonl(INADDR_ANY);
if (bind(m_RecvSocket,(SOCKADDR*)&m_Local,sizeof(m_Local))==SOCKET_ERROR)
{
LOCK(lock);
m_Deliveries.push_back(Delivery(m_DefReceiver,new CCTPErrorInfo(1,0,WSAGetLastError(),IPAddr())));
UNLOCK(lock);
LOG(m_pLog,m_csLog,"Failed to bind receiving socket");
m_bSuspended=true;
return false;
}
return true;
}
void CCTPNet::CheckupOptions(Header& header)
{
if (header.amount>1 && header.options&Options::UniqueCommand) header.options^=Options::UniqueCommand;
if
CTP.zip_UDP可靠_ctpudp_ctrl del_udp可靠传输_传输 udp
版权申诉
131 浏览量
2022-09-19
13:02:40
上传
评论
收藏 15KB ZIP 举报
邓凌佳
- 粉丝: 65
- 资源: 1万+
最新资源
- 老飞飞搭建基础通用数据库V19数据库.rar
- jquery.js
- 机械设计多工位ACF贴胶带&预压设备sw18可编辑非常好的设计图纸100%好用.zip
- 基于Pytorch复现Point-Transformer,用于ShapeNet数据集点云分割
- 【医学影像分析】2D超声图像的分割检测(Ultrasound Nerve Segmentation - Kaggle数据集)
- 嘎嘎香的五款神仙谷歌插件
- .arch书源导入教程.mp4
- 贪心算法介绍及代码示例讲解
- CR13SP35MSI64 Crystal 水晶报表运行组件最后版本64位
- 图像分类数据集:玉米叶是否感染分类数据集(2分类,包含训练集、验证集)
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈