#include "sat_user_queue.h"
//#include "common/sat_user_time.h"
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <sys/time.h>
typedef struct sat_queue_List
{
sat_queue_node pkt;
int read_count;
struct sat_queue_List *next;
} sat_queue_List;
typedef struct sat_user_info
{
sat_queue_List *first_pkt;
} sat_user_info;
typedef struct sat_user_list
{
sat_user_info p_uinfo;
struct sat_user_list *next;
sat_queue_handle *handle;
} sat_user_list;
typedef struct sat_queue
{
sat_queue_List *first_pkt;
sat_queue_List *last_pkt;
int nb_node;
int mem_size;
int nb_node_max;
int abort_request;
int64_t last_pts;
sat_user_list *user;
int user_count;
pthread_mutex_t mutex;
pthread_cond_t cond;
} sat_queue;
unsigned long long user_timestamp_get(void)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec * 1000 + tv.tv_usec / 1000;
}
/***********************************************
** 作者: leo.liu
** 日期: 2022-11-22 17:15:44
** 说明: 调整队列
***********************************************/
static bool _sat_queue_adj(sat_queue_handle qq, int timeout_ms)
{
sat_queue *q = (sat_queue *)qq;
if (q->nb_node < q->nb_node_max)
{
for (sat_user_list *p = q->user; p != NULL; p = p->next)
{
if ((p->p_uinfo.first_pkt == NULL) && (q->first_pkt != NULL))
{
p->p_uinfo.first_pkt = q->first_pkt;
}
}
return true;
}
sat_queue_List *pkt = q->first_pkt;
if (pkt == NULL)
{
return true;
}
if (pkt->read_count < q->user_count)
{
pthread_mutex_unlock(&q->mutex);
while (pkt->read_count < q->user_count)
{
usleep(1000);
timeout_ms--;
if (timeout_ms == 0)
{
printf("queue adj timeout:(read count:%d user total:%d) \n", pkt->read_count, q->user_count);
break;
}
}
pthread_mutex_lock(&q->mutex);
if (timeout_ms == 0)
{
return false;
}
}
for (sat_user_list *p = q->user; p != NULL; p = p->next)
{
if ((p->p_uinfo.first_pkt == NULL) || (p->p_uinfo.first_pkt == pkt))
{
p->p_uinfo.first_pkt = pkt->next;
}
}
q->first_pkt = pkt->next;
if (!q->first_pkt)
{
q->last_pkt = NULL;
}
q->nb_node--;
q->mem_size -= pkt->pkt.size + sizeof(*pkt);
free(pkt->pkt.data);
free(pkt);
return true;
}
static bool _sat_queue_put(sat_queue_handle qq, sat_queue_node *pkt)
{
sat_queue *q = (sat_queue *)qq;
sat_queue_List *pkt1 = (sat_queue_List *)malloc(sizeof(sat_queue_List));
if (!pkt1)
{
return false;
}
pkt1->pkt = *pkt;
pkt1->next = NULL;
pkt1->read_count = 0;
if (!q->last_pkt)
q->first_pkt = pkt1;
else
q->last_pkt->next = pkt1;
q->last_pkt = pkt1;
q->nb_node++;
q->mem_size += pkt1->pkt.size + sizeof(*pkt1);
q->last_pts = pkt->pts;
pthread_cond_signal(&q->cond);
return true;
}
bool sat_queue_put(sat_queue_handle qq, sat_queue_node *pkt)
{
sat_queue *q = (sat_queue *)qq;
pthread_mutex_lock(&q->mutex);
if (_sat_queue_adj(qq, 500) == true)
{
_sat_queue_put(qq, pkt);
}
pthread_mutex_unlock(&q->mutex);
return true;
}
bool sat_queue_get(sat_queue_user user, sat_queue_node **pkt, int block)
{
if (user == NULL)
{
printf("user is null\n");
return false;
}
sat_user_list *p_user = (sat_user_list *)user;
sat_queue *q = (sat_queue *)p_user->handle;
if (q == NULL)
{
printf("user queue is null\n");
return false;
}
bool ret = true;
pthread_mutex_lock(&q->mutex);
sat_user_list *p_u = NULL;
for (p_u = q->user; p_u != NULL; p_u = p_u->next)
{
if (p_u == p_user)
{
break;
}
}
if (p_u == NULL)
{
pthread_mutex_unlock(&q->mutex);
printf("not find user queue\n");
return false;
}
for (;;)
{
if (q->abort_request)
{
ret = false;
break;
}
sat_queue_List *pkt1 = p_u->p_uinfo.first_pkt;
if (pkt1)
{
p_u->p_uinfo.first_pkt = pkt1->next;
*pkt = &(pkt1->pkt);
// pkt1->read_count++;
break;
}
else if (!block)
{
ret = false;
printf("user queue empty\n");
break;
}
else
{
printf("queue empty, condition wait %lld\n", user_timestamp_get());
pthread_cond_wait(&q->cond, &q->mutex);
printf("leave condition wait %lld\n", user_timestamp_get());
}
}
pthread_mutex_unlock(&q->mutex);
return ret;
}
/***********************************************
** 作者: leo.liu
** 日期: 2022-11-22 15:9:1
** 说明: 标记释放节点
***********************************************/
bool sat_queue_free(sat_queue_user user, sat_queue_node **pkt)
{
if (*pkt == NULL)
{
return false;
}
sat_user_list *p_user = (sat_user_list *)user;
sat_queue *q = (sat_queue *)p_user->handle;
sat_queue_List *p = (sat_queue_List *)(*pkt);
pthread_mutex_lock(&q->mutex);
p->read_count++;
pthread_mutex_unlock(&q->mutex);
return true;
}
/***********************************************
** 作者: leo.liu
** 日期: 2022-11-22 15:9:1
** 说明: 创建一个队列
***********************************************/
sat_queue_handle sat_queue_create(int queue_max)
{
sat_queue *q = malloc(sizeof(sat_queue));
memset(q, 0, sizeof(sat_queue));
pthread_mutex_init(&q->mutex, NULL);
pthread_cond_init(&q->cond, NULL);
q->nb_node_max = queue_max;
return (sat_queue_handle)q;
}
/***********************************************
** 作者: leo.liu
** 日期: 2022-11-22 15:12:41
** 说明: 增加一个可读用户
***********************************************/
sat_queue_user sat_queue_user_create(sat_queue_handle qq)
{
sat_queue *q = (sat_queue *)qq;
sat_user_list *u = (sat_user_list *)malloc(sizeof(sat_user_list));
u->p_uinfo.first_pkt = NULL;
u->next = NULL;
u->handle = qq;
pthread_mutex_lock(&q->mutex);
if (q->user == NULL)
{
q->user = u;
}
else
{
sat_user_list *p = q->user;
while (p->next)
{
p = p->next;
}
p->next = u;
}
q->user_count++;
pthread_mutex_unlock(&q->mutex);
return (sat_queue_user)u;
}
/***********************************************
** 作者: leo.liu
** 日期: 2022-11-22 15:12:41
** 说明: 删除一个可读用户
***********************************************/
bool sat_queue_user_del(sat_queue_user user)
{
sat_user_list *u = (sat_user_list *)user;
if (u == NULL)
{
return false;
}
sat_queue *q = (sat_queue *)u->handle;
pthread_mutex_lock(&q->mutex);
if (q->user == u)
{
sat_user_list *pu = q->user->next;
free(q->user);
q->user = pu;
q->user_count--;
}
else
{
sat_user_list *pp = q->user;
for (sat_user_list *p = q->user->next; p != NULL; p = p->next)
{
if (p == u)
{
pp->next = p->next;
free(p);
}
pp = p;
q->user_count--;
break;
}
}
pthread_mutex_unlock(&q->mutex);
}