zoukankan      html  css  js  c++  java
  • 第27课 “共享状态”及其管理者(std::future/std::shared_future)

    一. “共享状态”

    (一)“共享状态”对象

     

      1. 用于保存线程函数及其参数、返回值以及新线程状态等信息。该对象通常创建在堆上,由std::async、std::promise和std::package_task等提供(Provider),并交由future/shared_future管理。

      2. Provider将计算结果写入“共享状态”对象,而future/shared_future通过get()函数来读取该结果。“共享状态”作为异步结果的传输通道future可以从中方便地获取线程函数的返回值

      3. “共享状态”内部保存着一个引用计数,当引用计数为0时会通过delete this来销毁自身

    // CLASS TEMPLATE _Associated_state
    template <class _Ty>
    class _Associated_state { // class for managing associated synchronous state
    public:
        using _State_type = _Ty;
        using _Mydel      = _Deleter_base<_Ty>;
    
        _Associated_state(_Mydel* _Dp = nullptr)
            : _Refs(1), // non-atomic initialization
              _Exception(), _Retrieved(false), _Ready(false), _Ready_at_thread_exit(false), _Has_stored_result(false),
              _Running(false), _Deleter(_Dp) { // construct
            // TODO: _Associated_state ctor assumes _Ty is default constructible
        }
    
        virtual ~_Associated_state() noexcept { // 析构函数:注意并不会阻塞
            if (_Has_stored_result && !_Ready) { // registered for release at thread exit
                _Cond._Unregister(_Mtx);
            }
        }
    
        void _Retain() { // 增加引用计数
            _MT_INCR(_Refs);
        }
    
        void _Release() { // 减少引用计数,等于0时delete this
            if (_MT_DECR(_Refs) == 0) {
                _Delete_this();
            }
        }
    
    private:
        _Atomic_counter_t _Refs;
    
    public:
        virtual void _Wait() { // wait for signal
            unique_lock<mutex> _Lock(_Mtx);
            _Maybe_run_deferred_function(_Lock);
            while (!_Ready) {
                _Cond.wait(_Lock);
            }
        }
    
        struct _Test_ready { // _Associated_state包装类
            _Test_ready(const _Associated_state* _St) : _State(_St) { // construct from associated state
            }
    
            bool operator()() const { // 判断“共享状态”是否就绪
                return _State->_Ready != 0;
            }
            const _Associated_state* _State;
        };
    
        template <class _Rep, class _Per>
        future_status _Wait_for(const chrono::duration<_Rep, _Per>& _Rel_time) { // wait for duration
            unique_lock<mutex> _Lock(_Mtx);
            if (_Has_deferred_function()) {
                return future_status::deferred; //如果是延迟任务,调用waitfor将返回deferred,而不是future_status::ready!
            }
    
            if (_Cond.wait_for(_Lock, _Rel_time, _Test_ready(this))) {
                return future_status::ready; //返回future_status::ready
            }
    
            return future_status::timeout; //返回超时
        }
    
        template <class _Clock, class _Dur>
        future_status _Wait_until(const chrono::time_point<_Clock, _Dur>& _Abs_time) { // wait until time point
            unique_lock<mutex> _Lock(_Mtx);
            if (_Has_deferred_function()) {
                return future_status::deferred;
            }
    
            if (_Cond.wait_until(_Lock, _Abs_time, _Test_ready(this))) {
                return future_status::ready;
            }
    
            return future_status::timeout;
        }
    
        virtual _Ty& _Get_value(bool _Get_only_once) { // return the stored result or throw stored exception
            unique_lock<mutex> _Lock(_Mtx);
            if (_Get_only_once && _Retrieved) { //_Get_only_once:true表示_Get_value只能调用一次,false表示可重复调用
                _Throw_future_error(make_error_code(future_errc::future_already_retrieved));
            }
    
            if (_Exception) {
                _Rethrow_future_exception(_Exception);
            }
    
            _Retrieved = true; //标记_Get_value()函数己被调用过
            _Maybe_run_deferred_function(_Lock);
            while (!_Ready) { //如果任务结束,则不再等待。
                _Cond.wait(_Lock);
            }
    
            if (_Exception) {
                _Rethrow_future_exception(_Exception);
            }
    
            return _Result;
        }
    
        void _Set_value(const _Ty& _Val, bool _At_thread_exit) { // store a result
            unique_lock<mutex> _Lock(_Mtx);
            _Set_value_raw(_Val, &_Lock, _At_thread_exit);
        }
    
        void _Set_value_raw(const _Ty& _Val, unique_lock<mutex>* _Lock,
            bool _At_thread_exit) { // store a result while inside a locked block
            if (_Has_stored_result) {
                _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
            }
    
            _Result = _Val;
            _Do_notify(_Lock, _At_thread_exit);
        }
    
        void _Set_value(_Ty&& _Val, bool _At_thread_exit) { // store a result
            unique_lock<mutex> _Lock(_Mtx);
            _Set_value_raw(_STD forward<_Ty>(_Val), &_Lock, _At_thread_exit);
        }
    
        void _Set_value_raw(_Ty&& _Val, unique_lock<mutex>* _Lock,
            bool _At_thread_exit) { // store a result while inside a locked block
            if (_Has_stored_result) {
                _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
            }
    
            _Result = _STD forward<_Ty>(_Val);
            _Do_notify(_Lock, _At_thread_exit);
        }
    
        void _Set_value(bool _At_thread_exit) { // store a (void) result
            unique_lock<mutex> _Lock(_Mtx);
            _Set_value_raw(&_Lock, _At_thread_exit);
        }
    
        void _Set_value_raw(
            unique_lock<mutex>* _Lock, bool _At_thread_exit) { // store a (void) result while inside a locked block
            if (_Has_stored_result) {
                _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
            }
    
            _Do_notify(_Lock, _At_thread_exit);
        }
    
        void _Set_exception(exception_ptr _Exc, bool _At_thread_exit) { // store a result
            unique_lock<mutex> _Lock(_Mtx);
            _Set_exception_raw(_Exc, &_Lock, _At_thread_exit);
        }
    
        void _Set_exception_raw(exception_ptr _Exc, unique_lock<mutex>* _Lock,
            bool _At_thread_exit) { // store a result while inside a locked block
            if (_Has_stored_result) {
                _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
            }
    
            _Exception = _Exc;
            _Do_notify(_Lock, _At_thread_exit);
        }
    
        bool _Is_ready() const { // return ready status
            return _Ready != 0;
        }
    
        bool _Is_ready_at_thread_exit() const { // return ready at thread exit status
            return _Ready_at_thread_exit;
        }
    
        bool _Already_has_stored_result() const { // return presence of stored result
            return _Has_stored_result;
        }
    
        bool _Already_retrieved() const { // return retrieved status
            return _Retrieved;
        }
    
        void _Abandon() { // abandon shared state
            unique_lock<mutex> _Lock(_Mtx);
            if (!_Has_stored_result) { // queue exception
                future_error _Fut(make_error_code(future_errc::broken_promise));
                _Set_exception_raw(_STD make_exception_ptr(_Fut), &_Lock, false);
            }
        }
    
    protected:
        void _Make_ready_at_thread_exit() { // set ready status at thread exit
            if (_Ready_at_thread_exit) {
                _Ready = true;
            }
        }
    
        void _Maybe_run_deferred_function(unique_lock<mutex>& _Lock) {
            if (!_Running) { //延迟任务默认值为false,只能调用该函数后,才会变为true
                _Running = true;
                _Run_deferred_function(_Lock); //执行延迟任务
            }
        }
    
    public:
        _Ty _Result;
        exception_ptr _Exception;
        mutex _Mtx;
        condition_variable _Cond;
        bool _Retrieved; //用于标记_Get_value函数是否己被调用过,true表示己调用过,false为未调用过
        int _Ready; //是否处于就绪状态,用于唤醒等待线程。(有些任务做完线程就被置为就绪状态,而有些任务要等线程退出时才置就绪)
        bool _Ready_at_thread_exit;//是否在线程退出时才设为就绪状态
        bool _Has_stored_result; //调用_Do_notify时表示结果己计算出来,该值被置为true。
        bool _Running; //用于标识线程是否正在运行(异步任务默认值为true,延迟任务默认值为false)
    private:
        virtual bool _Has_deferred_function() const noexcept { // 被_Deferred_async_state子类override
            return false;
        }
    
        virtual void _Run_deferred_function(unique_lock<mutex>&) { // do nothing
        }
    
        virtual void _Do_notify(unique_lock<mutex>* _Lock, bool _At_thread_exit) { // 通知等待线程。
            _Has_stored_result = true; 
            if (_At_thread_exit) { //线程退出时,才唤醒等待线程
                _Cond._Register(*_Lock, &_Ready);
            } else { // 立即唤醒等待线程
                _Ready = true;
                _Cond.notify_all();
            }
        }
    
        void _Delete_this() { // delete this object
            if (_Deleter) {
                _Deleter->_Delete(this);
            } else {
                delete this; //删除自身
            }
        }
    
        _Mydel* _Deleter;
    
    public:
        _Associated_state(const _Associated_state&) = delete;  //不可复制
        _Associated_state& operator=(const _Associated_state&) = delete; //不可复制赋值
    };
    
    // CLASS TEMPLATE _Packaged_state
    template <class>
    class _Packaged_state;
    
    template <class _Ret,
        class... _ArgTypes>
    class _Packaged_state<_Ret(_ArgTypes...)>
        : public _Associated_state<_Ret> { //为packaged_task准备的”共享状态“
    public:
        using _Mybase = _Associated_state<_Ret>;
        using _Mydel  = typename _Mybase::_Mydel;
    
        template <class _Fty2>
        _Packaged_state(const _Fty2& _Fnarg) : _Fn(_Fnarg) { // construct from function object
        }
    
    #if _HAS_FUNCTION_ALLOCATOR_SUPPORT
        template <class _Fty2, class _Alloc>
        _Packaged_state(const _Fty2& _Fnarg, const _Alloc& _Al, _Mydel* _Dp)
            : _Mybase(_Dp), _Fn(allocator_arg, _Al, _Fnarg) { // construct from function object and allocator
        }
    #endif // _HAS_FUNCTION_ALLOCATOR_SUPPORT
    
        template <class _Fty2>
        _Packaged_state(_Fty2&& _Fnarg) : _Fn(_STD forward<_Fty2>(_Fnarg)) { // construct from rvalue function object
        }
    
    #if _HAS_FUNCTION_ALLOCATOR_SUPPORT
        template <class _Fty2, class _Alloc>
        _Packaged_state(_Fty2&& _Fnarg, const _Alloc& _Al, _Mydel* _Dp)
            : _Mybase(_Dp), _Fn(allocator_arg, _Al,
                                _STD forward<_Fty2>(_Fnarg)) { // construct from rvalue function object and allocator
        }
    #endif // _HAS_FUNCTION_ALLOCATOR_SUPPORT
    
        //提供给Provider使用的,provider会通过set_value_at_thread_exit调用该函数来实现线程退出时唤醒等待线程。
        void _Call_deferred(_ArgTypes... _Args) { //这类延迟函数,在线程退出时将任务设为就绪状态,才会唤醒其他线程。。
            _TRY_BEGIN
            // call function object and catch exceptions
            this->_Set_value(_Fn(_STD forward<_ArgTypes>(_Args)...), true); //执行_Fn函数,并将返回值提供给_Set_value函数。t
            _CATCH_ALL                                                      //true表示线程退出时才唤醒等待线程
            // function object threw exception; record result
            this->_Set_exception(_STD current_exception(), true);
            _CATCH_END
        }
        
        //立即调用线程函数,执行完立即唤醒等待线程。比如std::async不管是同步或异步,都是在执行完立即唤醒等待线程。
        void _Call_immediate(_ArgTypes... _Args) {
            _TRY_BEGIN
            // call function object and catch exceptions
            this->_Set_value(_Fn(_STD forward<_ArgTypes>(_Args)...), false);//立即调用函数对象,false表示任务做完立即唤醒等待线程
            _CATCH_ALL
            // function object threw exception; record result
            this->_Set_exception(_STD current_exception(), false);
            _CATCH_END
        }
    
        const function<_Ret(_ArgTypes...)>& _Get_fn() { // return stored function object
            return _Fn;
        }
    
    private:
        function<_Ret(_ArgTypes...)> _Fn;
    };
    
    // CLASS TEMPLATE _Deferred_async_state
    template <class _Rx>
    class _Deferred_async_state : public _Packaged_state<_Rx()> { //std::async创建的同步”共享状态“
    public:
        template <class _Fty2>
        _Deferred_async_state(const _Fty2& _Fnarg) : _Packaged_state<_Rx()>(_Fnarg) { // construct from function object
        }
    
        template <class _Fty2>
        _Deferred_async_state(_Fty2&& _Fnarg)
            : _Packaged_state<_Rx()>(_STD forward<_Fty2>(_Fnarg)) { // construct from rvalue function object
        }
    
    private:
        virtual bool _Has_deferred_function() const
            noexcept { // this function is considered to be deferred until it's invoked
            return !this->_Running; //如果任务己被执行过,就不在是视为延迟任务
        }
    
        virtual void _Run_deferred_function(unique_lock<mutex>& _Lock) { // run the deferred function
            _Lock.unlock();
            _Packaged_state<_Rx()>::_Call_immediate(); //注意,这里不是调用_Call::deferred()!!!
            _Lock.lock();
        }
    };
    
    // CLASS TEMPLATE _Task_async_state
    template <class _Rx>
    class _Task_async_state : public _Packaged_state<_Rx()> { //std::async创建的异步”共享状态“ 
    public:
        using _Mybase     = _Packaged_state<_Rx()>;
        using _State_type = typename _Mybase::_State_type;
    
        template <class _Fty2>
        _Task_async_state(_Fty2&& _Fnarg) : _Mybase(_STD forward<_Fty2>(_Fnarg)) { // construct from rvalue function object
            _Task = ::Concurrency::create_task([this]() { // do it now
                this->_Call_immediate();
            });
    
            this->_Running = true; //异步任务,线程一启动就处于running状态。
        }
    
        virtual ~_Task_async_state() noexcept { // destroy
            _Wait();   //异步“共享状态”对象析构时,会被阻塞!!!
        }
    
        virtual void _Wait() { // wait for completion
            _Task.wait(); //重写_Wait()。注意,这里调用的是线程级别的wait,相当于对底层线程实施一次隐式join()。
        }
    
        virtual _State_type& _Get_value(bool _Get_only_once) { // return the stored result or throw stored exception
            _Task.wait(); //异步“共享状态”对象,在调用Get_value时,也会被阻塞!
            return _Mybase::_Get_value(_Get_only_once);
        }
    
    private:
        ::Concurrency::task<void> _Task;
    };
    “共享状态”相关类的源码摘要

    (二)注意事项

      1.std::promise创建_Assoicated_state类型的共享对象。这是一种最简单的共享状态对象,只能用于保存线程函数的返回值等信息。

      2. _Package_state类型“共享状态”,除了可以保存返回值外,还用于将可调用对象包装成一个function对象一般由std::package_task创建

      3._Deffered_async_state或_Task_async_state类型,前者用于跟踪std::launch::deferred类型的异步任务,而后者用于跟踪std::launch::async类型的任务。std::async就是根据启动策略来创建这两种共享状态之一。

      4. _Task_async_state类型的“共享状态”对象,在析构时会调用wait()函数来等待任务执行完毕。从效果来看,相当于实施一次隐式join(),正如std::thread一样,C++标准委员会对这种行为也曾经存在争议。而其他所有 “共享状态”的类型都无此操作,这相当于实施一次隐式的detach()操作。

    二.期值(“共享状态”的管理器)

    (一)future/shared_future源码摘要

    // CLASS TEMPLATE _State_manager
    template <class _Ty>
    class _State_manager { //“共享状态”管理器
    public:
        _State_manager() : _Assoc_state(nullptr) { // construct with no associated asynchronous state object
            _Get_only_once = false;  //注意,默认get()函数是可多次调用的
        }
    
        _State_manager(_Associated_state<_Ty>* _New_state, bool _Get_once)
            : _Assoc_state(_New_state) { // construct with _New_state
            _Get_only_once = _Get_once;
        }
    
        _State_manager(const _State_manager& _Other, bool _Get_once = false)
            : _Assoc_state(nullptr) { // construct from _Other
            _Copy_from(_Other);
            _Get_only_once = _Get_once;
        }
    
        _State_manager(_State_manager&& _Other, bool _Get_once = false)
            : _Assoc_state(nullptr) { // construct from rvalue _Other
            _Move_from(_Other);
            _Get_only_once = _Get_once;
        }
    
        ~_State_manager() noexcept { // destroy
            if (_Assoc_state != nullptr) {
                _Assoc_state->_Release(); //_State_manager对象析构时,会同时将管理的“共享状态”的引用计数自减1
            }
        }
    
        _State_manager& operator=(const _State_manager& _Other) { // assign from _Other
            _Copy_from(_Other);
            return *this;
        }
    
        _State_manager& operator=(_State_manager&& _Other) { // assign from rvalue _Other
            _Move_from(_Other);
            return *this;
        }
    
         //检查当前的 std::future 对象是否有效,即释放与某个共享状态相关联
        _NODISCARD bool valid() const noexcept {
            return _Assoc_state != nullptr && !(_Get_only_once && _Assoc_state->_Already_retrieved());
        }
    
        //等待与当前std::future 对象相关联的共享状态的标志变为 ready.
        void wait() const { // wait for signal
            if (!valid()) {
                _Throw_future_error(make_error_code(future_errc::no_state));
            }
    
            _Assoc_state->_Wait(); //等待条件变量
        }
    
        template <class _Rep, class _Per>
        future_status wait_for(const chrono::duration<_Rep, _Per>& _Rel_time) const { // wait for duration
            if (!valid()) {
                _Throw_future_error(make_error_code(future_errc::no_state));
            }
    
            return _Assoc_state->_Wait_for(_Rel_time);
        }
    
        template <class _Clock, class _Dur>
        future_status wait_until(const chrono::time_point<_Clock, _Dur>& _Abs_time) const { // wait until time point
            if (!valid()) {
                _Throw_future_error(make_error_code(future_errc::no_state));
            }
    
            return _Assoc_state->_Wait_until(_Abs_time);
        }
    
        _Ty& _Get_value() const { // return the stored result or throw stored exception
            if (!valid()) {
                _Throw_future_error(make_error_code(future_errc::no_state));
            }
    
            return _Assoc_state->_Get_value(_Get_only_once);
        }
    
        void _Set_value(const _Ty& _Val, bool _Defer) { // store a result
            if (!valid()) {
                _Throw_future_error(make_error_code(future_errc::no_state));
            }
    
            _Assoc_state->_Set_value(_Val, _Defer);
        }
    
        void _Set_value(_Ty&& _Val, bool _Defer) { // store a result
            if (!valid()) {
                _Throw_future_error(make_error_code(future_errc::no_state));
            }
    
            _Assoc_state->_Set_value(_STD forward<_Ty>(_Val), _Defer);
        }
    
        void _Abandon() { // abandon shared state
            if (_Assoc_state) {
                _Assoc_state->_Abandon();
            }
        }
    
        void _Set_exception(exception_ptr _Exc, bool _Defer) { // store a result
            if (!valid()) {
                _Throw_future_error(make_error_code(future_errc::no_state));
            }
    
            _Assoc_state->_Set_exception(_Exc, _Defer);
        }
    
        void _Swap(_State_manager& _Other) { // exchange with _Other
            _STD swap(_Assoc_state, _Other._Assoc_state);
        }
    
        _Associated_state<_Ty>* _Ptr() const { // return pointer to stored associated asynchronous state object
            return _Assoc_state;
        }
    
        void _Copy_from(const _State_manager& _Other) { // copy stored associated asynchronous state object from _Other
            if (this != _STD addressof(_Other)) { // different, copy
                if (_Assoc_state) {
                    _Assoc_state->_Release();
                }
    
                if (_Other._Assoc_state == nullptr) {
                    _Assoc_state = nullptr;
                } else { // do the copy
                    _Other._Assoc_state->_Retain();
                    _Assoc_state   = _Other._Assoc_state;
                    _Get_only_once = _Other._Get_only_once;
                }
            }
        }
    
        void _Move_from(_State_manager& _Other) { // move stored associated asynchronous state object from _Other
            if (this != _STD addressof(_Other)) { // different, move
                if (_Assoc_state) {
                    _Assoc_state->_Release();
                }
    
                _Assoc_state        = _Other._Assoc_state;
                _Other._Assoc_state = nullptr;
                _Get_only_once      = _Other._Get_only_once;
            }
        }
    
        bool _Is_ready() const { // return ready status
            return _Assoc_state && _Assoc_state->_Is_ready();
        }
    
        bool _Is_ready_at_thread_exit() const { // return ready at thread exit status
            return _Assoc_state && _Assoc_state->_Is_ready_at_thread_exit();
        }
    
    private:
        _Associated_state<_Ty>* _Assoc_state;
        bool _Get_only_once;
    };
    
    // CLASS TEMPLATE future
    template <class _Ty>
    class shared_future;
    
    template <class _Ty>
    class future : public _State_manager<_Ty> { // class that defines a non-copyable asynchronous return object
                                                // that holds a value
        using _Mybase = _State_manager<_Ty>;
    
    public:
        future() noexcept { // construct
        }
    
        future(future&& _Other) noexcept : _Mybase(_STD move(_Other), true) { // true表示get只能被调用一次。
        }
    
        future& operator=(future&& _Right) noexcept { // assign from rvalue future object
            _Mybase::operator=(_STD move(_Right));
            return *this;
        }
    
        future(const _Mybase& _State, _Nil) : _Mybase(_State, true) { // construct from associated asynchronous state object
        }
    
        ~future() noexcept { // destroy
        }
    
        _Ty get() { // block until ready then return the stored result or
                    // throw the stored exception
            future _Local{_STD move(*this)}; //注意,移动操作,将使调用get()函数后,future将失去与“共享状态”的关联,valid()变为无效。
            return _STD move(_Local._Get_value());
        }
    
        _NODISCARD shared_future<_Ty> share() noexcept { // return state as shared_future
            return shared_future<_Ty>(_STD move(*this));
        }
    
        future(const future&) = delete;
        future& operator=(const future&) = delete;
    };
    
    // CLASS TEMPLATE shared_future
    template <class _Ty>
    class shared_future : public _State_manager<_Ty> { // class that defines a copyable asynchronous return object
                                                       // that holds a value
        using _Mybase = _State_manager<_Ty>;
    
    public:
        shared_future() noexcept { // _Mybase中将_Get_Only_once默认值设为false,表示get()可多次调用。
        }
    
        shared_future(const shared_future& _Other) noexcept : _Mybase(_Other) { //拷贝构造
        }
    
        shared_future& operator=(const shared_future& _Right) noexcept { // 拷贝赋值
    
            _Mybase::operator=(_Right);
            return *this;
        }
    
        shared_future(future<_Ty>&& _Other) noexcept
            : _Mybase(_STD forward<_Mybase>(_Other)) { // 移动构造
        }
    
        shared_future(shared_future&& _Other) noexcept
            : _Mybase(_STD move(_Other)) { // construct from rvalue shared_future object
        }
    
        shared_future& operator=(shared_future&& _Right) noexcept { // assign from shared_future rvalue object
    
            _Mybase::operator=(_STD move(_Right));
            return *this;
        }
    
        ~shared_future() noexcept { // destroy
        }
    
        const _Ty& get() const { // 阻塞,直到任务就绪。
            return this->_Get_value(); //返回值是个只读对象!
        }
    };
    【“共享状态”管理器】相关类的源码摘要

      1. std::thread对应着系统的一个底层线程,期值和std::thread对象一样,都可以视为底层线程的句柄,也就是一个线程对象,管理着对应的底层线程。

      2. 调用get()时,会阻塞等待直至任务完成。但在future中,get()函数是通过移动语义将异步结果从future中转移给get的返回值,因此该函数只能被调用一次,同时也意味着这个future对象也不可再使用(valid()为false)。而shared_future的get()函数只是简单地返回异步结果的引用所以可以多次被调用

      3. std::future是只移动类型,而std::shared_future既可移动也可复制

      (1)两者的关系,就如同unique_ptr和shared_ptr。future独占“共享状态”的所有权,而shared_future会共享所有权当调用future的share()函数时,将创建一个shared_future对象,同时原来的future将失去对“共享状态”对象的所有权,这意味着该future对象不可再使用(其valid()为false)。

      (2)shared_future可复制,多线程可共享“共享状态”对象,可用于线程间的通信。此外,在容器中保存期值时,一般使用shared_future类型而不是future类型,首先因为shared_future是可复制的,其次是因为future对象在使用get函数后将变成无效,会导致容器中保存着失效的future。

      4. future/shared_future析构时,会将其关联的“共享状态”对象的引用计数自减一次。当引用计数为0时,会同时销毁“共享状态”对象。

      5. waitfor()函数返回值的三种状态:

      (1)future_status::ready:共享状态的标志已经变为 ready,即 Provider在共享状态上设置了值或者异常。

      (2)future_status::timeout:超时,即在规定的时间内共享状态的标志没有变为ready。

      (3)future_status::deferred:共享状态包含一个 deferred函数。当使用std::async创建std::launch::deferred任务时,waitfor函数的返回值不是ready,而是deferred!!!

    (二)期值的析构行为分析

      1. std::thread对象析构时,如果仍处于可联结的状态(未显式调用join()或detach()),那么程序会终止。而future/shared_future对象在析构,有时像是隐式join(),有时像隐式detach(),有时像是二者都没有执行

      2.期值的两种析构行为:

      (1)常规行为:析构期值对象本身的成员变量并将其管理的“共享状态”对象的引用计数自减一次注意,尽管期值的构析行为有时类似于隐式join或detach,但实际上它并不对任何东西实施join或detach因此,当future配合std::thread使用时,仍需要显式调用std::thread的join或detach以防止程序被终止。这从使用std::thread的角度看也很合理,必须显式调用两者之一。

      (2)特殊行为析构期值时会阻塞线程。根据前面的分析,只有当“共享状态”对象为_Task_async_state类型时,才会阻塞线程直至异步任务结束。

      3. 结论:

      (1)只有持有_Task_async_state类型的“共享状态”的期值才会阻塞线程。因此,只有同时满足以下三个条件的期值才会表现出特殊行为,任何一个条件不满足时均只表现为常规行为。

        ①期值所指向的“共享状态”由调用std::async函数创建

        ②任务以std::launch::async策略创建。

        ③该期值是最后一个指向该共享状态的期值。对于std::future而言,由于独占“共享状态”对象的所有权,因此这一点总是成立。对于std::shared_future而言,“共享状态”对象的所有权被shared_future共享,所以只有最后一个指向该对象的shared_future才表现出这种特殊行为。

      (2)对于那些由std::async以std::launch::deferred创建的任务,在最后一个期值析构时仍没调用get或wait,则被推迟的任务将不会再有机会运行了。因为最后一个期值将会把“共享状态”对象销毁。

    【编程实验】std::future和std::shared_future

    #include <iostream>
    #include <thread>
    #include <future>
    #include <chrono>
    #include <mutex>
    #include <vector>
    
    using namespace std;
    
    std::mutex g_mtx;
    
    int func(int tm, std::promise<int>& pr)
    {
        auto dura = std::chrono::milliseconds(tm * 100);
        std::this_thread::sleep_for(dura);
        pr.set_value(tm);
    
        return tm;
    }
    
    bool check_prime(int x) //判断是否是质数
    {
        for (int i = 2; i < x; ++i){
            if (x % i == 0)
                return false;
        }
    
        return true;
    }
    
    int calcValue(int x, int y)
    {
        std::lock_guard<std::mutex> lck(g_mtx);
        cout<<"calcValue: x = " << x << ", y = " << y << endl;
    
        return x * x + y * y;
    }
    
    int main()
    {
        //1.future和shared_future的使用
        //shared_future:用于让多个线程同时等待信号
        std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
        std::shared_future<void> ready_future = ready_promise.get_future();
    
        std::chrono::time_point<std::chrono::high_resolution_clock> start;
    
        //注意,ready_future按值捕获
        auto fun1 = [&, ready_future]()->std::chrono::duration<double, std::milli>
        {
            t1_ready_promise.set_value();
            ready_future.wait();  //等待,接受来自main()的信号
            return std::chrono::high_resolution_clock::now() - start; //返回start后多少时间才收到信号
        };
    
        auto fun2 = [&, ready_future]()->std::chrono::duration<double, std::milli>
        {
            t2_ready_promise.set_value();
            ready_future.wait(); //等待,接受来自main()的信号
            return std::chrono::high_resolution_clock::now() - start; //返回start后多少时间才收到信号
        };
    
        auto result1 = std::async(std::launch::async, fun1);
        auto result2 = std::async(std::launch::async, fun2);
    
        //等待子线程启动完成:确保两个子线程都己经运行,以防止主线程先发通知后子线程才运行。
        //两个t1/t2_ready_promise通过set_value通知主线程,两个子线程己全部启动,并进入等待主线程的通知。
        t1_ready_promise.get_future().wait(); 
        t2_ready_promise.get_future().wait(); 
    
        start = std::chrono::high_resolution_clock::now();
    
        //向子线程发送信号
        ready_promise.set_value();
        std::cout << "Thread 1 received the signal " << result1.get().count() << " ms after start." << endl;
        std::cout << "Thread 2 received the signal " << result2.get().count() << " ms after start." << endl;
    
        //1.2 用容器保存std::shared_future。
        vector<std::shared_future<int>> vec;
        auto fut1 = std::async(std::launch::async, [](int a, int b) {return a + b;}, 2, 3);
        vec.push_back(fut1.share());
        std::cout << "The shared_future result is " <<vec[0].get() << endl;
    
        ////2. wait_for的返回值类型
        auto fut2 = std::async(std::launch::deferred, calcValue, 1, 2); //延时函数,同步任务
        auto fut3 = std::async(std::launch::async, []() {std::this_thread::sleep_for(1s); });//异步任务
    
        if (fut2.wait_for(0s) == std::future_status::deferred) {
            cout <<"fut2 is a deferred task!" << endl;
        }
    
        cout << "waiting";
        while (fut3.wait_for(20ms) != std::future_status::ready) { // std::future_status::timeout
            cout << ".";
        }
        cout << endl;
    
        //3. 期值的析构行为
        //3.1常规行为:期值析构时,并不对任何东西实施join或detach
        {
            std::promise<int> pr;
            std::future<int> fut = pr.get_future();
    
            std::thread th(func, 10, std::ref(pr)); 
            th.detach(); //必须确保在th析构前,调用join或detach
    
            auto res = fut.get();
    
            cout << res << endl;
        }
    
        {
            std::packaged_task<int(int, int)> pt(calcValue);
            auto fut = pt.get_future();
            std::thread th(std::move(pt), 1, 2); //pt是只移动类型
            cout << fut2.get() << endl;
            th.join(); //th析构前,必须调用join()或detach(),可放入get之前或之后。
        }
    
        //3.2 特殊行为
        {
            //由async创建的std::launch::async类型的任务,会在fut离开作用域阻塞等待任何完成(类似于隐式join)
            auto fut1 = std::async(std::launch::async, calcValue, 3, 4);
     
            //std::launch::deferred类型的任务,在fut2离开作用域时并不会阻塞等待(普通析构行为!)。
            auto fut2 = std::async(std::launch::deferred, check_prime, 123456);
    
            //如果不调用fut2.get(),任务永远不会被执行。
            if (fut2.get()) {//阻塞等待子线程结束,并获取任务的结果。
                cout << "123456 is prime." << endl;
            }
            else {
                cout << "123456 is not prime." << endl;
            }
        } 
    
        return 0;
    }
    /*输出结果
    Thread 1 received the signal 0.0199 ms after start.
    Thread 2 received the signal 0.0139 ms after start.
    The shared_future result is 5
    fut2 is a deferred task!
    waiting...............................................
    10
    calcValue: x = 1, y = 2
    5
    calcValue: x = 1, y = 2
    123456 is not prime.
    calcValue: x = 3, y = 4
    */
  • 相关阅读:
    java——testNG——工作复习——xml详解
    转义符,re模块,rangdom随机数模块,
    nyoj 814又见拦截导弹
    Soj题目分类
    Xcode 性能优化
    python使用pip安装模块出现ReadTimeoutError: HTTPSConnectionPool的解决方法(pip使用豆瓣源)
    浅谈模拟退火
    43-正则表达式(1)
    命令行上的narrowing(随着输入逐步减少备选项)工具
    有效决策,这么做就对了!
  • 原文地址:https://www.cnblogs.com/5iedu/p/11743514.html
Copyright © 2011-2022 走看看