#include "ServerSessionManager.h"
void on_message(server* s, websocketpp::connection_hdl hdl, message_ptr msg) {
//std::cout << "on_message called with hdl: " << hdl.lock().get()
// << " and message: " << msg->get_payload()
// << std::endl;
ASYNC_MSG* pAsyncMsg = new(std::nothrow)ASYNC_MSG;
if (pAsyncMsg)
{
pAsyncMsg->s = s;
pAsyncMsg->hdl = hdl;
pAsyncMsg->msg = msg->get_payload();
pAsyncMsg->opcodeValue = msg->get_opcode();
CServerSessionMgr::GetInstance()->PushQueueMsg(pAsyncMsg);
}
// check for a special command to instruct the server to stop listening so
// it can be cleanly exited.
//if (msg->get_payload() == "stop-listening") {
// s->stop_listening();
// return;
//}
//try {
// s->send(hdl, msg->get_payload(), msg->get_opcode());
//}
//catch (websocketpp::exception const & e) {
// std::cout << "Echo failed because: "
// << "(" << e.what() << ")" << std::endl;
//}
}
void on_close(websocketpp::connection_hdl hdl)
{
std::cout << "Close handler" << std::endl;
}
void on_open(websocketpp::connection_hdl hdl)
{
server::connection_ptr con = CServerSessionMgr::GetInstance()->GetWebsocketServer()->get_con_from_hdl(hdl);
std::cout << "Open handler" << hdl.lock().get() << std::endl;
//std::cout << "Host: " << con->get_host() << std::endl;
//std::cout << "Port: " << con->get_port() << std::endl;
//std::cout << con->get_remote_endpoint() << std::endl;
}
void on_http(server* s, websocketpp::connection_hdl hdl) {
server::connection_ptr con = s->get_con_from_hdl(hdl);
std::string res = con->get_request_body();
std::stringstream ss;
ss << "got HTTP request with " << res.size() << " bytes of body data.";
con->set_body(ss.str());
con->set_status(websocketpp::http::status_code::ok);
}
void on_fail(server* s, websocketpp::connection_hdl hdl) {
server::connection_ptr con = s->get_con_from_hdl(hdl);
std::cout << "Fail handler: " << con->get_ec() << " " << con->get_ec().message() << std::endl;
}
CServerSessionMgr::CServerSessionMgr()
:m_bRunning(FALSE)
,m_bInit(FALSE)
{
m_listMsgQueue.clear();
}
CServerSessionMgr::~CServerSessionMgr()
{
}
INT32 CServerSessionMgr::Init(UINT16 nTcpPort,INT32 iWorkThreadCount /* = 0 */)
{
INT32 iErrorCode = -1;
do
{
m_bRunning = TRUE;
m_bInit = TRUE;
//创建异步工作处理线程池,总共创建64个
int iWorkThreadCount = iWorkThreadCount == 0 ? 32 : iWorkThreadCount;
for (INT32 i = 0;i<iWorkThreadCount;i++)
{
m_vecThreadProcess.push_back(thread(std::bind(&CServerSessionMgr::threadProcess, this)));
}
try {
// Set logging settings
m_websocketServer.set_access_channels(websocketpp::log::alevel::all);
m_websocketServer.clear_access_channels(websocketpp::log::alevel::frame_payload);
// Initialize Asio
m_websocketServer.init_asio();
// Register our message handler
m_websocketServer.set_message_handler(bind(&on_message, &m_websocketServer, ::_1, ::_2));
m_websocketServer.set_close_handler(&on_close);
m_websocketServer.set_open_handler(&on_open);
m_websocketServer.set_http_handler(bind(&on_http, &m_websocketServer, ::_1));
m_websocketServer.set_fail_handler(bind(&on_fail, &m_websocketServer, ::_1));
// Listen on port 9002
m_websocketServer.listen(nTcpPort);
// Start the server accept loop
m_websocketServer.start_accept();
// Start the ASIO io_service run loop
m_websocketServer.run();
}
catch (websocketpp::exception const & e) {
std::cout << e.what() << std::endl;
}
catch (...) {
std::cout << "other exception" << std::endl;
}
return 0;
} while (0);
Fini();
return iErrorCode;
}
INT32 CServerSessionMgr::Fini()
{
m_bRunning = FALSE;
// 停止全部扫码线程
for (uint32_t i = 0; i < m_vecThreadProcess.size(); ++i)
{
if (m_vecThreadProcess[i].joinable())
{
m_vecThreadProcess[i].join();
}
}
return 0;
}
INT32 CServerSessionMgr::PushQueueMsg(ASYNC_MSG* msg)
{
lock_guard<mutex> lock(m_msgMutex);
m_listMsgQueue.push_back(msg);
return 0;
}
ASYNC_MSG* CServerSessionMgr::PopQueueMsg()
{
ASYNC_MSG* pAsyncMsg = NULL;
lock_guard<mutex> lock(m_msgMutex);
if (m_listMsgQueue.size()>0)
{
pAsyncMsg = m_listMsgQueue.front();
m_listMsgQueue.pop_front();
}
return pAsyncMsg;
}
void CServerSessionMgr::threadProcess()
{
printf("\n------enter proces thread------\n");
while (m_bRunning==TRUE)
{
ASYNC_MSG* pAsyncMsg= PopQueueMsg();
if (pAsyncMsg)
{
string rsp = "";
ProcessMsg(pAsyncMsg->msg, rsp);
try {
pAsyncMsg->s->send(pAsyncMsg->hdl, rsp, pAsyncMsg->opcodeValue);
}
catch (websocketpp::exception const & e) {
std::cout << "Echo failed because: "
<< "(" << e.what() << ")" << std::endl;
}
}
else
{
Sleep(10);
}
}
}
INT32 CServerSessionMgr::ProcessMsg(string& req, string& rsp)
{
rsp = req + "i am rsp";
return 0;
}