zoukankan      html  css  js  c++  java
  • [zz]muduo源码阅读之Thread和ThreadPool

    [source_address] https://blog.dujiong.net/2016/07/17/muduo-6/

    muduo源码阅读之Thread和ThreadPool

    在muduo的one loop per thread + thread pool模型中,线程和线程池应该是其中最基础也是最重要的两个组件了。所以,本文深入代码,学习Thread和ThreadPool两个类的结构和实现。

    Thread类

    __thread关键字

    学习Thread class之前,先了解一个关键字的用法:__thread。
    __thread是GCC内置的线程局部存储设施。它的实现非常高效,比Pthread库中的pthread_key_t(muduo中ThreadLocal)快很多。__thread变量是表示每个线程有一份独立实体,各个线程的变量值互不干扰。__thread只能修饰POD类型,不能修饰class类型,因为无法自动调用构造函数和析构函数。
    Thread类的封装用到了命名空间CurrentThread,这个空间中定义了和线程相关的一些独立属性。

    namespace CurrentThread
    {
        __thread int t_cachedTid = 0;
        __thread char t_tidString[32];
        __thread int t_tidStringLength = 6;
        __thread const char* t_threadName = "unknown";
    }
    

    其中,t_cachedTid表示线程的真实id,Pthread库中提供了pthread_self()获取当前线程的标识,类型为pthread_t。但是,pthread_t不一定是数值类型,也可能是一个结构体,这带来了一些问题。
    (1)无法打印输出pthread_t,因为不知道其确切类型。
    (2)无法比较pthread_t大小或计算hash值。
    (3)pthread_t值只在进程内有意义,与操作系统的任务调度之间无法建立有效关系,Pthread库只能保证在同一进程之内,同一时刻的各个线程的id不同。
    所以,muduo采用gettid()系统调用的返回值作为线程id,muduo中将操作封装为gettid()函数。但是,我们知道,调用系统调用开销比较大,所以,muduo中采用__thread变量t_cachedTid来存储,在线程第一次使用tid时通过系统调用获得,存储在t_cachedTid中,以后使用时不再需要系统调用了。
    t_tidString[32]:用string类型表示tid,以便输出日志。
    t_tidStringLength:string类型tid的长度。
    t_threadName:线程的名字。

    相关的数据结构

    在Thread的实现中,还用到了两个数据结构,一个位ThreadData,用来辅助调用线程执行的函数。另一个为ThreadNameInitializer,为线程的创建做环境准备。其中用到了pthread_atfork(NULL,NULL,&afterfork)。该函数的原型为
    int pthread_atfork(void (*prepare)(void), void (*parent)(void), void (*child)(void));
    这是一个跟进程创建有关的函数,为fork的调用做准备和调用后子进程父进程的初始化。prepare函数在调用fork前执行,parent在调用fork后的父进程中执行,child在调用fork后的子进程中执行。
    当然,在实际应用中,多线程不要调用fork(),否则会出现一些问题。因为fork智能克隆当前线程的thread of control,却不克隆其他线程。fork()之后,除了当前线程之外,其他线程都消失了。这样,会出现很多问题。比如,如果复制了一个lock的mutex,却没有复制unlock的线程,那么在给mutex加锁时就会出现死锁。

    Thread类分析

    首先来看Thread类的数据成员和构造函数。

    class Thread : boost::noncopyable
    {
        typedef boost::function<void ()> ThreadFunc;
        ...
        private:
            bool started_;
            bool joined_;
            pthread_t pthreadId_;
            boost::shared_ptr<pid_t> tid_;
            ThreadFunc func_;
            string name_;
    
            static AtomicInt32 numCreated_;
    };
    Thread::Thread(ThreadFunc&& func, const string& n)
        : started_(false), 
          joined_(false),
          pthreadId_(0),
          tid_(new pid_t(0)),
          func_(func),
          name_(n)
    {
          setDefaultName();
    }
    

    其中有两处需要说明一下,首先是shared_ptr<pid> tid_,可能有人会有疑问:为什么这里要用shared_ptr包装pid?
    原因是tid_所属的对象Thread在主线程(A)中创建,而tid_需要在新创建的线程B中进行赋值操作,如果tid使用裸指针的方式传递给线程(B),那么线程A中Thread对象析构(下文)销毁后,线程B持有的就是一个野指针,所以,在Thread对象中将以shared_ptr包装。
    然后是numCreated_,是一个静态变量,类型为AtomicInt32,原子类型,用来表示第几次创建线程实例,在记录日志时可用记录为:“线程名+numCreated_”。

    接下来看Thread的一些接口函数。

    void Thread::start()
    {
        started = true;
        detail::ThreadData* data = new detail::ThreadData(func_, name_, tid_);
        if(pthread_create(&pthreadId_, NULL, &detail::startThread, data));
        {
            started_ = false;
            delete data;
            LOG_SYSFATAL << "Failed in pthread_create";
        }
    }
    

    Thread::start()将调用pthread_create()创建新线程,detail::startThread()是新线程的入口函数,data是新线程执行的辅助结构体。detail::startThread()调用data->runInThread()执行线程逻辑(func_)。

    int Thread::join()
    {
        assert(started_);
        assert(!joined_);
        joined_ = true;
        return pthread_join(pthreadId_, NULL);
    }
    
    Thread::~Thread()
    {
        if(started_ && !joined_)
        {
            pthread_detach(pthreadId_);
        }
    }
    

    Thread析构的时候没有销毁持有的Pthreads句柄(pthread_t),也就是说Thread的析构不会等待线程结束。如果Thread对象的生命期长于线程,然后通过Thread::join()来等待线程结束并释放线程资源。如果Thread对象的生命期短于线程,那么析构时会自动detach线程,避免了资源泄露。

    ThreadPool类

    ThreadPool(线程池)本质上是一个生产者-消费者的模型,在实际中主要完成计算任务。在muduo线程池中有一个存放工作线程的容器ptr_vector,相当于消费者;有一个存放任务的队列deque。
    任务队列是有界的,类似于BoundedBlockingQueue,实现时需要两个条件变量。
    以下是ThreadPool的数据成员:

    class ThreadPool : boost::noncopyable
    {
        typedef boost::function<void ()> Task;
        private:
            MutexLock mutex_;
            Condition notEmpty_;
            Condition notFull_;
            string name_;
            Task threadInitCallback_;
            boost::ptr_vector<muduo::Thread> threads_;
            std::deque<Task> queue_;
            size_t maxQueueSize_;
            bool running_;
    }
    

    其中threadInitCallback_可由setThreadInitCallback(const Task& cb)设置,设置回调函数,每次在执行任务前先调用。在线程池开始运行之前,需要先设置任务队列的大小(调用setMaxQueueSize()),因为运行线程池时,线程会从任务队列取任务。
    接下来是ThreadPool的一些接口函数。

    void ThreadPool::start(int numThreads)
    {
        assert(threads_.empty());
        running_ = true;
        threads_.reserve(numThreads);
        for (int i = 0; i < numThreads; ++i)
          {
            char id[32];
            snprintf(id, sizeof id, "%d", i+1);
            threads_.push_back(new muduo::Thread(
                  boost::bind(&ThreadPool::runInThread, this), name_+id));
            threads_[i].start();
          }
        if(numThreads == 0 && threadInitCallback_)
        {
            threadInitCallback_();
        }
    }
    

    void ThreadPool::start(int numThreads)开启线程池,按照线程数量numThreads_创建工作线程,线程函数为ThreadPool::runInThread()。

    void ThreadPool::runInThread()
    {
        try
        {
            if(threadInitCallback_)
            {
                threadInitCallback_();
            }
            while(running_)
            {
                Task task(take());
                if(task)
                {
                    task();
                }
            }
        }
        catch(const Exception& ex)
        {
            ...
        }
    
    }
    

    如果设置了threadInitCallback_,则进行执行任务前的一些初始化操作。然后从任务队列中取任务执行,有可能阻塞,当任务队列为空时。

    ThreadPool::Task ThreadPool::take()
    {
        MutexLockGuard lock(mutex_);
        while(queue_.empty() && running_)
        {
            notEmpty_.wait();
        }
        Task task;
        if(!queue_.empty())
        {
            task = queue_.front();
            queue_.pop_front();
            if(maxQueueSize_ > 0)
            {
                notFull_.notify();
            }
        }
        return task;
    }
    

    多线程从消息队列中取任务的时候,需要加锁保护。等到队列非空信号,就取任务。取出之后,便告知任务队列已经非满,可以继续添加任务。

    void ThreadPool::run(const Task& task)
    {
        if(threads_.empty())
        {
            task();
        }
        else
        {
            MutexLockGuard lock(mutex_);
            while(isFull())
            {
                notFull_.wait();
            }
            assert(!isFull());
            queue_.push_back(task);
            notEmpty_.notfy();
        }
    }
    

    如果ThreadPool没有子线程(set和start操作在run之前),就在主线程中执行该task,否则,将任务加入到队列,并通知线程从中取task,如果队列已满,便等待。

    ThreadPool::~ThreadPool()
    {
        if(running_)
        {
            stop();
        }
    }
    void ThreadPool::stop()
    {
        {
            MutexLockGuard lock(mutex_);
            running_ = false;
            notEmpty_.notifyAll();
        }
        for_each(threads_.begin(),threads_.end(),boost::bind(&muduo::Thread::join, _1));
    }
    

    最后是ThreadPool的析构函数,在其中调用stop(),唤醒所有等待的线程,然后对线程池中的每一个线程执行join()。

    总结

    以上就是muduo中Thread和ThreadPool类的学习,有很多源码,有点啰嗦。但是,在muduo的one loop per thread + thread pool模型中,Thread和ThreadPool是很重要的组件,所以需要深入地掌握。

     
  • 相关阅读:
    伪类和伪元素的区别, 总结的很好, 直接看结论.
    进制闲谈
    遇到的问题&思考
    PHP中include引用导致不能再次相对引用文件的一个小问题
    ECharts饼图试玩
    不该迷茫的时候迷茫
    [5]火车票接口整理
    [4]xlongwei工具类
    [3]天行新闻
    [2]新闻
  • 原文地址:https://www.cnblogs.com/ym65536/p/8284205.html
Copyright © 2011-2022 走看看