#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <pthread.h>
#include <sys/time.h>
#include "queue.h"
/**
 *******************************************************************************
 * Append a copy of *pkt to the tail of the queue and signal the queue's
 * condition variable.
 *
 * This function does not take q->mutex itself; PacketQueuePut() calls it with
 * the mutex already held.
 *
 * The Packet is copied by value, so pkt->data is stored as the same pointer
 * (the buffer is not duplicated).
 *
 * @param q    queue to append to
 * @param pkt  packet to copy into the new list node
 * @return 0 on success, QUEUE_ABORT if q->abort_request is set,
 *         QUEUE_ERROR if the list node cannot be allocated
 *******************************************************************************
 */
int PacketQueuePutPrivate(PacketQueue *q, Packet *pkt)
{
    PacketList *node;

    if (q->abort_request)
        return QUEUE_ABORT;

    node = malloc(sizeof *node);
    if (node == NULL)
        return QUEUE_ERROR;

    node->pkt = *pkt;
    node->next = NULL;

    /* Link behind the current tail, or become the head of an empty queue. */
    if (q->last_pkt)
        q->last_pkt->next = node;
    else
        q->first_pkt = node;
    q->last_pkt = node;

    /* Accounting: payload bytes plus the bookkeeping node itself. */
    q->nb_packets++;
    q->size += node->pkt.size + sizeof(*node);

    /* Wake one thread waiting on the queue's condition variable. */
    pthread_cond_signal(&q->cond);
    return 0;
}
/**
 *******************************************************************************
 * Thread-safe enqueue: take q->mutex, refuse the packet if the queue is over
 * its packet limit, otherwise hand it to PacketQueuePutPrivate().
 *
 * NOTE(review): the limit test is `nb_packets > QUEUE_PACKET_MAX`, so the
 * queue can hold QUEUE_PACKET_MAX + 1 packets before QUEUE_OVERFLOW is
 * returned -- confirm this is the intended bound.
 *
 * @param q    queue to append to
 * @param pkt  packet to enqueue
 * @return QUEUE_OVERFLOW when the limit is exceeded, otherwise whatever
 *         PacketQueuePutPrivate() returns
 *******************************************************************************
 */
int PacketQueuePut(PacketQueue *q, Packet *pkt)
{
    int status;

    pthread_mutex_lock(&q->mutex);
    if (q->nb_packets > QUEUE_PACKET_MAX)
        status = QUEUE_OVERFLOW;
    else
        status = PacketQueuePutPrivate(q, pkt);
    pthread_mutex_unlock(&q->mutex);

    return status;
}
/**
 *******************************************************************************
 * Enqueue an empty packet (data == NULL, size == 0) carrying the given index.
 *
 * NOTE(review): only data, size and index are assigned; if Packet has any
 * other members they are copied into the queue uninitialised -- confirm
 * against the Packet definition.
 *
 * @param q      queue to append to
 * @param index  value stored in the packet's index field
 * @return the result of PacketQueuePut()
 *******************************************************************************
 */
int PacketQueuePutNullPacket(PacketQueue *q, int index)
{
    Packet empty;

    empty.data = NULL;
    empty.size = 0;
    empty.index = index;

    return PacketQueuePut(q, &empty);
}
/**
 *******************************************************************************
 * Packet queue handling: initialise a queue.
 *
 * Zeroes the whole structure, creates its mutex and condition variable with
 * default attributes, and leaves the queue with abort_request set. While that
 * flag is set PacketQueuePutPrivate() and PacketQueueGet() return QUEUE_ABORT;
 * PacketQueueStart() clears it.
 *
 * @param q  queue storage to initialise
 *******************************************************************************
 */
void PacketQueueInit(PacketQueue *q)
{
    memset(q, 0, sizeof *q);

    pthread_mutex_init(&q->mutex, NULL);
    pthread_cond_init(&q->cond, NULL);

    /* A fresh queue starts out aborted. */
    q->abort_request = 1;
}
/**
 *******************************************************************************
 * Discard every queued packet.
 *
 * Under q->mutex, walks the list, releases each packet's data buffer with
 * free() and then the list node, and finally resets the head/tail pointers
 * and the packet/byte counters to the empty state.
 *
 * @param q  queue to empty
 *******************************************************************************
 */
void PacketQueueFlush(PacketQueue *q)
{
    PacketList *node;

    pthread_mutex_lock(&q->mutex);

    node = q->first_pkt;
    while (node != NULL) {
        /* Remember the successor before the node is released. */
        PacketList *following = node->next;

        free(node->pkt.data);   /* free(NULL) is a no-op */
        free(node);
        node = following;
    }

    q->first_pkt = NULL;
    q->last_pkt = NULL;
    q->nb_packets = 0;
    q->size = 0;

    pthread_mutex_unlock(&q->mutex);
}
/**
 *******************************************************************************
 * Tear down a queue: flush all queued packets, then destroy the queue's mutex
 * and condition variable.
 *
 * @param q  queue to destroy
 *******************************************************************************
 */
void PacketQueueDestroy(PacketQueue *q)
{
    /* Release queued packets first; the flush still needs the mutex. */
    PacketQueueFlush(q);

    pthread_mutex_destroy(&q->mutex);
    pthread_cond_destroy(&q->cond);
}
/**
 *******************************************************************************
 * Put the queue into the aborted state and wake every thread waiting on it.
 *
 * Sets abort_request under q->mutex, so PacketQueuePutPrivate() and
 * PacketQueueGet() return QUEUE_ABORT from then on, until PacketQueueStart()
 * clears the flag.
 *
 * @param q  queue to abort
 *******************************************************************************
 */
