#include "thread_pool.h"
static void thread_pool_exit_handler(void *data); // 销毁线程池的线程执行的函数
static void *thread_pool_cycle(void *data); // 线程池里的每个线程执行的函数
static int_t thread_pool_init_default(thread_pool_t *tpp, char *name); // 线程池初始化默认值
static uint_t thread_pool_task_id; // 任务的独立id
static int debug = 0;
// 创建一个线程池,并初始化
thread_pool_t *thread_pool_init() {
int err;
pthread_t tid;
uint_t n;
pthread_attr_t attr; // 线程属性
thread_pool_t *tp = NULL;
tp = (thread_pool_t *)calloc(1, sizeof(thread_pool_t));
if (tp == NULL) {
fprintf(stderr, "thread_pool_init: calloc failed!\n");
return NULL;
}
thread_pool_init_default(tp, NULL);
thread_pool_queue_init(&tp->queue);
// 初始化互斥量
if (thread_mutex_create(&tp->mtx) != OK) {
free(tp);
return NULL;
}
// 初始化条件变量
if (thread_cond_create(&tp->cond) != OK) {
(void) thread_mutex_destroy(&tp->mtx);
free(tp);
return NULL;
}
// 初始化一个线程对象的属性
err = pthread_attr_init(&attr);
if (err) {
fprintf(stderr, "pthread_attr_init() failed, reason: %s\n", strerror(errno));
free(tp);
return NULL;
}
// PTHREAD_CREATE_DETACHED: 创建的子线程与主线程分离,即主线程不等待子线程结束,各自运行各自的
err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if (err) {
fprintf(stderr, "pthread_attr_setdetachstate() failed, reason: %s\n", strerror(errno));
free(tp);
return NULL;
}
// 根据设置好的线程个数,创建线程
for (n = 0; n < tp->threads; n++) {
err = pthread_create(&tid, &attr, thread_pool_cycle, tp);
if (err) {
fprintf(stderr, "pthread_create() failed, reason: %s\n", strerror(errno));
free(tp);
return NULL;
}
}
(void) pthread_attr_destroy(&attr);
return tp;
}
// 销毁线程池
void thread_pool_destroy(thread_pool_t *tp) {
uint_t n;
thread_task_t task;
volatile uint_t lock;
memset(&task, '\0', sizeof(thread_task_t));
task.handler = thread_pool_exit_handler;
task.ctx = (void *)&lock;
for (n = 0; n < tp->threads; n++) {
lock = 1;
if (thread_task_post(tp, &task) != 0) {
return;
}
// 进入处理函数后,lock会被置0
while (lock) {
sched_yield();
}
}
(void) thread_cond_destroy(&tp->cond);
(void) thread_mutex_destroy(&tp->mtx);
free(tp);
}
// 销毁线程执行的函数
static void thread_pool_exit_handler(void *data) {
uint_t *lock = (uint_t *)data;
*lock = 0;
pthread_exit(0);
}
// 创建一个任务,参数是执行线程要传入参数的内存大小
thread_task_t *thread_task_alloc(size_t size) {
thread_task_t *task;
// 分配的内存 加上 线程运行时传入的参数的位置大小
task = (thread_task_t *)calloc(1, sizeof(thread_task_t) + size);
if (task == NULL) {
return NULL;
}
task->ctx = task + 1; // task + 1: 就指向了参数的起始位置
return task;
}
// 将任务队列投递到线程池中
int_t thread_task_post(thread_pool_t *tp, thread_task_t *task) {
if (thread_mutex_lock(&tp->mtx) != OK) {
return ERROR;
}
// 任务队列等待个数不能大于设置的最大值
if (tp->waiting >= tp->max_queue) {
(void) thread_mutex_unlock(&tp->mtx);
fprintf(stderr, "thread pool \"%s\" queue overflow: %ld tasks waiting\n", tp->name, tp->waiting);
return ERROR;
}
task->id = thread_pool_task_id++;
task->next = NULL;
// 发射信号
if (thread_cond_signal(&tp->cond) != OK) {
(void) thread_mutex_unlock(&tp->mtx);
return ERROR;
}
*tp->queue.last = task; // 链接到尾部
tp->queue.last = &task->next; // 指向尾部
tp->waiting++;
// 解锁,只有这句代码指向后,线程等待哪里方可继续往下执行,信号通知只是其中一个条件
(void) thread_mutex_unlock(&tp->mtx);
if (debug) fprintf(stderr, "task #%lu added to thread pool \"%s\" \n", task->id, tp->name);
return OK;
}
// 线程执行函数
static void *thread_pool_cycle(void *data) {
thread_pool_t *tp = (thread_pool_t *)data;
int err;
thread_task_t *task;
if (debug) fprintf(stderr, "thread in pool \"%s\" started\n", tp->name);
for (;;) {
// 上锁
if (thread_mutex_lock(&tp->mtx) != OK) {
return NULL;
}
// 等待个数减一
tp->waiting--;
// 如果任务队列没有任务,则挂起等待
while (tp->queue.first == NULL) {
if (thread_cond_wait(&tp->cond, &tp->mtx) != OK) { // 等待,直到有信号通知
(void) thread_mutex_unlock(&tp->mtx);
return NULL;
}
}
// 获得任务队列的第一个任务
task = tp->queue.first;
tp->queue.first = task->next;
if (tp->queue.first == NULL) {
tp->queue.last = &tp->queue.first; // 像初始化那样
}
// 解锁
if (thread_mutex_unlock(&tp->mtx) != OK) {
return NULL;
}
if (debug) fprintf(stderr, "run task #%lu in thread pool \"%s\"\n", task->id, tp->name);
// 执行函数
task->handler(task->ctx);
if (debug) fprintf(stderr, "complete task #%lu in thread pool \"%s\"\n", task->id, tp->name);
task->next = NULL;
free(task);
task = NULL;
}
return NULL;
}
// 线程池初始化默认值
static int_t thread_pool_init_default(thread_pool_t *tpp, char *name) {
if (tpp) {
tpp->threads = DEFAULT_THREADS_NUM; // 线程个数
tpp->max_queue = DEFAULT_QUEUE_NUM; // 任务队列最大个数
tpp->name = strdup(name ? name : "default"); // 线程名字
if (debug) fprintf(stderr, "thread_pool_init, name: %s, threads: %lu max_queue: %ld\n", tpp->name, tpp->threads, tpp->max_queue);
return OK;
}
return ERROR;
}