#include <sys/time.h>
#include <sys/syscall.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include "base.h"
#include "tpool.h"
#include "procedure.h"
/*
 * Process-wide thread pool state. The code in this file uses its per-queue
 * list heads (p_list_head), per-queue mutexes (p_list_mutex), per-queue job
 * counters (p_jobnr), the overall job counter (jobnr_sum) and the worker
 * thread handles (p_pthread).
 */
threadpool_t g_threadpool = { 0 };
/*
 * Per-worker index storage. tpool_create_init() stores i in slot i and passes
 * the slot's address as the thread argument, so each worker can read its own
 * index from memory that outlives the creating function.
 */
unsigned int g_thd_idx[THREAD_NR] = { 0 };
bool jobq_empty(int jobq_sn)
{
bool rc = 0;
pthread_mutex_lock(&g_threadpool.p_list_mutex[jobq_sn]);
rc = g_threadpool.p_list_head[jobq_sn].prev == &g_threadpool.p_list_head[jobq_sn];
pthread_mutex_unlock(&g_threadpool.p_list_mutex[jobq_sn]);
return rc;
}
/*
 * Put `p_job` on the queue selected by p_job->jobq_sn.
 *
 * While holding that queue's mutex, the job's embedded list node is handed to
 * list_add() together with the queue head, and both the per-queue job counter
 * and the pool-wide job counter are incremented.
 */
void job_enque(job_t* p_job)
{
    pthread_mutex_t *lock = &g_threadpool.p_list_mutex[p_job->jobq_sn];

    pthread_mutex_lock(lock);

    list_add(&g_threadpool.p_list_head[p_job->jobq_sn], &p_job->list_head);
    atomic_inc(&g_threadpool.p_jobnr[p_job->jobq_sn]);
    atomic_inc(&g_threadpool.jobnr_sum);

    pthread_mutex_unlock(lock);
}
/*
 * Take one node off job queue `jobq_sn`.
 *
 * While holding that queue's mutex, list_get() is called with the queue head
 * and `pp_node` (presumably it stores the retrieved node in *pp_node; confirm
 * against list_get's definition), then the per-queue and pool-wide job
 * counters are decremented.
 *
 * NOTE(review): the counters are decremented unconditionally, so callers are
 * expected to check jobq_empty() first.
 */
void job_deque(int jobq_sn, lnode_t** pp_node)
{
    pthread_mutex_t *lock = &g_threadpool.p_list_mutex[jobq_sn];
    lnode_t *head = &g_threadpool.p_list_head[jobq_sn];

    pthread_mutex_lock(lock);

    list_get(head, pp_node);
    atomic_dec(&g_threadpool.p_jobnr[jobq_sn]);
    atomic_dec(&g_threadpool.jobnr_sum);

    pthread_mutex_unlock(lock);
}
void* base_function(void* arg)
{
lnode_t* p_node = NULL;
job_t* p_job = NULL;
int rc = 0;
unsigned int thdidx = *(unsigned int*)arg;
unsigned int pid = getpid();
unsigned int tid = syscall(SYS_gettid);
unsigned int pthd_id = pthread_self(); //pthread中的编号
lnode_t* p_list_head = &g_threadpool.p_list_head[thdidx];
while (true)
{
while (!jobq_empty(thdidx))
{
assert(0 != g_threadpool.p_jobnr[thdidx]);
job_deque(thdidx, &p_node);
rc = 0;
while (rc != -1)
{
p_job = container_of(p_node, job_t, list_head);
stagefunction_t* demo_stagfunction = get_stagefuncs();
do
{
rc = demo_stagfunction[p_job->stage_next](p_job);
} while (rc == 0);
}
}
usleep(100000);
}
}
/*
 * Initialise the per-queue state and start the worker threads.
 *
 * For each index i in [0, thdnr) this resets the queue's job counter,
 * initialises its list head and mutex, and starts a thread running
 * base_function() with a pointer to g_thd_idx[i] as its argument.
 *
 * thdnr: number of workers to start. Values larger than the capacity of
 *        g_thd_idx are clamped to that capacity (with a message on stderr)
 *        instead of writing past the end of the array. Values <= 0 start
 *        nothing.
 *
 * Failures of pthread_mutex_init() or pthread_create() are reported on stderr.
 * A queue whose mutex could not be initialised gets no worker thread, because
 * the worker would lock an uninitialised mutex.
 */
void tpool_create_init(int thdnr)
{
    const int max_threads = (int)(sizeof g_thd_idx / sizeof g_thd_idx[0]);
    int i = 0;
    int rc = 0;

    if (thdnr > max_threads)
    {
        fprintf(stderr, "tpool_create_init: %d threads requested, limiting to %d.\n",
                thdnr, max_threads);
        thdnr = max_threads;
    }

    for (i = 0; i < thdnr; i++)
    {
        g_thd_idx[i] = (unsigned int)i;
        g_threadpool.p_jobnr[i] = 0;
        list_init(&g_threadpool.p_list_head[i]);

        rc = pthread_mutex_init(&g_threadpool.p_list_mutex[i], NULL);
        if (rc != 0)
        {
            fprintf(stderr, "tpool_create_init: pthread_mutex_init failed for queue %d (error %d).\n",
                    i, rc);
            continue;
        }

        printf("create thread %d.\n", i);
        /* The thread starts running here. */
        rc = pthread_create(&g_threadpool.p_pthread[i], NULL, &base_function, &g_thd_idx[i]);
        if (rc != 0)
        {
            fprintf(stderr, "tpool_create_init: pthread_create failed for thread %d (error %d).\n",
                    i, rc);
        }
    }
}
/*
 * Schedule the next step of a job.
 *
 * p_job:    job to update.
 * stage_sn: stage number stored as the job's next stage.
 * jobq_sn:  queue number stored as the job's target queue.
 *
 * The previously pending stage (the old stage_next) is kept in stage_prev.
 */
void set_job(job_t* p_job, int stage_sn, int jobq_sn)
{
    /* The target queue does not interact with the stage bookkeeping. */
    p_job->jobq_sn = jobq_sn;

    /* Save the old pending stage before it is overwritten. */
    p_job->stage_prev = p_job->stage_next;
    p_job->stage_next = stage_sn;
}
/* Pagination residue from the source web page: "- 1", "- 2", "go to page". */