#include "Work_Socket.h"
#include "Services_Mgr.h"
#include "Cmd_Handler_Creator.h"
#include <log4cplus/logger.h>
#include <string>
namespace nettao {
Work_Socket::Work_Socket(boost::uint8_t *recv_key, boost::uint8_t *send_key, int key_len)
: recv_decrypt_(recv_key, key_len),
send_encrypt_(send_key, key_len)
{
boost::shared_ptr<Work_Task> task = SERVICES_MGR->get_work_task();
work_task_ = task;
socket_.reset(new boost::asio::ip::tcp::socket(*(task->get_io_service())));
socket_set_ = 0;
closing_ = false;
recv_read_ix_ = 0;
head_len_to_recv_ = PKG_HEAD_LEN;
max_pkg_size_ = MAX_PKG_SIZE;
in_sending_ = false;
}
Work_Socket::~Work_Socket()
{
LOG4CPLUS_DEBUG(log4cplus::Logger::getRoot(), LOG4CPLUS_TEXT("Work_Socket::~Work_Socket"));
close();
}
void
Work_Socket::open(void)
{
boost::shared_ptr<Work_Task> task;
task = work_task_.lock();
if (! task)
{
boost::asio::ip::tcp::endpoint endpoint = remote_endpoint();
boost::asio::ip::address addr = endpoint.address();
LOG4CPLUS_ERROR(log4cplus::Logger::getRoot(), LOG4CPLUS_TEXT("Work_Socket::open task is null. Peer ")
<< addr.to_string().c_str() << LOG4CPLUS_TEXT(":") << endpoint.port());
return;
}
// 初始化command package handler
Cmd_Handler_List handle_list;
cmd_handler_creator_mgr_->get_all_cmd_handler(shared_from_this(), handle_list);
cmd_handler_ctrl_.add_command_handler(handle_list);
BOOST_ASSERT(KEEP_ALIVE_INVALID != keep_alive_);
task->on_socket_open(shared_from_this());
{
boost::asio::ip::tcp::endpoint endpoint = remote_endpoint();
boost::asio::ip::address addr = endpoint.address();
LOG4CPLUS_DEBUG(log4cplus::Logger::getRoot(),
LOG4CPLUS_TEXT("Work_Socket::open--Peer ")
<< addr.to_string().c_str() << LOG4CPLUS_TEXT(":") << endpoint.port());
}
socket_->async_read_some(boost::asio::buffer(recv_buf_),
boost::bind(&Work_Socket::handle_read, shared_from_this(),
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
Cmd_Pkg_Tuple_Ptr
Work_Socket::create_send_pkg(int payload_len, int cmd)
{
int cmd_pkg_len;
cmd_pkg_len = payload_len + PKG_HEAD_LEN;
BOOST_ASSERT(cmd_pkg_len >= PKG_HEAD_LEN
&& cmd_pkg_len <= max_pkg_size_);
Cmd_Pkg_Tuple_Ptr cmd_pkg(new Cmd_Pkg_Tuple(boost::shared_ptr<boost::uint8_t>
(reinterpret_cast<boost::uint8_t *>(malloc(cmd_pkg_len)), free), 0));
boost::tuples::get<1>(*cmd_pkg) = PKG_HEAD_LEN;
boost::tuples::get<2>(*cmd_pkg) = cmd;
boost::uint8_t *head_base = boost::tuples::get<0>(*cmd_pkg).get();
Pkg_Head *cmd_pkg_head = reinterpret_cast<Pkg_Head *>(head_base);
cmd_pkg_head->cmd = cmd;
cmd_pkg_head->size = cmd_pkg_len;
cmd_pkg_head->hton();
send_encrypt_.update_data(PKG_HEAD_LEN, head_base); // 对数据包头进行加密
return cmd_pkg;
}
void
Work_Socket::handle_read(const boost::system::error_code &e, std::size_t bytes_transferred)
{
if (e)
{
LOG4CPLUS_ERROR(log4cplus::Logger::getRoot(), LOG4CPLUS_TEXT("Work_Socket::handle_read--") << e.message().c_str());
occur_error_in_handle();
return;
}
BOOST_ASSERT(! closing_);
if (KEEP_ALIVE_RECV == keep_alive_)
{
boost::shared_ptr<Work_Task> task;
task = work_task_.lock();
if (! task)
{
LOG4CPLUS_ERROR(log4cplus::Logger::getRoot(), LOG4CPLUS_TEXT("Work_Socket::send_pkg_internal--work_task_ is null"));
return;
}
last_pkg_time_ = boost::posix_time::second_clock::local_time().time_of_day();
task->update_last_pkg_time(shared_from_this());
}
recv_len_ = bytes_transferred;
do
{
Cmd_Pkg_Tuple_Ptr cmd_pkg(new Cmd_Pkg_Tuple());
bool one_pkg_finish = false;
if (-1 == get_one_cmd_pkg(one_pkg_finish, cmd_pkg))
{
occur_error_in_handle();
return;
}
if (! one_pkg_finish)
{
// 本次接收到的数据处理完成,进行下一次读
BOOST_ASSERT(0 == recv_len_);
recv_read_ix_ = 0;
socket_->async_read_some(boost::asio::buffer(recv_buf_),
boost::bind(&Work_Socket::handle_read, shared_from_this(),
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
return;
}
// 已接收完一个数据包,交给handler处理
if (-1 == cmd_handler_ctrl_.handle_cmd_pkg(cmd_pkg))
{
occur_error_in_handle();
return;
}
head_len_to_recv_ = PKG_HEAD_LEN; // 重新开始接收新的包头和数据包
} while (1);
}
void
Work_Socket::handle_write(const boost::system::error_code &e)
{
if (e)
{
LOG4CPLUS_ERROR(log4cplus::Logger::getRoot(), LOG4CPLUS_TEXT("Work_Socket::handle_write--") << e.message().c_str());
occur_error_in_handle();
return;
}
if (closing_)
{
LOG4CPLUS_WARN(log4cplus::Logger::getRoot(), LOG4CPLUS_TEXT("Work_Socket::handle_write--is closed"));
return;
}
if (KEEP_ALIVE_SEND == keep_alive_)
{
boost::shared_ptr<Work_Task> task;
task = work_task_.lock();
if (! task)
{
LOG4CPLUS_ERROR(log4cplus::Logger::getRoot(), LOG4CPLUS_TEXT("Work_Socket::send_pkg_internal--work_task_ is null"));
return;
}
last_pkg_time_ = boost::posix_time::second_clock::local_time().time_of_day();
task->update_last_pkg_time(shared_from_this());
}
{
boost::mutex::scoped_lock monitor(send_queue_lock_);
send_queue_.pop_front();
if (send_queue_.empty())
{
in_sending_ = false;
return;
}
send_pkg_internal();
}
}
void
Work_Socket::occur_error_in_handle(void)
{
if (closing_)
return;
BOOST_ASSERT(0 != socket_set_);
BOOST_ASSERT(socket_set_->end() != socket_set_->find(shared_from_this()));
socket_set_->erase(shared_from_this());
socket_set_ = 0;
close();
}
int
Work_Socket::send_pkg(Cmd_Pkg_Tuple_Ptr cmd_pkg)
{
boost::mutex::scoped_lock monitor(send_queue_lock_);
send_queue_.push_back(cmd_pkg);
if (! in_sending_)
return send_pkg_internal();
return 0;
}
void
Work_Socket::send_keep_alive_pkg(void)
{
// 服务器端不发送心跳包
if (KEEP_ALIVE_RECV == keep_alive_)
return;
LOG4CPLUS_DEBUG(log4cplus::Logger::getRoot(), LOG4CPLUS_TEXT("Work_Socket::send_keep_alive_pkg"));
Cmd_Pkg_Tuple_Ptr keep_alive_pkg = create_send_pkg(0, CMD_KEEP_ALIVE);
send_pkg(keep_alive_pkg);
}
int
Work_Socket::send_pkg_internal(void)
{
// 调用这个函数之前已对send queue加锁
if (closing_)
return -1;
in_sending_ = true;
Cmd_Pkg_Tuple_Ptr cmd_pkg = send_queue_.front();
boost::asio::async_write(*socket_,
boost::asio::buffer(boost::tuples::get<0>(*cmd_pkg).get(), boost::tuples::get<1>(*cmd_pkg)),
boost::bind(&Work_Socket::handle_write, shared_from_this(),
boost::asio::placeholders::error));
return 0;
}
void
Work_Socket::close(void)
{
if (closing_)
{
return;
}
closing_ = true;
boost::system::error_code ignored_ec;
socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec);
}
int
Work_Socket::get_one_cmd_pkg(bool &one_pkg_finish, Cmd_Pkg_Tuple_Ptr &cmd_pkg)
{
BOOST_ASSERT(head_len_to_recv_ >= 0
&& head_len_to_recv_ <= PKG_HEAD_LEN);
// 处理接收数据包头
if (head_len_to_recv_ > 0)
{
boost::uint8_t *pkg_head_recv_base = reinterpret_cast<boost::uint8_t *>(&pkg_head_recv_);
std::size_t to_header_size = recv_len_ > head_len_to_recv_ ? head_len_to_recv_ : recv_len_;
// (PKG_HEAD_LEN - head_len_to_recv_)为已接收的数据包头长度
memcpy(pkg_head_recv_base + (PKG_HEAD_LEN - head_len_to_recv_), recv_buf_.c_array() + recv_read_ix_, to_header_size);
recv_read_ix_ += to_header_size;
recv_len_ -= to_header_size;
head_len_to_recv_ -= to_header_size;
if (head_len_to_recv_ > 0) // 数据包头还没接收完返回
{
BOOST_ASSE