void PacketQueueAbort(PacketQueue *q)
{
    pthread_mutex_lock(&q->mutex);
    q->abort_request = 1;
    /*
     * Broadcast rather than signal: an abort concerns every thread blocked on
     * the condition variable, and a single signal would wake only one of them,
     * leaving any other waiter asleep.
     */
    pthread_cond_broadcast(&q->cond);
    pthread_mutex_unlock(&q->mutex);
}
/**
 *******************************************************************************
 * Enable the queue by clearing abort_request under q->mutex.
 *
 * Nothing is enqueued and no waiter is signalled here.
 *
 * @param q  queue to start
 *******************************************************************************
 */
void PacketQueueStart(PacketQueue *q)
{
    pthread_mutex_lock(&q->mutex);

    q->abort_request = 0;

    pthread_mutex_unlock(&q->mutex);
}
/**
 *******************************************************************************
 * Compute the absolute deadline lying `us` microseconds after the current
 * gettimeofday() time, as a timespec for pthread_cond_timedwait().
 * A negative `us` is treated as 0.
 *******************************************************************************
 */
static void PacketQueueDeadlineAfter(struct timespec *deadline, int us)
{
    struct timeval now;
    long nsec;

    if (us < 0)
        us = 0;

    gettimeofday(&now, NULL);

    /*
     * Both tv_usec and (us % 1e6) are below one second, so their sum needs at
     * most a single carry into tv_sec.
     */
    nsec = ((long)now.tv_usec + us % (1000 * 1000)) * 1000L;
    deadline->tv_sec = now.tv_sec + us / (1000 * 1000);
    if (nsec >= 1000L * 1000L * 1000L) {
        nsec -= 1000L * 1000L * 1000L;
        deadline->tv_sec += 1;
    }
    deadline->tv_nsec = nsec;
}

/**
 *******************************************************************************
 * Remove the packet at the head of the queue and copy it into *pkt.
 *
 * The packet is copied out by value and its list node is freed; the packet's
 * data pointer is handed to the caller unchanged (it is not freed here).
 *
 * @param q      queue to read from
 * @param pkt    receives the dequeued packet when the return value is 1
 * @param block  behaviour when the queue is empty:
 *               - QUEUE_GET_NONE_BLOCK: return 0 immediately;
 *               - QUEUE_GET_TIME_BLOCK: wait up to a timeout given as one
 *                 extra `int` argument, in microseconds;
 *               - any other value: wait until a packet arrives or the queue
 *                 is aborted.
 * @return QUEUE_ABORT if the queue is in the aborted state, 1 if a packet was
 *         stored in *pkt, 0 if no packet was available (non-blocking mode, or
 *         the timeout expired)
 *******************************************************************************
 */
int PacketQueueGet(PacketQueue *q, Packet *pkt, int block, ...)
{
    struct timespec deadline = {0};
    int have_deadline = 0;
    int expired = 0;
    int ret;

    pthread_mutex_lock(&q->mutex);
    for (;;) {
        PacketList *head;

        if (q->abort_request) {
            ret = QUEUE_ABORT;
            break;
        }

        head = q->first_pkt;
        if (head) {
            /* Unlink the head; an emptied queue has no tail either. */
            q->first_pkt = head->next;
            if (!q->first_pkt)
                q->last_pkt = NULL;
            q->nb_packets--;
            q->size -= head->pkt.size + sizeof(*head);
            *pkt = head->pkt;
            free(head);
            ret = 1;
            break;
        }

        /* Queue is empty: give up now if we may not (or no longer) wait. */
        if (block == QUEUE_GET_NONE_BLOCK || expired) {
            ret = 0;
            break;
        }

        if (block == QUEUE_GET_TIME_BLOCK) {
            /*
             * The deadline is fixed the first time the queue is found empty,
             * so wake-ups that find nothing to do do not extend the total
             * wait.
             */
            if (!have_deadline) {
                va_list ap;

                va_start(ap, block);
                PacketQueueDeadlineAfter(&deadline, va_arg(ap, int));
                va_end(ap);
                have_deadline = 1;
            }
            /*
             * A zero return means we were woken up: loop to re-check the
             * abort flag and the queue instead of reporting "no packet".
             * Any non-zero return (timeout or error) ends the wait, after one
             * final check of the queue on the next iteration.
             */
            if (pthread_cond_timedwait(&q->cond, &q->mutex, &deadline) != 0)
                expired = 1;
        } else {
            pthread_cond_wait(&q->cond, &q->mutex);
        }
    }
    pthread_mutex_unlock(&q->mutex);

    return ret;
}
/**
 *******************************************************************************
 * Read the number of packets currently queued, under q->mutex.
 *
 * @param q  queue to inspect
 * @return the value of q->nb_packets at the time of the call
 *******************************************************************************
 */
int PacketQueueGetPacketNumber(PacketQueue *q)
{
    pthread_mutex_lock(&q->mutex);
    const int count = q->nb_packets;
    pthread_mutex_unlock(&q->mutex);

    return count;
}