"); //-->
本文使用的源码地址:
https://github.com/SCljh/thread_pool
线程池描述
池式结构
在计算机体系结构中有许多池式结构:内存池、数据库连接池、请求池、消息队列、
对象池等等。
池式结构解决的主要问题为缓冲问题,起到的是缓冲区的作用。
线程池
通过使用线程池,我们可以有效降低多线程操作中任务申请和释放产生的性能消耗。
特别是当我们每个线程的任务处理比较快时,系统大部分性能消耗都花在了
pthread_create以及释放线程的过程中。
那既然是这样的话,何不在程序开始运行阶段提前创建好一堆线程,
等我们需要用的时候只要去这一堆线程中领一个线程,用完了再放回去,
等程序运行结束时统一释放这一堆线程呢?按照这个想法,线程池出现了。
线程池解决的问题
解决任务处理
阻塞IO
解决线程创建于销毁的成本问题
管理线程
线程池应用之一:日志存储
在服务器保存日志至磁盘上时,性能往往压在磁盘读写上,
而引入线程池利用异步解耦可以解决磁盘读写性能问题。
线程池的主要作用:异步解耦
说了那么多线程池的优点,那接下来要做的就是手撕这诱人的玩意了。
朴实无华但不枯燥的代码(以c++为例)
本文主要讲解的是c++线程池的实现,C语言实现其实思想和c++是一致的,
具体的代码可见文章开头的链接。
线程池中比较关键的东西
若想自己编写一个线程池框架,那么可以先关注线程池中比较关键的东西:
工作队列
任务队列
线程池的池
pthread_create中的回调函数
为什么说这些东西比较关键?因为这“大四元”基本上支撑起了整个线程池的框架。
而线程池的框架如下所示:
————————————————

工作队列
worker队列,首先要有worker才有队列,我们首先定义worker结构体:
可想而知,worker中要有create_pthread函数的id参数,
还需要有控制每一个worker live or die的标志terminate,
我们最好再设置一个标志表示这个worker是否在工作。
最后,我们要知道这个worker隶属于那个线程池(至于为什么下文将介绍)
struct NWORKER{
        pthread_t threadid;		//线程id
        bool terminate;			//是否需要结束该worker的标志
        int isWorking;			//该worker是否在工作
        ThreadPool *pool;		//隶属于的线程池
    }任务队列struct NJOB{
        void (*func)(void *arg);     //任务函数
        void *user_data;			 //函数参数
    };线程池本池
对于一个线程池,任务队列和工作队列已经是必不可少的东西了,
那线程池还有需要哪些东西辅助它以达到它该有的功能呢?
一说到线程,那处理好线程同步就是一个绕不开的话题,
那在线程池中我们需要处理的临界资源有哪些呢?
想想我们工作队列中的每个worker都在等待一个任务队列看其是否有任务到来,
所以很容易得出结论我们必须要在线程池中实现两把锁:
一把是用来控制对任务队列操作的互斥锁,
另一把是当任务队列有新任务时唤醒worker的条件锁。
有了这两把锁,线程池中再加点必要的一些数字以及对线程池操作的函数,
那么这个类就写完了。实现代码如下:
class ThreadPool{
private:
    struct NWORKER{
        pthread_t threadid;
        bool terminate;
        int isWorking;
        ThreadPool *pool;
    } *m_workers;
    struct NJOB{
        void (*func)(void *arg);     //任务函数
        void *user_data;
    };
public:
    //线程池初始化
    //numWorkers:线程数量
    ThreadPool(int numWorkers, int max_jobs);
    //销毁线程池
    ~ThreadPool();
    //面向用户的添加任务
    int pushJob(void (*func)(void *data), void *arg, int len);
private:
    //向线程池中添加任务
    bool _addJob(NJOB* job);
    //回调函数
    static void* _run(void *arg);
    void _threadLoop(void *arg);
private:
    std::list<NJOB*> m_jobs_list;
    int m_max_jobs;		//任务队列中的最大任务数
    int m_sum_thread;	//worker总数
    int m_free_thread;		//空闲worker数
    pthread_cond_t m_jobs_cond;      //线程条件等待
    pthread_mutex_t m_jobs_mutex;  //为任务加锁防止一个任务
    被两个线程执行等其他情况
};可以看到我们做了一些必要的封装,只给用户提供了构造函数、
析构函数以及添加任务的函数。这也是一个基本的线程池框架必要的接口。
回调函数
static?
根据上方代码可以看见,回调函数为static函数。
我可不想在我使用使用回调函数的时候自动给我加上*this参数。
首先回调函数是每个线程创建之后就开始执行的函数,
该函数作为pthread_create的第三个参数传入。我们来看看pthread_create的函数原型:
int pthread_create(pthread_t *tidp,const pthread_attr_t *attr, void *(*start_rtn)(void*),void *arg);
注意到,此处的我们传入的回调函数必须是接受一个void*参数,
且返回类型为void*的函数。如果我们将回调函数写成线程池的普通成员函数,
那么c++会在这个函数参数前默认加上一个*this参数,
**这也是为什么我们能在成员函数中使用当前对象中的一些属性。
**然而就是这个原因,若我们传入的回调函数指针为类的成员函数,
那c++编译器会破坏我们的函数结构(因为给我们加了一个形参),
导致pthread_create的第三个参数不符合要求而报错:

