zoukankan      html  css  js  c++  java
  • C++线程池

    简介
    线程池(thread pool):一种线程的使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
    线程池的组成
    1、线程池管理器
      创建一定数量的线程,启动线程,调配任务,管理着线程池。
      本篇线程池目前只需要启动(start()),停止方法(stop()),及任务添加方法(addTask).
      start()创建一定数量的线程池,进行线程循环.
      stop()停止所有线程循环,回收所有资源.
      addTask()添加任务.
    2、工作线程
      线程池中线程,在线程池中等待并执行分配的任务.
      本篇选用条件变量实现等待与通知机制.
    3、任务接口,
      添加任务的接口,以供工作线程调度任务的执行。
    4、任务队列
      用于存放没有处理的任务。提供一种缓冲机制
      同时任务队列具有调度功能,高优先级的任务放在任务队列前面;本篇选用priority_queue 与pair的结合用作任务优先队列的结构.

    代码实现:

    ThreadPool.hpp:

    #ifndef _THREAD_POOL_H_
    #define _THREAD_POOL_H_
    
    #include <thread>
    #include <mutex>
    #include <atomic>
    #include <condition_variable>
    #include <functional>
    #include <vector>
    #include <queue>
    
    class ThreadPool
    {
    public:
        using Task = std::function<void()>;
    
        explicit ThreadPool(int num) : _thread_num(num), _is_running(false)
        {}
    
        ~ThreadPool()
        {
            if (_is_running)
                stop();
        }
    
        void start()
        {
            _is_running = true;
    
            // start threads
            for (int i = 0; i < _thread_num; i++)
                _threads.emplace_back(std::thread(&ThreadPool::work, this));
        }
    
        void stop()
        {
            {
                // stop thread pool, should notify all threads to wake
                std::unique_lock<std::mutex> lk(_mtx);
                _is_running = false;
                _cond.notify_all(); // must do this to avoid thread block
            }
    
            // terminate every thread job
            for (std::thread& t : _threads)
            {
                if (t.joinable())
                    t.join();
            }
        }
    
        void appendTask(const Task& task)
        {
            if (_is_running)
            {
                std::unique_lock<std::mutex> lk(_mtx);
                _tasks.push(task);
                _cond.notify_one(); // wake a thread to to the task
            }
        }
    
    private:
        void work()
        {
            printf("begin work thread: %d
    ", std::this_thread::get_id());
    
            // every thread will compete to pick up task from the queue to do the task
            while (_is_running)
            {
                Task task;
                {
                    std::unique_lock<std::mutex> lk(_mtx);
                    if (!_tasks.empty())
                    {
                        // if tasks not empty, 
                        // must finish the task whether thread pool is running or not
                        task = _tasks.front();
                        _tasks.pop(); // remove the task
                    }
                    else if (_is_running && _tasks.empty())
                        _cond.wait(lk);
                }
    
                if (task)
                    task(); // do the task
            }
    
            printf("end work thread: %d
    ", std::this_thread::get_id());
        }
    
    public:
        // disable copy and assign construct
        ThreadPool(const ThreadPool&) = delete;
        ThreadPool& operator=(const ThreadPool& other) = delete;
    
    private:
        bool _is_running; // thread pool manager status
        std::mutex _mtx;
        std::condition_variable _cond;
        int _thread_num;
        std::vector<std::thread> _threads;
        std::queue<Task> _tasks;
    };
    
    
    #endif // !_THREAD_POOL_H_
    View Code

    main.cpp

    #include "stdafx.h"
    #include <iostream>
    #include <chrono>
    #include "ThreadPool.hpp"
    
    void fun1()
    {
        std::cout << "working in thread " << std::this_thread::get_id() << std::endl;
    }
    
    void fun2(int x)
    {
        std::cout << "task " << x << " working in thread " << std::this_thread::get_id() << std::endl;
    }
    
    int main(int argc, char* argv[])
    {
        ThreadPool thread_pool(3);
        thread_pool.start();
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    
        for (int i = 0; i < 6; i++)
        {
            //thread_pool.appendTask(fun1);
            thread_pool.appendTask(std::bind(fun2, i));
            //std::this_thread::sleep_for(std::chrono::milliseconds(500));
        }
    
        thread_pool.stop();
    
        getchar();
        return 0;
    }
    View Code

    线程池偏底层实现原理

    #ifndef _THREADPOOL_H
    #define _THREADPOOL_H
    
    #define LL_ADD(item,list) do  
    {
    item->prev = NULL;
    item->next = list;
    if (list != nullptr)
    {
        list->prev = item;
    }
    
    }while (0)
    
    #define LL_REMOVE(item,list) do  
    {
    if (item->prev != nullptr)
    {
        item->prev - next = item->next;
    }
    if (item->next != nullptr)
    {
        item->next->prev = item->prev;
    }
    if (item == item)
    {
        list = item->next;
    }
    }while (0)
    
    struct NWORKER
    {
        pthread_t thread;
        struct NMANAGER *pool;
        int terminate;
    
        struct NWORKER *prev;
        struct NWORKER *next;
    };
    
    struct NJOB
    {
        void(*func)(struct NJOB *job);
        void *user_data;
    
        struct NJOB *prev;
        struct NJOB *next;
    };
    
    struct NMANAGER
    {
        struct Nworder *workers;
        struct NJOB *jobs;
    
        pthread_cond_t jobs_cond;
        pthread_mutex_t jobs_mutex;
    
    
    };
    
    typedef struct NMANGER nThreadPool;
    
    static void *nThreadCallback(void *arg)
    {
        struct NWORKER *worker = (struct NWORKER*)arg;
        while (1)
        {
            pthread_mutex_lock(&worker->pool->jobs_mutex);
            while (worker->pool->jobs == NULL)
            {
                if (worker->terminate)break;
                pthread_cond_wait(work->pool->jobs_cond, worker->pool->jobs_mutex);
            }
    
            if (worker->terminate)
            {
                pthread_mutex_unlock(&worker->pool->jobs_mutex);
                break;
            }
    
    
            struct NJOB *job = worker->pool->jobs;
            LL_REMOVE(job, worker->pool->jobs);
    
            pthread_mutex_unlock(&worker->pool->jobs_mutex);
    
            job->func(job->user_data);
        }
    
        free(worker);
        pthread_exit(NULL);
    }
    
    int nThreadPoolCreate(nThreadPool *pool, int numWorkers)
    {
        if (numWorkers < 1) numWorkers = 1;
        memset(pool, 0, sizeof(nThreadPool));
    
        pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
        memcpy(&pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t));
    
        pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
        memcpy(&pool->jobs_mutex, &blank_mutex, sizeof(pthread_mutex_t));
    
        for (int i = 0; i < numWorkers; i++)
        {
            struct NWORKER *worker = (struct NWORKER*)malloc(sizeof(NWORKER));
            if (worker != nullptr)
            {
                perror("malloc");
                return -2;
            }
            memset(worker, 0, sizeof(struct NWORKER));
            worker->pool = pool;
    
            int ret = pthread_create(worker->thread, NULL, nThreadCallback, worker);
            if (ret)
            {
                perror("pthread_create");
                free(worker);
                retrun - 3;
            }
    
            LL_ADD(worker, pool->workers);
        }
    }
    
    int nThreadPoolDestroy(nThreadPool *pool)
    {
        struct NWORKER* worker = NULL;
        for (worker = pool->workers; worker != NULL; worker = pool->next)
        {
            worker->teminate = 1;
        }
    
        pthread_mutex_lock(&pool->jobs_mutex);
        pthread_cond_broadcast(pool->jobs_cond);
        pthread_mutex_unlock(&pool->jobs_mutex);
    }
    
    void nThreadPoolPush(nThreadPool *pool, struct NJOB *job)
    {
        pthread_mutex_lock(&pool->jobs_mutex);
    
        LL_ADD(job, pool->jobs);
        pthread_cond_signal(&pool->jobs_cond);
    
        pthread_mutex_unlock(&pool->jobs_mutex);
    }
    #endif
    View Code
    111
  • 相关阅读:
    CentOS如何挂载U盘(待更新)
    CentOS6.8启动Tomcat无法访问
    CentOS7安装后连不上网络无法使用yum
    Android Studio 3.0找不到Android Device Monitor
    初识 ‘测试左移 测试右移’
    利用coverage工具进行Python代码覆盖率测试
    Charles抓包过滤的四种方式
    postman中添加cookie信息
    初始Activity启动模式
    MySQL数据库报错:Too many connection
  • 原文地址:https://www.cnblogs.com/zwj-199306231519/p/13648877.html
Copyright © 2011-2022 走看看