#include "thread_pool.h" #include #include #include #include #include #define DETECTNUM 3 #define CREATE_THREAD_NUM 2 #define EXIT_THREAD_NUM 2 // 定义任务结构体 typedef struct Task { void (*func)(void* arg); void* arg; }Task; // 定义线程池结构体 struct ThreadPool { // 任务队列 Task* taskQ; int queueCapacity; // 队列容量 int queueSize; // 当前队列中任务的个数 int queueFront; // 队列头部 -> 取数据 int queueEnd; // 队列尾部 -> 存数据 pthread_t managerID; // 管理者线程的ID pthread_t* pworkids; // 工作线程的ID,存放在一个数组中,由pworkids这个指针指向这个数组 int minNum; // 最小线程数量 int maxNum; // 最大线程数量 int busyNum; // 正在工作的线程的锁 int liveNum; // 存活的线程数量 int exitNum; // 根据实际情况计算需要销毁的线程数量 pthread_mutex_t mutexPool; // 整个线程池的锁 pthread_mutex_t mutexBusy; // 对busyNum变量进行上锁 pthread_cond_t cond_Full; // 任务队列是否满了的条件变量 pthread_cond_t cond_Empty; // 任务队列是否为空的条件变量 int shutdown; // 是否销毁线程池 0 - 不销毁 \ 1 - 销毁 }; ThreadPool* threadPoolCreate(int min, int max, int queueCapacity) { ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool)); do { if (pool == NULL) { printf("malloc threadpool failed.\n"); break; } pool->pworkids = (pthread_t*)malloc(sizeof(pthread_t) * max); if (pool->pworkids == NULL) { printf("malloc pworkids failed.\n"); break; } memset(pool->pworkids, 0, sizeof(pthread_t) * max); pool->minNum = min; pool->maxNum = max; pool->busyNum = 0; pool->liveNum = min; // 存活个数按线程池创建后的最小个数创建,和最小个数相等 pool->exitNum = 0; // 根据实际情况进行计算销毁,初始化为0 if ( pthread_mutex_init(&pool->mutexPool, NULL) != 0 || pthread_mutex_init(&pool->mutexBusy, NULL) != 0 || pthread_cond_init(&pool->cond_Full, NULL) != 0 || pthread_cond_init(&pool->cond_Empty, NULL) != 0 ) { printf("mutex or condition init failed.\n"); break; } // 初始化任务队列 pool->taskQ = (Task*)malloc(sizeof(Task) * queueCapacity); if (pool->taskQ == NULL) { printf("malloc taskQ failed.\n"); break; } pool->queueCapacity = queueCapacity; pool->queueSize = 0; pool->queueFront = 0; pool->queueEnd = 0; pool->shutdown = 0; // 创建管理者线程 // 因为线程管理者需要操作线程池中的数据,所以直接把线程池结构体指针传参给管理者线程即可 //pthread_create(&pool->managerID, NULL, manager, NULL); pthread_create(&pool->managerID, NULL, manager, pool); // 创建工作线程,线程池初始化的时候按照传递进来的最低参数进行创建 for (size_t i = 0; i < min; i++) { // 因为工作线程需要从线程队列中操作任务,而线程队列是在线程池结构体中定义的,所以直接把线程池结构体指针传参给工作线程即可 //pthread_create(&pool->pworkids[i], NULL, working, NULL); pthread_create(&pool->pworkids[i], NULL, working, pool); } // 返回创建好的线程池结构体指针 return pool; } while (0); if (pool && pool->pworkids) free(pool->pworkids); if (pool && pool->taskQ) free(pool->taskQ); if (pool) free(pool); return NULL; } ThreadPool* threadPoolCreateBak01(int min, int max, int queueCapacity) { ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool)); if (pool == NULL) { printf("malloc threadpool failed.\n"); return NULL; } pool->pworkids = (pthread_t*)malloc(sizeof(pthread_t) * max); if (pool->pworkids == NULL) { printf("malloc pworkids failed.\n"); if (pool) free(pool); return NULL; } // 将线程数组中的线程id都置为0,以便后续通过是否为0判断是否需要创建新的线程 memset(pool->pworkids, 0, sizeof(pthread_t) * max); pool->minNum = min; pool->maxNum = max; pool->busyNum = 0; pool->liveNum = min; // 存货个数按线程池创建后的最小个数创建,和最小个数相等 pool->exitNum = 0; // 根据实际情况进行计算销毁,初始化为0 if ( pthread_mutex_init(&pool->mutexPool, NULL) != 0 || pthread_mutex_init(&pool->mutexBusy, NULL) != 0 || pthread_cond_init(&pool->cond_Full, NULL) != 0 || pthread_cond_init(&pool->cond_Empty, NULL) != 0 ) { printf("mutex or condition init failed.\n"); if (pool && pool->pworkids) { free(pool->pworkids); free(pool); } return NULL; } // 初始化任务队列 pool->taskQ = (Task*)malloc(sizeof(Task) * queueCapacity); if (pool->taskQ == NULL) {\ if (pool && pool->pworkids) { free(pool->pworkids); free(pool); } return NULL; } pool->queueCapacity = queueCapacity; pool->queueSize = 0; pool->queueFront = 0; pool->queueEnd = 0; pool->shutdown = 0; // 创建管理者线程 pthread_create(&pool->managerID, NULL, manager, pool); // 创建工作线程,线程池初始化的时候按照传递进来的最低参数进行创建 for (size_t i = 0; i < min; i++) { pthread_create(&pool->pworkids[i], NULL, working, NULL); } return pool; } int threadPoolDestroy(ThreadPool* pool) { if (pool == NULL) { return -1; } // 关闭线程池 pool->shutdown = 1; // 阻塞回收管理者线程 pthread_join(pool->managerID, NULL); printf("manager thread destroyed.\n"); // 唤醒阻塞的消费者线程,当shutdown=1的时候,所有的线程就会自动退出 for (size_t i = 0; i < pool->liveNum; i++) { pthread_cond_signal(&pool->cond_Empty); } while (pool->liveNum) { printf("pool->liveNum: %d\n", pool->liveNum); } sleep(1); printf("working thread destroyed.\n"); // 释放堆内存 if (pool->taskQ) { free(pool->taskQ); pool->taskQ = NULL; } printf("free pool->taskQ successed.\n"); if (pool->pworkids) { free(pool->pworkids); pool->pworkids = NULL; } printf("free pool->pworkids successed.\n"); // 释放锁和条件变量 pthread_mutex_destroy(&pool->mutexBusy); pthread_mutex_destroy(&pool->mutexPool); pthread_cond_destroy(&pool->cond_Empty); pthread_cond_destroy(&pool->cond_Full); free(pool); pool = NULL; printf("free pool successed.\n"); return 0; } void threadPoolAddTask(ThreadPool* pool, void(*func)(void*), void* arg) { pthread_mutex_lock(&pool->mutexPool); if (pool->queueSize == pool->queueCapacity && !pool->shutdown) { // 阻塞生产者线程 pthread_cond_wait(&pool->cond_Full, &pool->mutexPool); } if (pool->shutdown) { pthread_mutex_unlock(&pool->mutexPool); return; } // 添加任务到任务队列 pool->taskQ[pool->queueEnd].func = func; pool->taskQ[pool->queueEnd].arg = arg; pool->queueEnd = (pool->queueEnd + 1) % pool->queueCapacity; pool->queueSize++; //唤醒消费者工作线程处理任务 pthread_cond_signal(&pool->cond_Empty); // 解锁 pthread_mutex_unlock(&pool->mutexPool); } int getWorkingThreads(ThreadPool* pool) { pthread_mutex_lock(&pool->mutexBusy); int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexBusy); return busyNum; } int getALiveThreads(ThreadPool* pool) { pthread_mutex_lock(&pool->mutexPool); int aliveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexPool); return aliveNum; } int getTaskNum(ThreadPool* pool) { pthread_mutex_lock(&pool->mutexPool); int taskNum = pool->queueSize; pthread_mutex_unlock(&pool->mutexPool); return taskNum; } // 工作线程的任务函数 void* working(void* arg) { // void* 在c语言中是个泛型,表示可以接收任意类型的参数,所以需要对arg进行类型转换 ThreadPool* pool = (ThreadPool*)arg; while (1) { // 因为线程池这块内存由多个线程可能同时操作,需要使用锁进行锁定,以避免并发问题 pthread_mutex_lock(&pool->mutexPool); // 判断当前任务队列是否为空并且线程池是否已经退出,为空就阻塞线程执行 while(pool->queueSize == 0 && !pool->shutdown){ // 阻塞工作线程 pthread_cond_wait(&pool->cond_Empty, &pool->mutexPool); // 判断是否要销毁线程 if (pool->exitNum > 0) { pool->exitNum--; if (pool->liveNum > pool->minNum) { pool->liveNum--; // 在需要退出的线程退出之前,需要解开互斥锁,否则会产生死锁风险 pthread_mutex_unlock(&pool->mutexPool); // 退出当前线程,线程退出后应该把当前线程在线程数组中的pid置为0 //pthread_exit(NULL); // 直接退出没有把线程数组中当前线程的线程id置为0,那后续无法新创建线程 // 所以应该调用一个线程退出函数去把退出线程的线程id置为0 threadExit(pool); } } } // 在向下执行前仍需要判断以下线程池是否已经关闭 if (pool->shutdown) { // 解锁,防止多线程执行的死锁风险 pthread_mutex_unlock(&pool->mutexPool); pool->liveNum--; threadExit(pool); } // 从任务队列中获取任务 Task task; task.func = pool->taskQ[pool->queueFront].func; task.arg = pool->taskQ[pool->queueFront].arg; // 维护一个环形队列,queueFront向后移动一位即可 // 维护环形队列,则队头移动遵循,(队头 + 1) % 队列容量,则队头始终是在容量范围内移动 pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity; pool->queueSize--; // 唤醒生产者线程执行任务添加 pthread_cond_signal(&pool->cond_Full); // 解锁 pthread_mutex_unlock(&pool->mutexPool); // 因为多个线程会对busyNum(正在工作的线程个数)进行操作,所以需要上锁 printf("thread %lu start working.\n", pthread_self()); pthread_mutex_lock(&pool->mutexBusy); pool->busyNum++; pthread_mutex_unlock(&pool->mutexBusy); task.func(task.arg); free(task.arg); task.arg = NULL; printf("thread %lu end working.\n", pthread_self()); pthread_mutex_lock(&pool->mutexBusy); pool->busyNum--; pthread_mutex_unlock(&pool->mutexBusy); } return NULL; } void* manager(void* arg) { ThreadPool* pool = (ThreadPool*)arg; while (!pool->shutdown) { // 每隔DETECTNUM s检测一次 sleep(DETECTNUM); // 取出线程池中任务的数量和当前线程池中线程存活的数量 pthread_mutex_lock(&pool->mutexPool); int taskNum = pool->queueSize; int liveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexPool); // 取出正在工作的线程的数量,之所以单独加锁,是因为busyNum是经常操作的数据,单独加锁而不是对整个线程池加锁,提高效率 pthread_mutex_lock(&pool->mutexBusy); int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexBusy); // 根据规则添加新的工作线程 // 规则:任务的个数 > 存活的线程个数 && 存活的线程数 <= 最大线程数 if (taskNum > liveNum && liveNum <= pool->maxNum) { //pool->maxNum因为是创建线程池就设定好了,不需要写操作,所以不用加锁 pthread_mutex_lock(&pool->mutexPool); int counter = 0; for (int i = 0; i < pool->maxNum && counter < CREATE_THREAD_NUM && pool->liveNum < pool->maxNum; i++) { if (pool->pworkids[i] == 0) { pthread_create(&pool->pworkids[i], NULL, working, pool); counter++; // 新增加了一个工作线程,那么存活的线程数也相应的应该加一 pool->liveNum++; } } pthread_mutex_unlock(&pool->mutexPool); } // 根据规则销毁休眠的线程 // 规则:正在工作的线程 * 2 < 存活的线程 && 存活的线程 > 最小的线程 if (busyNum * 2 < liveNum && liveNum > pool->minNum) {// pool.minNum是在创建线程池的时候就已经设置好了,不会写操作,因此无需加锁 pthread_mutex_lock(&pool->mutexPool); pool->exitNum = EXIT_THREAD_NUM; pthread_mutex_unlock(&pool->mutexPool); // 让工作线程自杀 for (int i = 0; i < EXIT_THREAD_NUM; i++) { pthread_cond_signal(&pool->cond_Empty); } } } return NULL; } void threadExit(ThreadPool* pool) { pthread_t tid = pthread_self(); for (size_t i = 0; i < pool->maxNum; i++) { if (pool->pworkids[i] == tid) { pool->pworkids[i] = 0; printf("threadExit() called, thread id: %lu exited.\n", tid); break; } } pthread_exit(NULL); }