#include "circle_buff.h"
void DieWithError(char *errorMessage)
{
perror(errorMessage);
}
void *speed_statistical( void *arg )
{
circle_buff *p_cb_instance = (circle_buff *)arg;
int LAST_WRITE_CBU_SUM = 0;
int LAST_PROCE_CBU_SUM = 0;
int MODE_FLAG = p_cb_instance->MODE_FLAG;
while(1)
{
if( p_cb_instance->ithread_fin_flag == CIRCLE_BUFF_TRUE )
{
printf("[ speed_statistical ] : thread exit\n");
ACE_Thread::exit();
}
ACE_OS::sleep(1);
double v1 = p_cb_instance->write_cbu_count - LAST_WRITE_CBU_SUM;
double v2 = p_cb_instance->read_cbu_count - LAST_PROCE_CBU_SUM;
LAST_WRITE_CBU_SUM = p_cb_instance->write_cbu_count;
LAST_PROCE_CBU_SUM = p_cb_instance->read_cbu_count;
if( MODE_FLAG == TCP_SEND_MODE )
{
printf("[ send_circle buffer ] : write speed = %4.4f MBps,process speed = %4.4f MBps\n",v1*p_cb_instance->BUFF_BLOCK_SIZE/1024/1024,v2*p_cb_instance->BUFF_BLOCK_SIZE/1024/1024);
}
else
{
printf("[ recv_circle buffer ] : write speed = %4.4f MBps,process speed = %4.4f MBps\n",v1*p_cb_instance->BUFF_BLOCK_SIZE/1024/1024,v2*p_cb_instance->BUFF_BLOCK_SIZE/1024/1024);
}
}
}
// 正常情况下、返回当前受到的有效的字节的个数、否则返回recv的出错值
int recv_data_from_tcp( struct circle_buff_unit *p_curr_cbu, int TCP_BUFF_SIZE,int BUFF_BLOCK_SIZE, ACE_SOCK_Stream &logging_peer_ )
{
p_curr_cbu->valid_bytes = 0;
char * p_curr_cub_mem = p_curr_cbu->p_cbu_mem;
char * p_curr_tcp_window;
if( BUFF_BLOCK_SIZE%TCP_BUFF_SIZE != 0 )
{
printf("[ recv_data_from_tcp ] : BUFF_BLOCK_SIZE is not the multiple of buff_size\n BUFF_BLOCK_SIZE = %d,TCP_BUFF_SIZE = %d",BUFF_BLOCK_SIZE,TCP_BUFF_SIZE);
return -1;
}
int N = BUFF_BLOCK_SIZE/TCP_BUFF_SIZE;
for( int i = 0; i < N; i++ )
{
p_curr_tcp_window = p_curr_cub_mem + i*TCP_BUFF_SIZE;
int size = logging_peer_.recv_n(p_curr_tcp_window,TCP_BUFF_SIZE);
if( size == TCP_BUFF_SIZE)
{
p_curr_cbu->valid_bytes = p_curr_cbu->valid_bytes + size;
}
else if( size < 0 )
{
return size;
}
else
{
p_curr_cbu->valid_bytes = p_curr_cbu->valid_bytes + size;
return 0;
}
}
return p_curr_cbu->valid_bytes;
}
void *tcp_recv_thread(void *arg)
{
circle_buff *p_cb_instance = (circle_buff *)arg;
ACE_INET_Addr SERVER_ADDR;
ACE_SOCK_Acceptor acceptor_;
ACE_SOCK_Stream logging_peer_;
if(SERVER_ADDR.set (p_cb_instance->SERVER_PORT,(ACE_UINT32) INADDR_ANY) == -1)
{
printf("SERVER_ADDR.set error\n");
exit(-1);
}
if(acceptor_.open(SERVER_ADDR, 1) == -1)
{
printf("acceptor_.open error\n");
exit(-1);
}
if(acceptor_.accept(logging_peer_) == -1)
{
printf("acceptor_.accept\n");
exit(-1);
}
int loop_count = 0;
while (1)
{
int j = loop_count%p_cb_instance->BUFF_BLOCK_NUM;
if( p_cb_instance->cbu_vec[j].buff_state_flag == BUFF_READ_FIN )
{
p_cb_instance->cbu_vec[j].buff_state_flag = BUFF_WRITE_ING;
int ret_val = recv_data_from_tcp(&p_cb_instance->cbu_vec[j],p_cb_instance->TCP_BUFF_SIZE, p_cb_instance->BUFF_BLOCK_SIZE, logging_peer_);
if( ret_val == 0 ) // 发送端的数据发送完成,正常退出
{
printf("[ tcp_recv_thread ] : the client close the socket\n");
p_cb_instance->ithread_fin_flag = CIRCLE_BUFF_TRUE;
// change the state and signal the worker thread
p_cb_instance->cbu_vec[j].mutex_cbu.acquire();
p_cb_instance->cbu_vec[j].buff_state_flag = BUFF_WRITE_FIN;
p_cb_instance->cbu_vec[j].ithread_state = BUFF_WRITE_FIN_END;
p_cb_instance->cbu_vec[j].cond_cbu->signal();
p_cb_instance->cbu_vec[j].mutex_cbu.release();
break;
}
else if ( ret_val < 0 ) // 网络出现错误
{
printf(" [ tcp_recv_thread ] : An error happen in the tcp connection\n");
p_cb_instance->ithread_fin_flag = CIRCLE_BUFF_TRUE;
// change the state and signal the worker thread
p_cb_instance->cbu_vec[j].mutex_cbu.acquire();
p_cb_instance->cbu_vec[j].buff_state_flag = BUFF_WRITE_FIN;
p_cb_instance->cbu_vec[j].ithread_state = BUFF_WRITE_FIN_ERROR;
p_cb_instance->cbu_vec[j].cond_cbu->signal();
p_cb_instance->cbu_vec[j].mutex_cbu.release();
break;
}
else // 此数据发送完成,但还有新的数据要发
{
p_cb_instance->write_cbu_count++;
// change the state and signal the worker thread
p_cb_instance->cbu_vec[j].mutex_cbu.acquire();
p_cb_instance->cbu_vec[j].buff_state_flag = BUFF_WRITE_FIN;
p_cb_instance->cbu_vec[j].ithread_state = BUFF_WRITE_FIN_CONTI;
p_cb_instance->cbu_vec[j].cond_cbu->signal();
p_cb_instance->cbu_vec[j].mutex_cbu.release();
}
}
else
{
printf(" [ tcp_recv_thread ] : the %5dth block failed\n",loop_count);
p_cb_instance->ithread_fin_flag = CIRCLE_BUFF_TRUE;
ACE_Thread::exit();
}
loop_count = loop_count + 1; // next loop
}
logging_peer_.close();
acceptor_.close();
printf("[ tcp_recv_thread ] : thread exit\n");
ACE_Thread::exit();
return NULL;
}
void *tcp_send_thread(void *arg)
{
circle_buff *p_cb_instance = (circle_buff *)arg;
ACE_SOCK_Connector connector;
ACE_SOCK_Stream peer;
ACE_INET_Addr peer_addr;
if (peer_addr.set (p_cb_instance->SERVER_PORT, p_cb_instance->SERVER_ADDR) == -1)
{
printf("peer_add set error\n");
exit(-1);
}
next:
ACE_OS::sleep(1);
if (connector.connect (peer, peer_addr) == -1)
{
DieWithError("connect() is not established, try next");
goto next;
}
int loop_index = 0;
while(1)
{
int j = loop_index%p_cb_instance->BUFF_BLOCK_NUM;
p_cb_instance->cbu_vec[j].mutex_cbu.acquire();
if( p_cb_instance->cbu_vec[j].buff_state_flag == BUFF_WRITE_FIN )
{
p_cb_instance->cbu_vec[j].buff_state_flag = BUFF_READ_ING;
unsigned int send_bytes = p_cb_instance->cbu_vec[j].valid_bytes;
if ( peer.send_n(p_cb_instance->cbu_vec[j].p_cbu_mem, send_bytes) != send_bytes )
{
DieWithError("send() sent a different number of bytes than expected");
break;
}
p_cb_instance->cbu_vec[j].buff_state_flag = BUFF_READ_FIN;
if( p_cb_instance->cbu_vec[j].ithread_state == BUFF_WRITE_FIN_END )
{
p_cb_instance->cbu_vec[j].mutex_cbu.release();
break;
}
}
else
{
p_cb_instance->cbu_vec[j].cond_cbu->wait();
if( p_cb_instance->cbu_vec[j].buff_state_flag == BUFF_WRITE_FIN )
{
p_cb_instance->cbu_vec[j].buff_state_flag = BUFF_READ_ING;
unsigned int send_bytes = p_cb_instance->cbu_vec[j].valid_bytes;
if ( peer.send_n(p_cb_instance->cbu_vec[j].p_cbu_mem, send_bytes) != send_bytes )
{
DieWithError("send() sent a different number of bytes than expected");
break;
}
p_cb_instance->cbu_vec[j].buff_state_flag = BUFF_READ_FIN;
if( p_cb_instance->cbu_vec[j].ithread_state == BUFF_WRITE_FIN_END )
{
p_cb_instance->cbu_vec[j].mutex_cbu.release();
break;
}
}
}
p_cb_instance->read_cbu_count++;
p_cb_instance->cbu_vec[j].mutex_cbu.release();
loop_index++;
}
peer.close();
ACE_Thread::exit();
return NULL;
}
struct circle_buff_unit * circle_buff:: next_ready_cbu()
{
struct circle_buff_unit * p_cbu = NULL;
if( MODE_FLAG == TCP_RECV_MODE)
{
p_cbu = recv_next_ready_cbu();
}
else if( MODE_FLAG == TCP_SEND_MODE)
{
p_cbu = send_next_ready_cbu();
}
else
{
printf("[ next_ready_cbu ] : The MODE_FLAG is not the right value\n");
p_cbu = NULL;
}
return p_cbu;
}
// 返回当前可以使用的环形buff单元的指针
struct circle_buff_unit * circle_buff:: recv_next_ready_cbu()
{
int j = curr_ref_cbu_index;
// the mutex must be locked to check the state value
cbu_vec[j].mutex_cbu.acquire();
if( cbu_vec[j].buff_state_flag == BUFF_WRITE_FIN )
{
cbu_vec[j].buff_state_flag = BUFF_READ_ING;
}
else
{
cbu_vec[j].cond_cbu->wait();
if( cbu_vec[j].buff_state_fl