//fifo.c
#include "fifo.h"
//**************************************************************
//datadispatch module
// 1. Dispatches one data source to multiple outputs.
// 2. Input packet size and output packet size can vary.
// 3. If a [data cache] is full, the input function does not put the data into
//    that [data cache]; the bytes are counted as discarded instead.
//
//                    |------>[data cache]======>
// [data source]----->|
//                    |------>[data cache]======>
//                    |
//                    |------>[data cache]======>
//                    |
//
// --------> input        ======> get from cache
//
//
//Usage:
// 1. Call dispInit().
// 2. Call regOutputModule() once per output to register it. This function is not thread safe.
// 3. The input thread calls datadispInput() to input data; each output thread calls getdatadisp() to get data from its cache.
// 4. After the input and output threads have stopped, call disregAllOutputModules() to unregister all output modules.
// Allocate a dispatcher that has no output modules registered yet.
//
// Returns the new handle, or a null handle when the allocation fails
// (previously a failed allocation was dereferenced).
// The handle is released by disregAllOutputModules().
DISP_HANDLE dispInit()
{
    DISP_OUTPUT *handle = (DISP_OUTPUT *)malloc(sizeof *handle);

    // Only touch the structure when malloc actually succeeded.
    if (handle != NULL)
        handle->dispindex = 0;
    return handle;
}
// Register one output module and allocate its FIFO of fifoSize bytes.
// One byte of the FIFO is kept unused to tell "empty" from "full", so the
// usable capacity is fifoSize-1 bytes.
//
// Returns the output module ID (>= 0) on success, or a negative value:
//   -1  null handle, module table full, or mutex initialisation failed
//   -2  fifoSize is 0 or the FIFO buffer could not be allocated
// On failure nothing stays allocated and the module count is unchanged.
int regOutputModule(DISP_HANDLE h,DWORD fifoSize)
{
    DISP_OUTPUT *handle = (DISP_OUTPUT *)h;
    DISP_OUPUT_CORE *pCore;
    int id;

    if (handle == NULL)
        return -1;

    assert(handle->dispindex<MAX_OUTPUT_MODULE);
    if (handle->dispindex >= MAX_OUTPUT_MODULE)
        return -1; // full

    // A zero-byte FIFO can never hold data and malloc(0) is
    // implementation-defined, so reject it up front.
    if (fifoSize == 0)
        return -2;

    // Take the slot only after the index is known to be in range.
    pCore = &(handle->dispcore[handle->dispindex]);

    pCore->buf = (char *)malloc(fifoSize);
    assert(pCore->buf);
    if (!(pCore->buf))
        return -2;

    pCore->bufsize = fifoSize;
    pCore->pin = pCore->buf;
    pCore->pout = pCore->buf;
    pCore->discardBytes = 0;
    pCore->inputBytes = 0;
    pCore->outputBytes = 0;

    if (pthread_mutex_init(&(pCore->mut), NULL) != 0)
    {
        mylog("init mutex in regoutModule error! \n");
        // The slot is not registered, so release its buffer here;
        // otherwise nobody would ever free it.
        free(pCore->buf);
        pCore->buf = NULL;
        return -1;
    }

    id = handle->dispindex;
    handle->dispindex++;
    return id;
}
// Free every module registered by regOutputModule() and then the dispatcher
// itself: the handle must not be used after this call.
// For each module a statistics line (input / output / discarded / left bytes)
// is logged before its FIFO buffer is freed and its mutex destroyed.
//
// A null handle is accepted and treated as "nothing to free".
// Always returns 0.
int disregAllOutputModules(DISP_HANDLE h)
{
    DISP_OUTPUT *handle = (DISP_OUTPUT *)h;
    int i;

    if (handle == NULL)
        return 0;

    for (i = 0; i < handle->dispindex; i++)
    {
        DISP_OUPUT_CORE *pCore = &(handle->dispcore[i]);

        // Bytes still waiting in the ring: pin may have wrapped behind pout.
        int size = (int)(pCore->pin - pCore->pout);
        if (size < 0)
            size += pCore->bufsize;

        mylog("output module #%d, input bytes[%u],output bytes[%u], discard bytes[%u], left bytes[%i]\n",i,pCore->inputBytes,pCore->outputBytes,pCore->discardBytes,size);

        assert(pCore->buf);
        free(pCore->buf);
        pCore->buf = NULL;
        pthread_mutex_destroy(&pCore->mut);
    }

    handle->dispindex = 0;
    free(handle);
    return 0;
}
// Dispatch datalen bytes from data to the FIFO of every registered output
// module. A module whose FIFO cannot take the whole packet gets none of it;
// the packet length is added to that module's discardBytes instead.
//
// Returns:
//    0  the last registered module accepted the data (or no module is registered)
//   -1  locking a module mutex failed; modules handled before it already
//       received the data
//   -2  invalid argument (null handle, null data or negative datalen)
//   -3  the last registered module was full and discarded the data
//
// NOTE(review): the full flag is overwritten for every module, so -3 only
// reflects the LAST module; a discard in an earlier module is visible only
// through its discardBytes counter. Kept as-is because callers may retry on
// -3 - confirm the intended semantics before changing it.
int datadispInput(DISP_HANDLE h, const void *data, int datalen)
{
    DISP_OUTPUT *handle = (DISP_OUTPUT *)h;
    int bFullNotInput = false;
    int i;

    // A negative length would slip through the "full" test below and reach
    // memcpy() as a huge size_t, so reject bad arguments first.
    if (handle == NULL || data == NULL || datalen < 0)
        return -2;

    for (i = 0; i < handle->dispindex; i++)
    {
        DISP_OUPUT_CORE *pCore = &(handle->dispcore[i]);
        int size;      // bytes currently stored in the ring
        int freeBytes; // bytes that can still be stored
        int rightsize; // contiguous bytes from pin to the end of the buffer

        if (pthread_mutex_lock(&pCore->mut) != 0)
        {
            mylog(" cj input data error !lock mutex error!\n");
            return -1;
        }

        size = (int)(pCore->pin - pCore->pout);
        if (size < 0)
            size += pCore->bufsize;

        // One byte stays unused to tell "empty" from "full". Comparing
        // against the free space avoids the overflow of datalen+size.
        freeBytes = (int)pCore->bufsize - 1 - size;
        if (datalen > freeBytes)
        {
            pCore->discardBytes += datalen;
            bFullNotInput = true;
            pthread_mutex_unlock(&pCore->mut);
            continue;
        }
        bFullNotInput = false;

        // Copy the packet and advance pin, wrapping to the start of the
        // buffer when the packet reaches or crosses its end. pin is derived
        // from rightsize so no out-of-range pointer is ever formed.
        rightsize = (int)(pCore->buf + pCore->bufsize - pCore->pin);
        if (rightsize > datalen)
        {
            memcpy(pCore->pin, data, (size_t)datalen);
            pCore->pin += datalen;
        }
        else
        {
            memcpy(pCore->pin, data, (size_t)rightsize);
            memcpy(pCore->buf, (const char *)data + rightsize, (size_t)(datalen - rightsize));
            pCore->pin = pCore->buf + (datalen - rightsize);
        }

        pCore->inputBytes += datalen;
        pthread_mutex_unlock(&pCore->mut);
    }

    if (bFullNotInput)
        return -3; // indicating full, not input
    return 0;
}
// Return the number of bytes currently stored in the FIFO of output module id.
//
// Returns the byte count (>= 0), or -1 when the handle is null, id is not a
// registered module, or the module mutex cannot be locked.
int getdatadispsize(DISP_HANDLE h,int id)
{
    DISP_OUTPUT *handle = (DISP_OUTPUT *)h;
    DISP_OUPUT_CORE *pCore;
    int size; // valid data size

    if (handle == NULL)
        return -1;

    // The original test used '&&', which can never be true, so an invalid
    // index was rejected silently; log it as evidently intended.
    if (id < 0 || id >= handle->dispindex)
    {
        mylog("Eric:: invalde index=%d",id);
        return -1;
    }

    // Index into the module table only once id is known to be valid.
    pCore = &(handle->dispcore[id]);

    if (pthread_mutex_lock(&pCore->mut) != 0)
    {
        mylog("cj get datadisp size error lock mutex error!\n");
        return -1;
    }

    // pin may have wrapped behind pout; add the buffer size in that case.
    size = (int)(pCore->pin - pCore->pout);
    if (size < 0)
        size += pCore->bufsize;

    pthread_mutex_unlock(&pCore->mut);
    return size;
}
// Read data from the FIFO of registered output module id into buf.
//
// On entry *pOutputlen is the number of bytes requested.
//  - Enough data stored: exactly that many bytes are copied, returns 0.
//  - Not enough data, bForce false: nothing is read, *pOutputlen is set to 0,
//    returns -1.
//  - Not enough data, bForce true: everything that is left is copied,
//    *pOutputlen is set to the number of bytes copied, returns -1.
//
// Other return values:
//   -1  the module mutex could not be locked; *pOutputlen is set to 0 so that
//       every -1 return reports the number of bytes actually copied
//   -2  invalid argument (null handle / buf / pOutputlen, id not registered,
//       or a negative requested length); *pOutputlen is left untouched
int getdatadisp(DISP_HANDLE h, int id,void *buf, int* pOutputlen,bool bForce)
{
    DISP_OUTPUT *handle = (DISP_OUTPUT *)h;
    DISP_OUPUT_CORE *pCore;
    int size;      // valid data size
    int rightsize; // contiguous bytes from pout to the end of the buffer
    int outputlen;
    int bNotEnough = false;

    // Validate the pointers before anything is dereferenced.
    if (handle == NULL || buf == NULL || pOutputlen == NULL)
        return -2;

    assert((id>=0) && (id < handle->dispindex));
    if (id < 0 || id >= handle->dispindex)
    {
        // Format specifier fixed: it used to read "d%".
        mylog("getdata disp error!!index %d\n",id);
        return -2;
    }

    // A negative length would pass the size test below and reach memcpy()
    // as a huge size_t.
    outputlen = *pOutputlen;
    if (outputlen < 0)
        return -2;

    pCore = &(handle->dispcore[id]);

    if (pthread_mutex_lock(&pCore->mut) != 0)
    {
        mylog("cj lock mutex error! getdatadisp \n");
        *pOutputlen = 0; // nothing was copied
        return -1;
    }

    size = (int)(pCore->pin - pCore->pout);
    if (size < 0)
        size += pCore->bufsize;

    if (size < outputlen)
    {
        if (!bForce)
        {
            *pOutputlen = 0;
            pthread_mutex_unlock(&pCore->mut);
            return -1;
        }
        // Forced read: hand out everything that is left.
        *pOutputlen = size;
        outputlen = size;
        bNotEnough = true;
    }

    // Copy the data and advance pout, wrapping to the start of the buffer
    // when the read reaches or crosses its end. pout is derived from
    // rightsize so no out-of-range pointer is ever formed.
    rightsize = (int)(pCore->buf + pCore->bufsize - pCore->pout);
    if (rightsize > outputlen)
    {
        memcpy(buf, pCore->pout, (size_t)outputlen);
        pCore->pout += outputlen;
    }
    else
    {
        memcpy(buf, pCore->pout, (size_t)rightsize);
        memcpy((char *)buf + rightsize, pCore->buf, (size_t)(outputlen - rightsize));
        pCore->pout = pCore->buf + (outputlen - rightsize);
    }

    pCore->outputBytes += outputlen;
    pthread_mutex_unlock(&pCore->mut);

    if (bNotEnough)
        return -1;
    return 0;
}
// end of data dispatch module ===============================================
//
//**************************************************************
//fifo module
//The module is used for many inputers and one outputer
//The output packet size is the same as the input packet size
//
//
// end of command fifo module=====================================
//
// (stray web-page residue "Comments 0" was here; it is not C and broke compilation)