#include "threadpool.h" #include #include #include template ThreadPool::ThreadPool(size_t min, size_t max) { do { // 实例化任务队列 m_taskQ = new TaskQueue(); if (m_taskQ == nullptr) { std::cout << "malloc m_taskQ failed." << std::endl; } pworkids = new pthread_t[max]; if (pworkids == nullptr) { std::cout << "malloc pworkids failed." << std::endl; break; } // 对申请的内存进行初始化 memset(pworkids, 0, sizeof(pthread_t) * max); this->minNum = min; this->maxNum = max; this->busyNum = 0; this->liveNum = min; // 初始化线程池时存活的线程数和创建线程池时传参的需要创建的最小线程数相等 this->exitNum = 0; if (pthread_mutex_init(&this->mutexPool, NULL) != 0 || pthread_cond_init(&this->cond_Empty, NULL) != 0 || pthread_cond_init(&this->cond_Full, NULL) != 0) { std::cout << "mutex and condition variable init failed." << std::endl; break; } this->shutdown = false; // 创建管理者线程 pthread_create(&this->managerID, NULL, manager, this); // 创建工作线程 for (size_t i = 0; i < min; i++) { pthread_create(&this->pworkids[i], NULL, working, this); } return; } while (0); // 如何初始化失败,则执行释放资源操作 if (pworkids) delete[] pworkids; if (m_taskQ) delete m_taskQ; } template ThreadPool::~ThreadPool() { // 关闭线程池,将关闭的标识 shutdown 置为 true this->shutdown = true; // 阻塞回收管理者线程 pthread_join(this->managerID, NULL); std::cout << "manager thread destroyed." << std::endl; // 唤醒阻塞的消费者线程,当shutdown=true的时候,所有的线程就会自动退出 for (size_t i = 0; i < this->liveNum; i++) { pthread_cond_signal(&this->cond_Empty); } sleep(1); std::cout << "working thread destroyed." << std::endl; // 释放堆内存 if (this->m_taskQ) { delete this->m_taskQ; this->m_taskQ = NULL; } std::cout << "free pool->taskQ successed." << std::endl; if (this->pworkids) { delete this->pworkids; this->pworkids = NULL; } std::cout << "free pool->pworkids successed." << std::endl; // 释放锁和条件变量 pthread_mutex_destroy(&this->mutexPool); pthread_cond_destroy(&this->cond_Empty); pthread_cond_destroy(&this->cond_Full); } template void ThreadPool::addTask(Task task) { // 任务队列的类内部维护了一把互斥锁,此处不需要再加锁 while (this->m_taskQ->taskNum() > 100 && !this->shutdown) { // 阻塞生产者线程 pthread_cond_wait(&this->cond_Full, &this->mutexPool); } if (this->shutdown) { return; } // 添加任务到任务队列 this->m_taskQ->addTask(task); //唤醒消费者工作线程处理任务 pthread_cond_signal(&this->cond_Empty); } template size_t ThreadPool::getWorkingThreads() { pthread_mutex_lock(&this->mutexPool); size_t busyNum = this->busyNum; pthread_mutex_unlock(&this->mutexPool); return busyNum; } template size_t ThreadPool::getALiveThreads() { pthread_mutex_lock(&this->mutexPool); size_t aliveNum = this->liveNum; pthread_mutex_unlock(&this->mutexPool); return aliveNum; } template size_t ThreadPool::getTaskNum() { // 因为队列中维护了一把互斥锁,所以此处不需要再加锁 return this->m_taskQ->taskNum(); } template void* ThreadPool::working(void* arg) { ThreadPool* pool = static_cast(arg); while (true) { pthread_mutex_lock(&pool->mutexPool); // 判断当前任务队列是否为空 if (pool->m_taskQ->taskNum() == 0 && !pool->shutdown) { // 阻塞工作线程 pthread_cond_wait(&pool->cond_Empty, &pool->mutexPool); // 判断是否要销毁线程 if (pool->exitNum > 0) { pool->exitNum--; if (pool->liveNum > pool->minNum) { pool->liveNum--; // 在需要退出的线程退出之前,需要解开互斥锁,否则会产生死锁风险 pthread_mutex_unlock(&pool->mutexPool); // 退出当前线程,线程退出后应该把当前线程在线程数组中的pid置为0 //pthread_exit(NULL); // 直接退出没有把线程数组中当前线程的线程id置为0,那后续无法新创建线程 // 所以应该调用一个线程退出函数去把退出线程的线程id置为0 pool->threadExit(); } } } // 在向下执行前仍需要判断以下线程池是否已经关闭 if (pool->shutdown) { // 解锁,防止多线程执行的死锁风险 pthread_mutex_unlock(&pool->mutexPool); pool->liveNum--; pool->threadExit(); } // 从任务队列中获取任务 Task task = pool->m_taskQ->getTask(); pool->busyNum++; // 唤醒生产者线程执行向队列中添加任务 pthread_cond_signal(&pool->cond_Full); // 解锁 pthread_mutex_unlock(&pool->mutexPool); // 因为多个线程会对busyNum(正在工作的线程个数)进行操作,所以需要上锁 std::cout << "thread id: " << std::to_string(pthread_self()) << " start working." << std::endl; task.func(task.arg); // TODO: // 此处释放的参数由于是void*类型只占四个字节,如果未指定类型delete只能释放四个字节 // 所以delete的时候未指定释放的类型会有风险,超出四个字节的内存就无法释放。 // 解决办法是需要修改Task的结构体或者类为模板类型,让类型传递过来 delete task.arg; task.arg = nullptr; std::cout << "thread id: " << std::to_string(pthread_self()) << " end working." << std::endl; pthread_mutex_lock(&pool->mutexPool); pool->busyNum--; pthread_mutex_unlock(&pool->mutexPool); } return nullptr; } template void* ThreadPool::manager(void* arg) { ThreadPool* pool = static_cast(arg); while (!pool->shutdown) { // 每隔DETECTNUM s检测一次 sleep(DETECTNUM); // 取出线程池中任务的数量和当前线程池中线程存活的数量 pthread_mutex_lock(&pool->mutexPool); size_t taskNum = pool->m_taskQ->taskNum(); size_t liveNum = pool->liveNum; // 当前线程正在工作的线程数量 size_t busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexPool); // 根据规则添加新的工作线程 // 规则:任务的个数 > 存活的线程个数 && 存活的线程数 <= 最大线程数 if (taskNum > liveNum && liveNum <= pool->maxNum) { //pool->maxNum因为是创建线程池就设定好了,不需要写操作,所以不用加锁 pthread_mutex_lock(&pool->mutexPool); size_t counter = 0; for (size_t i = 0; i < pool->maxNum && counter < CREATE_THREAD_NUM && pool->liveNum < pool->maxNum; i++) { if (pool->pworkids[i] == 0) { pthread_create(&pool->pworkids[i], NULL, working, pool); counter++; // 新增加了一个工作线程,那么存活的线程数也相应的应该加一 pool->liveNum++; } } pthread_mutex_unlock(&pool->mutexPool); } // 根据规则销毁休眠的线程 // 规则:正在工作的线程 * 2 < 存活的线程 && 存活的线程 > 最小的线程 if (busyNum * 2 < liveNum && liveNum > pool->minNum) {// pool.minNum是在创建线程池的时候就已经设置好了,不会写操作,因此无需加锁 pthread_mutex_lock(&pool->mutexPool); pool->exitNum = EXIT_THREAD_NUM; pthread_mutex_unlock(&pool->mutexPool); // 让工作线程自杀 for (size_t i = 0; i < EXIT_THREAD_NUM; i++) { pthread_cond_signal(&pool->cond_Empty); } } } return nullptr; } template void ThreadPool::threadExit() { pthread_t tid = pthread_self(); for (size_t i = 0; i < this->maxNum; i++) { if (this->pworkids[i] == tid) { this->pworkids[i] = 0; std::cout << __FUNCTION__ << " called, " << std::to_string(tid) << "has exited." << std::endl; break; } } pthread_exit(NULL); }