看吧,编译器不让我们用non-static的成员函数作为回调函数传入pthread_create中。
其实在c++中,大多数回调函数都有这个要求。
那为什么static就可以呢?
这是因为static函数为类的静态函数,当类的成员函数被static修饰后,
调用该函数将不会默认传递*this指针,
这也是为什么static成员函数中不能使用对象的非static属性:
你*this指针都没传我上哪去找你的对象?
函数本身
在运行回调函数的时候,我们又想用对象里的东西(比如锁),
编译器又不让我们用,那怎么办?别忘了虽然static函数没有*this指针,
但它却可以有一个*void的参数啊。有了这个*void,我们还怕少一个*this指针?
我们可以先写一个static函数,将需要的对象指针通过形参传到这个函数里,
再在这个函数中通过这个对象调用成员函数的方法,就能使用这个对象的成员属性了。
就像这样:
//run为static函数
void* ThreadPool::_run(void *arg) {
    NWORKER *worker = (NWORKER *)arg;
    worker->pool->_threadLoop(arg);
}
//threadLoop为普通成员函数
void ThreadPool::_threadLoop(void *arg) {
    //在这里就能直接用当前ThreadPool对象的东西了
}
至于threadLoop的实现,由于线程是要一直存在的,
一个while(true)的循环肯定少不了了。这个循环中具体做什么呢:
不断检查任务队列中是否有job:
如果有,则取出这个job,并将该job从任务队列中删除,且执行job中的func函数。
如果没有,调用pthread_cond_wait函数等待job到来时被唤醒。
若当前worker的terminate为真,则退出循环结束线程。
注意在对job操作前别忘了加锁,函数实现如下:
void ThreadPool::_threadLoop(void *arg) {
    NWORKER *worker = (NWORKER*)arg;
    while (1){
        //线程只有两个状态:执行\等待
        //查看任务队列前先获取锁
        pthread_mutex_lock(&m_jobs_mutex);
        //当前没有任务
        while (m_jobs_list.size() == 0) {
        	//检查worker是否需要结束生命
            if (worker->terminate) break;
            //条件等待直到被唤醒
            pthread_cond_wait(&m_jobs_cond,&m_jobs_mutex);
        }
        //检查worker是否需要结束生命
        if (worker->terminate){
            pthread_mutex_unlock(&m_jobs_mutex);
            break;
        }
        //获取到job后将该job从任务队列移出,免得其他worker过来重复做这个任务
        struct NJOB *job = m_jobs_list.front();
        m_jobs_list.pop_front();
		//对任务队列的操作结束,释放锁
        pthread_mutex_unlock(&m_jobs_mutex);
        m_free_thread--;
        worker->isWorking = true;
        //执行job中的func
        job->func(job->user_data);
        worker->isWorking = false;
        free(job->user_data);
        free(job);
    }
    free(worker);
    pthread_exit(NULL);
}                        
原文链接:https://blog.csdn.net/ACMer_L/article/details/107578636
                     
                  
                        
————————————————
                        
               
                        
                        
原文链接:https://blog.csdn.net/ACMer_L/article/details/107578636
*博客内容为网友个人发布,仅代表博主个人观点,如有侵权请联系工作人员删除。