简单线程池的实现

  1. 云栖社区>
  2. 博客列表>
  3. 正文

简单线程池的实现

feilengcui008 2015-03-20 23:11:00 浏览409 评论1

摘要: 通常在需要大量线程连接或者需要执行异步任务的时候,为了避免线程多次创建的开销,我们可以事先创建一定数量的线程,组成一个线程池。由threadpool统一管理线程的生命期以及任务的添加。

通常在需要大量线程连接或者需要执行异步任务的时候,为了避免线程多次创建的开销,我们可以事先创建一定数量的线程,组成一个线程池。由threadpool统一管理线程的生命期以及任务的添加。


线程池通常由四部分构成

  • 线程池本身结构作为管理器
  • 任务队列
  • 工作线程
  • 往工作线程中添加任务的接口

下面是一个Linux下的简单线程池的实现与演示(为了测试方便有些地方直接将pthread_t转换成了int打印,另外一些地方使用了gettid):

//gcc Feature Test Macros,为了syscall
//可以参照http://www.gnu.org/software/libc/manual/html_node/Feature-Test-Macros.html
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <unistd.h>

//任务回调函数形式
typedef void *(*callback)(void *args);

//任务结构,包含一个回调成员,参数成员,任务指针next组成一个任务队列
typedef struct _task task;
struct _task{
    callback cb;
    void *args;
    struct _task *next;
};

//线程池成员
typedef struct _pool pool;
struct _pool{
    int thread_number;     //工作线程数量限制
    int task_queue_size;   //当前任务队列中的数量
    int max_queue_size;    //最大任务数量
    int running;           
    pthread_t *pt;         //保存工作线程pthread_t for join
    task *task_queue_head; //任务队列
    pthread_mutex_t queue_mutex;  //队列锁
    pthread_cond_t queue_cond;    //条件变量
};

//工作线程执行的函数
void *routine(void *args);

//线程池的初始化
void pool_init(pool *p, int thread_number, int max_queue_size)
{
    p->thread_number = thread_number;
    p->max_queue_size = max_queue_size;
    p->task_queue_size = 0;
    p->task_queue_head = NULL;
    p->pt = (pthread_t *)malloc(sizeof(pthread_t)*p->thread_number);
    if(!p->pt){
        perror("malloc pthread_t array failed");
        exit(EXIT_FAILURE);
    }
    pthread_mutex_init(&p->queue_mutex,NULL);
    pthread_cond_init(&p->queue_cond,NULL);
    for(int i = 0; i < p->thread_number; i++)
    {
        pthread_create (&(p->pt[i]), NULL, routine, (void *)p);
    }
    p->running = 1;
}

//线程池的清理
void pool_clean(pool *p)
{
    if(!p->running)
        return;
    p->running = 0;

    //tell all threads we are exiting
    pthread_cond_broadcast(&p->queue_cond);

    //wait and join all threads
    for (int i = 0; i < p->thread_number; ++i)
    {
        pthread_join(p->pt[i],NULL);
    }
    free(p->pt);

    //free task queue or if needed we can persistent the remaining task
    task *temp;
    while((temp=p->task_queue_head)!=NULL){
        p->task_queue_head = p->task_queue_head->next;
        free(temp);
    }

    pthread_mutex_destroy(&p->queue_mutex);
    pthread_cond_destroy(&p->queue_cond);

    free(p);
    p = NULL;

}


//内部使用
int _pool_add_task(pool *p, task *t)
{
    int ret = 0;
    pthread_mutex_lock(&p->queue_mutex);
    if(p->task_queue_size>=p->max_queue_size){
        pthread_mutex_unlock(&p->queue_mutex);
        //for max queue size error
        ret = 1;
        return ret;
    }
    task *temp = p->task_queue_head;
    if(temp!=NULL){
        while(temp->next!=NULL){
            temp = temp->next;
        }
        temp->next = t;
    }else{
        p->task_queue_head = t;
    }
    p->task_queue_size++;
    pthread_mutex_unlock(&p->queue_mutex);
    return ret;
}

//添加任务接口
int pool_add_task(pool *p, callback cb, void *data)
{
    int ret = 0;
    task *t = (task *)malloc(sizeof(task));
    t->cb = cb;
    t->args = data;
    t->next = NULL;
    if((ret=_pool_add_task(p,t))>0){
        fprintf(stderr,"add wroker failed,reaching max size of task queue\n");
        return ret;
    }
    return ret;
}

//线程routine
void *routine(void *args)
{
    pool *p = (pool *)args;
    task *t;
    fprintf(stdout,"thread_id:%ld\n",syscall(SYS_gettid));
    while(1){

        //将加锁放在条件等待之前可以避免每次添加任务将其他线程白白唤醒
        //而且能保证接受destroy broadcast退出时不会竞争
        pthread_mutex_lock(&p->queue_mutex);
        //wait
        while(p->task_queue_size==0 && p->running){
            pthread_cond_wait(&p->queue_cond,&p->queue_mutex);
        }

        //wake up because pool_destroy
        if(!p->running){
            pthread_mutex_unlock(&p->queue_mutex);
            fprintf(stdout,"thread:%d will exit pool_destroy\n",(int)pthread_self());
            pthread_exit(NULL);
        }

        //wake up to get a task
        t = p->task_queue_head;
        p->task_queue_head = p->task_queue_head->next;
        p->task_queue_size--;
        pthread_mutex_unlock(&p->queue_mutex);

        //when we do the task,release mutex for other threads
        t->cb(t->args);

    }

    pthread_exit(NULL);
}


//测试用的任务回调函数
void *callbacktest(void *args)
{
    fprintf(stdout,"from thread:%d---passed parameter:%d\n",(int)pthread_self(),(int)(*(int *)(args)));
}

int main()
{
    pool *p = (pool *)malloc(sizeof(pool));
    if(p==NULL){
        fprintf(stderr,"malloc pool failed\n");
    }
    pool_init(p,4,10);
    int args[10];
    for (int i=0;i<11;i++){
        args[i] = i;
    }
    for (int i=0;i<11;i++){
        pool_add_task(p,&callbacktest,&args[i]);
    }
    sleep(10);
    pool_clean(p);
    return 0;
}


上面是一个简单的线程池,限制了线程的数量,任务队列的最大任务数量。我们还可以加一些其他的高级特性,比如在退出时将未完成的队列任务持久化,使用多个队列,给任务产生随机唯一id从而实现cancel,任务延迟等等,不过这些更多的是队列的特性,在一些成熟的产品如beanstalkd等中实现得挺不错了。

【云栖快讯】阿里云栖开发者沙龙(Java技术专场)火热来袭!快来报名参与吧!  详情请点击

网友评论

1F
子木朋鸟飞

pool_add_task中添加任务到队列如果失败,应该free(task)

feilengcui008
文章66篇 | 关注4
关注
是一款简单高效的电子邮件发送服务,它构建在可靠稳定的阿里云基础之上,帮助您快速、精准地实现事... 查看详情
是解决用户结构化数据搜索需求的托管服务,支持数据结构、搜索排序、数据处理自由定制。 为您的网... 查看详情
是一种简单易用的云计算资源管理和自动化运维服务。用户通过模板描述多个云计算资源的依赖关系、配... 查看详情
双12

双12