zoukankan      html  css  js  c++  java
  • 并发编程入门(二):分析Boost对 互斥量和条件变量的封装及实现生产者消费者问题

    请阅读上篇文章《并发编程实战: POSIX 使用互斥量和条件变量实现生产者/消费者问题》。当然不阅读亦不影响本篇文章的阅读大笑

    Boost的互斥量,条件变量做了很好的封装,因此比“原生的”POSIX mutex,condition variables好用。然后我们会通过分析boost相关源码看一下boost linux是如何对pthread_mutex_t和pthread_cond_t进行的封装。

    首先看一下condition_variable_any的具体实现,代码路径:/boost/thread/pthread/condition_variable.hpp

    class condition_variable_any
    {
        pthread_mutex_t internal_mutex;
        pthread_cond_t cond;
    
        condition_variable_any(condition_variable_any&);
        condition_variable_any& operator=(condition_variable_any&);
    
    public:
        condition_variable_any()
        {
            int const res=pthread_mutex_init(&internal_mutex,NULL);
            if(res)
            {
                boost::throw_exception(thread_resource_error());
            }
            int const res2=pthread_cond_init(&cond,NULL);
            if(res2)
            {
                BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex));
                boost::throw_exception(thread_resource_error());
            }
        }
        ~condition_variable_any()
        {
            BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex));
            BOOST_VERIFY(!pthread_cond_destroy(&cond));
        }
     
    condition_variable_any的构造函数是对于内部使用的mutex和cond的初始化,对应的,析构函数则是这些资源的回收。

    BOOST_VERIFY的实现:

    #undef BOOST_VERIFY  
    #if defined(BOOST_DISABLE_ASSERTS) || ( !defined(BOOST_ENABLE_ASSERT_HANDLER) && defined(NDEBUG) )  
    // 在任何情况下,expr一定会被求值。  
    #define BOOST_VERIFY(expr) ((void)(expr))  
    #else  
    #define BOOST_VERIFY(expr) BOOST_ASSERT(expr)  
    #endif  
    因此不同于assert在Release版的被优化掉不同,我们可以放心的使用BOOST_VERITY,因此它的表达式肯定会被求值,而不用担心assert的side effect。
    接下来看一下condition_variable_any的核心实现:wait

       template<typename lock_type>
        void wait(lock_type& m)
        {
            int res=0;
            {
                thread_cv_detail::lock_on_exit<lock_type> guard;
                detail::interruption_checker check_for_interruption(&internal_mutex,&cond);
                guard.activate(m);
                res=pthread_cond_wait(&cond,&internal_mutex);
                this_thread::interruption_point();
            }
            if(res)
            {
                boost::throw_exception(condition_error());
            }
        }
    
    首先看一下lock_on_exit:

    namespace thread_cv_detail
    {
        template<typename MutexType>
        struct lock_on_exit
        {
            MutexType* m;
    
            lock_on_exit():
                m(0)
            {}
    
            void activate(MutexType& m_)
            {
                m_.unlock();
                m=&m_;
            }
            ~lock_on_exit()
            {
                if(m)
                {
                    m->lock();
                }
           }
        };
    }
    
    代码很简单,实现了在调用activate时将传入的lock解锁,在该变量生命期结束时将guard的lock加锁。

    接下来的detail::interruption_checker check_for_interruption(&internal_mutex,&cond);是什么意思呢?From /boost/thread/pthread/thread_data.hpp

    class interruption_checker
    {
        thread_data_base* const thread_info;
        pthread_mutex_t* m;
        bool set;
    
        void check_for_interruption()
        {
            if(thread_info->interrupt_requested)
            {
                thread_info->interrupt_requested=false;
                throw thread_interrupted();
            }
        }
    
        void operator=(interruption_checker&);
    public:
        explicit interruption_checker(pthread_mutex_t* cond_mutex,pthread_cond_t* cond):
            thread_info(detail::get_current_thread_data()),m(cond_mutex),
            set(thread_info && thread_info->interrupt_enabled)
        {
            if(set)
            {
                lock_guard<mutex> guard(thread_info->data_mutex);
                check_for_interruption();
                thread_info->cond_mutex=cond_mutex;
                thread_info->current_cond=cond;
                BOOST_VERIFY(!pthread_mutex_lock(m));
            }
            else
            {
                BOOST_VERIFY(!pthread_mutex_lock(m));
            }
        }
        ~interruption_checker()
        {
            if(set)
            {
                BOOST_VERIFY(!pthread_mutex_unlock(m));
                lock_guard<mutex> guard(thread_info->data_mutex);
                thread_info->cond_mutex=NULL;
                thread_info->current_cond=NULL;
            }
            else
            {
                BOOST_VERIFY(!pthread_mutex_unlock(m));
            }
        }
    
    代码面前,毫无隐藏。那句话就是此时如果有interrupt,那么就interrupt吧。否则,lock传入的mutex,也是为了res=pthread_cond_wait(&cond,&internal_mutex);做准备。

    关于线程的中断点,可以移步《【Boost】boost库中thread多线程详解5——谈谈线程中断》。

    对于boost::mutex,大家可以使用同样的方法去解读boost的实现,相对于condition variable,mutex的实现更加直观。代码路径:/boost/thread/pthread/mutex.hpp。

    namespace boost
    {
        class mutex
        {
        private:
            mutex(mutex const&);
            mutex& operator=(mutex const&);
            pthread_mutex_t m;
        public:
            mutex()
            {
                int const res=pthread_mutex_init(&m,NULL);
                if(res)
                {
                    boost::throw_exception(thread_resource_error());
                }
            }
            ~mutex()
            {
                BOOST_VERIFY(!pthread_mutex_destroy(&m));
            }
    
            void lock()
            {
                int const res=pthread_mutex_lock(&m);
                if(res)
                {
                    boost::throw_exception(lock_error(res));
                }
            }
    
            void unlock()
            {
                BOOST_VERIFY(!pthread_mutex_unlock(&m));
            }
    
            bool try_lock()
            {
                int const res=pthread_mutex_trylock(&m);
                if(res && (res!=EBUSY))
                {
                    boost::throw_exception(lock_error(res));
                }
    
                return !res;
            }
    
            typedef pthread_mutex_t* native_handle_type;
            native_handle_type native_handle()
            {
                return &m;
            }
    
            typedef unique_lock<mutex> scoped_lock;
            typedef detail::try_lock_wrapper<mutex> scoped_try_lock;
        }; 
    }

    boost对于pthread_mutex_t和pthread_cond_t的封装,方便了开发者的使用的资源的安全有效管理。当然,在不同的公司,可能也都有类似的封装,学习boost的源码,无疑可以加深我们的理解。在某些特定的场合,我们也可以学习boost的封装方法,简化我们的日常开发。

    最后,奉上简单的生产者、消费者的boost的实现,和前文《并发编程实战: POSIX 使用互斥量和条件变量实现生产者/消费者问题》相比,我们可以看到boost简化了mutex和condition variable的使用。以下代码引自《Boost程序库完全开发指南》:

    #include <boost/thread.hpp>
    #include <stack>
    using std::stack;
    using std::cout;
    class buffer
    {
    private:
        boost::mutex mu; // 条件变量需要配合互斥量
        boost::condition_variable_any cond_put; // 生产者写入
        boost::condition_variable_any cond_get; // 消费者读走
    
        stack<int> stk;
        int un_read;
        int capacity;
    
        bool is_full()
        {
            return un_read == capacity;
        }
        bool is_empty()
        {
            return 0 == un_read;
        }
    
    public:
        buffer(size_t capacity) : un_read(0), capacity(capacity)
        {}
        void put(int x)
        {
    
            boost::mutex::scoped_lock lock(mu); // 这里是读锁的门闩类
    
            while (is_full())
            {
                cout << "full waiting..." << endl;
                cond_put.wait(mu); // line:51
            }
            stk.push(x);
            ++un_read;
    
            cond_get.notify_one();
        }
        void get(int *x)
        {
            boost::mutex::scoped_lock lock(mu); // 这里是读锁的门闩类
    
            while (is_empty())
            {
                cout << "empty waiting..." << endl;
                cond_get.wait(mu);
            }
            *x = stk.top();
            stk.pop();
            --un_read;
    
            cond_put.notify_one(); // 通知 51line可以写入了
        }
    };
    
    buffer buf(5); 
    
    void producer(int n)
    {
        for (int i = 0; i < n; ++i)
        {
            cout << "put : " << i << endl;
            buf.put(i);
        }
    }
    
    void consumer(int n)
    {
        int x;
        for (int i = 0; i < n; ++i)
        {
            buf.get(&x);
            cout << "get : " << x << endl;
        }
    }
    
    int main()
    {
        boost::thread t1(producer, 20);
        boost::thread t2(consumer, 10);
        boost::thread t3(consumer, 10);
    
        t1.join();
        t2.join();
        t3.join();
    
        return 0;
    }
    
    最后说一句,condition_variable_any == condition, from /boost/thread/condition.hpp

    namespace boost
    {
        typedef condition_variable_any condition;
    }
    

  • 相关阅读:
    Bootstrap-CSS:按钮
    Bootstrap-CSS:表单
    质检总局-版权局
    java实现第二届蓝桥杯地铁换乘(C++)
    java实现第二届蓝桥杯地铁换乘(C++)
    java实现第二届蓝桥杯地铁换乘(C++)
    java实现第二届蓝桥杯地铁换乘(C++)
    java实现第二届蓝桥杯地铁换乘(C++)
    java实现第二届蓝桥杯最小公倍数(c++)
    java实现第二届蓝桥杯最小公倍数(c++)
  • 原文地址:https://www.cnblogs.com/anzhsoft/p/3602963.html
Copyright © 2011-2022 走看看