#include "stdio.h"
#include "conio.h"
#include "windows.h"
#include "xpnetdef.h"
#include "xpmsg.h"
#include "xpstream.h"
#include "string.h"
#include <winsock.h>
#include <process.h>
#include <map>
#include <iostream>
using namespace std;
class StreamServer
{
public:
StreamServer()
{
InitializeCriticalSection(&_cs);
XpsStreamReader reader = {(long)this,
&StreamServer::OpenStream, &StreamServer::CloseStream};
_server = XpsServerCreate(&reader, 4);
_id = 0;
}
~StreamServer()
{
DeleteCriticalSection(&_cs);
XpsServerDestroy(_server);
}
int Start()
{
return XpsServerStart(_server, 8000);
}
void Stop()
{
XpsServerStop(_server);
}
void WriteStream(int streamId, unsigned streamType,
const void* buffer, unsigned length)
{
XpsServerWriteStream(_server, streamId, streamType, buffer, length);
}
void CloseAllReader(){}
private:
struct ReadInfo
{
StreamServer* p;
volatile bool closed;
int id;
HANDLE h;
ReadInfo(StreamServer* _p, int _id)
: p(_p), id(_id), closed(false){}
};
static int XPAPI OpenStream(long userData, const char* url)
{
StreamServer* pThis = (StreamServer*)userData;
ReadInfo* info = new ReadInfo(pThis, pThis->_id++);
if (pThis->_id = -1)pThis->_id++;
HANDLE h = (HANDLE)_beginthreadex(NULL, 0,
&StreamServer::ReadThread, (void*)info, 0, 0);
info->h = h;
EnterCriticalSection(&pThis->_cs);
pThis->_readers[info->id] = info;
LeaveCriticalSection(&pThis->_cs);
return info->id;
}
static void XPAPI CloseStream(long userData, int id)
{
StreamServer* pThis = (StreamServer*)userData;
EnterCriticalSection(&pThis->_cs);
map<int, ReadInfo*>::iterator iter =pThis-> _readers.find(id);
if (iter != pThis->_readers.end())
{
iter->second->closed = true;
::WaitForSingleObject(iter->second->h, INFINITE);
::CloseHandle(iter->second->h);
delete iter->second;
pThis->_readers.erase(iter);
}
LeaveCriticalSection(&pThis->_cs);
}
static unsigned XPAPI ReadThread(void* userData)
{
ReadInfo* info = (ReadInfo*)userData;
char buff[1024];
char val = 0;
int type = 1; // 1: decode init data
while (!info->closed)
{
memset(buff, val++, sizeof(buff));
info->p->WriteStream(info->id, type, buff, sizeof(buff));
type = 0;
Sleep(100);
}
return 0;
}
private:
XpsServer* _server;
map<int, ReadInfo*> _readers;
CRITICAL_SECTION _cs;
int _id;
};
void test_client();
int main(int argc, char** argv)
{
if (argc > 1 && strcmp(argv[1], "-c") == 0)
{
test_client();
}
else
{
StreamServer server;
int ret = server.Start();
cin.get();
server.Stop();
}
return 0;
}
class Client
{
public:
Client()
{
InitializeCriticalSection(&_cs);
XpmClientEvent clientEvent = {(long)this, &OnConnect, &OnConnectionClose, &OnRead};
_client = XpmClientCreate(&clientEvent, 2);
}
~Client()
{
XpmClientDestroy(_client);
}
int SyncConnect(const char* ip, unsigned short port, unsigned timeout)
{
int id = XpmClientAsyncConnect(_client, ip, port, timeout, 0);
ConnInfo* info = new ConnInfo;
info->id = id;
info->result = -1;
info->event = ::CreateEvent(NULL, TRUE, FALSE, NULL);
EnterCriticalSection(&_cs);
_connInfos[id]= info;
LeaveCriticalSection(&_cs);
::WaitForSingleObject(info->event, timeout);
EnterCriticalSection(&_cs);
if (info->result != 0)
id = -1; // connect failed
_connInfos.erase(id);
LeaveCriticalSection(&_cs);
return id;
}
void SendData(int id, const void* buffer, unsigned length)
{
XpmClientAsyncWrite(_client, id, buffer, length, 0);
}
void CloseConnection(int id)
{
}
private:
static void XPAPI OnConnect(long userData, int id, int result)
{
Client* pThis = (Client*)userData;
EnterCriticalSection(&pThis->_cs);
map<int, ConnInfo*>::iterator iter = pThis->_connInfos.find(id);
if (iter != pThis->_connInfos.end())
{
if (iter->second->id == id)
{
iter->second->result = result;
::SetEvent(iter->second->event);
}
}
LeaveCriticalSection(&pThis->_cs);
}
static void XPAPI OnConnectionClose(long userData, int id)
{
Client* pThis = (Client*)userData;
EnterCriticalSection(&pThis->_cs);
printf("OnConnectionClose: id: %d\n", id);
LeaveCriticalSection(&pThis->_cs);
}
static void XPAPI OnRead(long userData, int id, const void* buffer, unsigned length)
{
Client* pThis = (Client*)userData;
EnterCriticalSection(&pThis->_cs);
XpMsgHeader* head = (XpMsgHeader*)buffer;
printf("ClientOnRead: id: %d, len: %d, seq: %d\n", id, length, ntohl(head->sequence));
LeaveCriticalSection(&pThis->_cs);
}
private:
struct ConnInfo
{
int id;
HANDLE event;
int result;
};
CRITICAL_SECTION _cs;
XpmClient* _client;
map<int, ConnInfo*> _connInfos;
};
Client client;
void test_client()
{
int id = client.SyncConnect("127.0.0.1", 8000, 3000);
int count = 5;
unsigned sequence = 0;
struct TlvAttribute
{
unsigned short type;
unsigned short length;
char value[1];
};
char buff[1024];
char* url = "http://127.0.0.1/channel";
TlvAttribute* tlv = (TlvAttribute*)(buff + sizeof(XpMsgHeader));
tlv->type = htons(1);
tlv->length = htons(4+strlen(url));
strcpy(tlv->value, url);
XpMsgHeader* head = (XpMsgHeader*)buff;
head->identifer = XP_MSG_ID_REQUEST;
head->version = XP_MSG_VERSION;
head->headLen = sizeof(XpMsgHeader);
head->option = XP_MSG_OPT_REPORT_TIMEOUT; // 发送超时会收到超时的响应包
head->packetLen = ntohl(sizeof(XpMsgHeader) + 4 + strlen(url));
head->sequence = 0;
head->command = ntohl(XP_STREAM_CMD_REQUEST);
client.SendData(id, buff, sizeof(XpMsgHeader) + 4 + strlen(url));
cin.get();
}
- 1
- 2
前往页