zoukankan      html  css  js  c++  java
  • 第28课 “共享状态”提供者(std::promise/std::package_task)

    一. std::promise和std::package_task

    (一)共享状态、提供者和管理者

     

    // CLASS TEMPLATE _Promise
    template <class _Ty>
    class _Promise { // class that implements core of promise
    public:
        _Promise(_Associated_state<_Ty>* _State_ptr)
            : _State(_State_ptr, false), _Future_retrieved(false) { // construct from associated asynchronous state object
        }
    
        _Promise(_Promise&& _Other)
            : _State(_STD move(_Other._State)),
              _Future_retrieved(_Other._Future_retrieved) { // construct from rvalue _Promise object
        }
    
        _Promise& operator=(_Promise&& _Other) { // assign from rvalue _Promise object
    
            _State            = _STD move(_Other._State);
            _Future_retrieved = _Other._Future_retrieved;
            return *this;
        }
    
        ~_Promise() noexcept { // destroy
        }
    
        void _Swap(_Promise& _Other) { // exchange with _Other
            _State._Swap(_Other._State);
            _STD swap(_Future_retrieved, _Other._Future_retrieved);
        }
    
        const _State_manager<_Ty>& _Get_state() const { // return reference to associated asynchronous state object
            return _State;
        }
        _State_manager<_Ty>& _Get_state() { // return reference to associated asynchronous state object
            return _State;
        }
    
        _State_manager<_Ty>& _Get_state_for_set() { // return reference to associated asynchronous state object, or
                                                    // throw exception if not valid for setting state
            if (!_State.valid()) {
                _Throw_future_error(make_error_code(future_errc::no_state));
            }
    
            return _State;
        }
    
        _State_manager<_Ty>& _Get_state_for_future() { // return reference to associated asynchronous state object, or
                                                       // throw exception if not valid for retrieving future
            if (!_State.valid()) {
                _Throw_future_error(make_error_code(future_errc::no_state));
            }
    
            if (_Future_retrieved) {
                _Throw_future_error(make_error_code(future_errc::future_already_retrieved));
            }
    
            _Future_retrieved = true;
            return _State;
        }
    
        bool _Is_valid() const noexcept { // return status
            return _State.valid();
        }
    
        bool _Is_ready() const { // return ready status
            return _State._Is_ready();
        }
    
        bool _Is_ready_at_thread_exit() const { // return ready at thread exit status
            return _State._Is_ready_at_thread_exit();
        }
    
        _Promise(const _Promise&) = delete;
        _Promise& operator=(const _Promise&) = delete;
    
    private:
        _State_manager<_Ty> _State;
        bool _Future_retrieved;
    };
    
    // CLASS TEMPLATE promise
    template <class _Ty>
    class promise { // class that defines an asynchronous provider that holds a value
    public:
        promise() : _MyPromise(new _Associated_state<_Ty>) { // construct
        }
    
        template <class _Alloc>
        promise(allocator_arg_t, const _Alloc& _Al)
            : _MyPromise(_Make_associated_state<_Ty>(_Al)) { // construct with allocator
        }
    
        promise(promise&& _Other) noexcept
            : _MyPromise(_STD move(_Other._MyPromise)) { // construct from rvalue promise object
        }
    
        promise& operator=(promise&& _Other) noexcept { // assign from rvalue promise object
            promise(_STD move(_Other)).swap(*this);
            return *this;
        }
    
        ~promise() noexcept { // destroy
            if (_MyPromise._Is_valid() && !_MyPromise._Is_ready()
                && !_MyPromise._Is_ready_at_thread_exit()) { // exception if destroyed before function object returns
                future_error _Fut(make_error_code(future_errc::broken_promise));
                _MyPromise._Get_state()._Set_exception(_STD make_exception_ptr(_Fut), false);
            }
        }
    
        void swap(promise& _Other) noexcept { // exchange with _Other
            _MyPromise._Swap(_Other._MyPromise);
        }
    
        _NODISCARD future<_Ty> get_future() { // return a future object that shares the associated
                                              // asynchronous state
            return future<_Ty>(_MyPromise._Get_state_for_future(), _Nil());
        }
    
        void set_value(const _Ty& _Val) { // store result
            _MyPromise._Get_state_for_set()._Set_value(_Val, false);
        }
    
        void set_value_at_thread_exit(const _Ty& _Val) { // store result and block until thread exit
            _MyPromise._Get_state_for_set()._Set_value(_Val, true);
        }
    
        void set_value(_Ty&& _Val) { // store result
            _MyPromise._Get_state_for_set()._Set_value(_STD forward<_Ty>(_Val), false);
        }
    
        void set_value_at_thread_exit(_Ty&& _Val) { // store result and block until thread exit
            _MyPromise._Get_state_for_set()._Set_value(_STD forward<_Ty>(_Val), true);
        }
    
        void set_exception(exception_ptr _Exc) { // store result
            _MyPromise._Get_state_for_set()._Set_exception(_Exc, false);
        }
    
        void set_exception_at_thread_exit(exception_ptr _Exc) { // store result and block until thread exit
            _MyPromise._Get_state_for_set()._Set_exception(_Exc, true);
        }
    
        promise(const promise&) = delete;
        promise& operator=(const promise&) = delete;
    
    private:
        _Promise<_Ty> _MyPromise;
    };
    
    template <class _Ret, class... _ArgTypes>
    class packaged_task<_Ret(_ArgTypes...)> { // class that defines an asynchronous provider that returns the
                                              // result of a call to a function object
    public:
        using _Ptype              = typename _P_arg_type<_Ret>::type;
        using _MyPromiseType      = _Promise<_Ptype>;
        using _MyStateManagerType = _State_manager<_Ptype>;
        using _MyStateType        = _Packaged_state<_Ret(_ArgTypes...)>;
    
        packaged_task() noexcept : _MyPromise(0) { // construct
        }
    
        template <class _Fty2, class = enable_if_t<!is_same_v<_Remove_cvref_t<_Fty2>, packaged_task>>>
        explicit packaged_task(_Fty2&& _Fnarg)
            : _MyPromise(new _MyStateType(_STD forward<_Fty2>(_Fnarg))) { // construct from rvalue function object
        }
    
        packaged_task(packaged_task&& _Other) noexcept
            : _MyPromise(_STD move(_Other._MyPromise)) { // construct from rvalue packaged_task object
        }
    
        packaged_task& operator=(packaged_task&& _Other) noexcept { // assign from rvalue packaged_task object
    
            _MyPromise = _STD move(_Other._MyPromise);
            return *this;
        }
    
    #if _HAS_FUNCTION_ALLOCATOR_SUPPORT
        template <class _Fty2, class _Alloc, class = enable_if_t<!is_same_v<_Remove_cvref_t<_Fty2>, packaged_task>>>
        packaged_task(allocator_arg_t, const _Alloc& _Al, _Fty2&& _Fnarg)
            : _MyPromise(_Make_packaged_state<_MyStateType>(
                  _STD forward<_Fty2>(_Fnarg), _Al)) { // construct from rvalue function object and allocator
        }
    #endif // _HAS_FUNCTION_ALLOCATOR_SUPPORT
    
        ~packaged_task() noexcept { // destroy
            _MyPromise._Get_state()._Abandon();
        }
    
        void swap(packaged_task& _Other) noexcept { // exchange with _Other
            _STD swap(_MyPromise, _Other._MyPromise);
        }
    
        _NODISCARD bool valid() const noexcept { // return status
            return _MyPromise._Is_valid();
        }
    
        _NODISCARD future<_Ret> get_future() { // return a future object that shares the associated
                                               // asynchronous state
            return future<_Ret>(_MyPromise._Get_state_for_future(), _Nil());
        }
    
        void operator()(_ArgTypes... _Args) { // call the function object
            if (_MyPromise._Is_ready()) {
                _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
            }
    
            _MyStateManagerType& _State = _MyPromise._Get_state_for_set();
            _MyStateType* _Ptr          = static_cast<_MyStateType*>(_State._Ptr());
            _Ptr->_Call_immediate(_STD forward<_ArgTypes>(_Args)...);
        }
    
        void make_ready_at_thread_exit(_ArgTypes... _Args) { // call the function object and block until thread exit
            if (_MyPromise._Is_ready()) {
                _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
            }
    
            _MyStateManagerType& _State = _MyPromise._Get_state_for_set();
            if (_State._Ptr()->_Already_has_stored_result()) {
                _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
            }
    
            _MyStateType* _Ptr = static_cast<_MyStateType*>(_State._Ptr());
            _Ptr->_Call_deferred(_STD forward<_ArgTypes>(_Args)...);
        }
    
        void reset() { // reset to newly constructed state
            _MyStateManagerType& _State         = _MyPromise._Get_state_for_set();
            _MyStateType* _MyState              = static_cast<_MyStateType*>(_State._Ptr());
            function<_Ret(_ArgTypes...)> _Fnarg = _MyState->_Get_fn();
            _MyPromiseType _New_promise(new _MyStateType(_Fnarg));
            _MyPromise._Get_state()._Abandon();
            _MyPromise._Swap(_New_promise);
        }
    
        packaged_task(const packaged_task&) = delete;
        packaged_task& operator=(const packaged_task&) = delete;
    
    private:
        _MyPromiseType _MyPromise;
    };
    【std::promise/std::package_task源码摘要】

      1.“共享状态”作为异步结果的传输通道,由std::async、std::promise和std::package_task等提供(Provider),并交由future/shared_future管理。Provider将计算结果写入“共享状态”对象,而future/shared_future通过get()函数来读取该结果。

      2. std::promise用于包装一个值,将数据和future绑定起来,方便线程赋值。而std::package_task用来包装一个可调用对象,将函数与future绑定以便异步调用。

      3. std::async、std::promise和std::package_task都是“共享状态”对象的创建者,它们创建“共享状态”类型有所不同。

      (1)std::async:创建_Deferred_async_state和_Task_async_state类型的共享状态。

      (2)std::promise:创建_Associated_state类型的“共享状态”。这种类型比较简单,内部只能保存某个类型的值(如返回值)。

      (3)std::package_task:创建_Package_state类型的“共享状态”,这种类型内部是通过std::function来可以包装可调用对象的

      4. std::promise和std::package_task都只有移动语义而没有拷贝语义

    (二)std::promise类

      1. 用来保存某一类型的值,也可以用来保存线程函数的返回值,该值可被future读取。它为线程同步提供了一种手段。

      2. 可以通过 get_future 来获取future 对象,该对象与promise通过“共享状态”这个通道进行异步结果传输。std::promise通常在某个时间点通过设置一个值或异常对象, future通过调用get()来获取这个结果。

      3. set_value_at_thread_exit函数:设置共享状态的值,但不会将共享状态的标志设置为ready。当有当线程退出时,该标志位才设置为true,同时唤醒所有阻塞在future的get()函数的线程。

      4. 如果销毁std::promise时未设置值,则会存入一个异常。

    (三)std::package_task类

      1. std::package_task 用来包装可调用对象,其本身也是一个可调用对象(因为重载了operator()(Args…args)函数。它可以作为线程函数传递给std::thread,或传给需要可调用对象的另一个函数,或者干脆直接调用。这与std::function类似,但std::package_task会将其包装的可调用对象执行结果(返回值)保存起来,并传递给了future对象

      2. 通过get_future()返回一个与“共享状态”相关联的future对象。其他线程可以通过std::package_task对象在“共享状态”上设置某个值或者异常。

      3. make_ready_at_thread_exit(Arg...args):该函数会调用被包装的任务,并向任务传递参数,类似于std::package_task的operator()成员函数,但不同的是make_read_at_thread_exit并不会立即设置“共享状态”的ready标志,而是在线程退出时才设置它

      4. reset()函数会重置“共享状态”,但是保留了之前被包装的任务。它使得package_task可以被重复使用,这点与std::promise一次性使用不同。

      5. std::package_task对象一般与std::thread配合使用,而不是std::async。如果要使用std::async运行任务,就没有理由去创建std::package_task对象。因为std::async调用时,内部会创建一个基类为_Packaged_state类 “共享状态”的子类对象,而std::package_task也会创建_Packaged_state类的对象。可见std::async能够在调用任务执行之前就做到std::package_task能做到的任何事情,也可以避免重复创建“共享状态”对象。

    【编程实验】初探std::promise和std::pack_task

    #include <iostream>
    #include <future>
    #include <thread>
    #include <chrono>
    #include <queue>
    
    using namespace std;
    
    void func(std::promise<int>& pr, int param)
    {
        int res = param * 10;
        pr.set_value_at_thread_exit(res); //线程退出时,设置需要输出的值
    }
    
    //计算阶乘
    int factorial(int n)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    
        if (n == 1)
            return 1;
    
        return n * factorial(n - 1);
    }
    
    void get_result(std::future<int>& fut) //获取结果
    {
        while (fut.wait_for(std::chrono::milliseconds(20)) == std::future_status::timeout) {
            std::cout << ".";
        }
        std::cout << std::endl;
        std::cout << "the factorial result is " << fut.get() << std::endl;
    }
    
    int main()
    {
        //1. std::promise/std::future配合使用
    
        //1.1 主线程等待子线程的结果
        std::promise<int> pr1;
        std::future<int>  fut1 = pr1.get_future();
    
        std::thread t1(func, std::ref(pr1), 5); 
        t1.join();  //等待t1线程退出
    
        std::cout << "The func output: " << fut1.get() << std::endl;
        //1.2 子线程等待主线程的计算结果
        std::promise<int> pr2;
        std::future<int> fut2 = pr2.get_future(); //创建通道
    
        std::thread t2([](std::future<int>& fut) {
            int res = fut.get();
            cout <<"thread id: "<<std::this_thread::get_id() << " get result " << res << endl;
        }, std::ref(fut2));
    
        std::this_thread::sleep_for(std::chrono::milliseconds(1500));
    
        pr2.set_value(100);
        t2.join();
    
        //2. std::package_task与std::future配合使用
        //2.1 直接将package_task作为函数对象使用
        std::packaged_task<double(int, int)> task1([](int a, int b) {
            return std::pow(a, b);
            });
    
        std::future<double> res = task1.get_future();
    
        task1(2, 9);
    
        std::cout << "task_lambda: " << res.get() << endl;
    
        //2.2 将std::package_task作为任务传递给std::thread线程
        std::packaged_task<int(int)> task2(factorial);
        std::future<int> fut3 = task2.get_future();
    
        std::thread t3(std::ref(task2), 7); //t3线程计算7的阶乘
        std::thread t4(get_result, std::ref(fut3));
        t3.join();
        t4.join();
    
        task2.reset();  //重置task,使得task2可以被重复使用
        std::future<int> fut5 = task2.get_future();
        std::thread t5(std::ref(task2), 8); //计算8的阶乘
        cout << fut5.get() << endl;
        t5.join();
    
        //3.std::async与std::future配合使用
        std::future<int> fut6 = std::async(std::launch::async, factorial, 7);
        fut6.wait();
    
        cout <<"aync calc result is: " << fut6.get() << endl;
    
        return 0;
    }
    /*输出结果
    The func output: 50
    thread id: 6532 get result 100
    task_lambda: 512
    .................................
    the factorial result is 5040
    40320
    aync calc result is: 5040
    */

    二. std::promise/std::package_task的应用

    (一)一次性事件及建模

      1. 一个线程在完成其任务之前,可能需要等待特定的一次性事件的发生。在等待期间,线程可以去轮询事件是否发生,也可以去做其他任务。C++标准库使用std::future为这类一次性事件建模。

      2.一旦事件发生,future变为就绪,而std::future的get/wait()只能被调用一次,无法重复使用。如果多线程等待同一个事件,就需要使用std::shared_future,当事件发生时所有相关的shared_future对象均会变为就绪,并且可以访问其关联的任务结果。

      3.期值对象本身并不提供同步访问,当多个线程需要访问一个独立的期值对象时,必须使用互斥量或类似同步机制对访问进行保护。而如果仅为了实现一次性的事件通信,基于条件变量的设计会要求多余的互斥量和标志位,这显然不够干净利落,而使用期值可以很好的处理这个问题。

    (二)线程间传递任务(以GUI消息处理为例)

      1. 在GUI编程中,当一个线程计算完结果,它要发出一条信息给GUI线程,以通知更新界面。

      2. std::package_task提供了实现这种功能的方法,且不需要发送一条自定义信息给GUI线程,而是将函数包装成任务,并传递到GUI线程,使任务在GUI线程中运行。

    【编程实验】std::promise和std::package_task的应用

    #include <iostream>
    #include <future>
    #include <thread>
    #include <mutex>
    #include <queue>
    
    using namespace std;
    
    //1.一次性事件及建模(以实现暂停状态启动的线程为例)
    //创建暂停状态的线程:std::thread类创建的线程,一启动线程就运行起来。但是如果希望在线程运行前设置优先级和内核亲和性,
    //就需要一个可以创建一个暂停的线程,然后通过其提供的native_handle成员,利用平台底层API配置这些线程特征。为达到这一
    //目的,可以利用std::promise / std::future提供的一次性机制来实现暂停状态的线程。
    class MyThread
    {
    private:
        std::thread mThread;
        std::promise<void> mPromise;
        std::future<void> mFuture;
        bool bStart;
    public:
        template<typename Fn, typename ...ArgTypes>
        MyThread(Fn&& fn, ArgTypes&&... args):bStart(false)
        {
            mFuture = mPromise.get_future();
            
            mThread = std::move(std::thread([this, &fn, &args...] {
                mFuture.wait();
                std::forward<Fn>(fn)(std::forward<ArgTypes>(args)...);
                }));
        }
    
        void start()
        {
            if (!bStart) {
                mPromise.set_value();
                bStart = true;
            }
        }
    
        void join()
        {
            mThread.join();
        }
    
        void detach()
        {
            mThread.detach();
        }
     
        bool joinable() {
            return mThread.joinable();
        }
    };
    
    //2. 利用shared_future处理多个反应任务
    //反应任务
    std::mutex g_mtx;
    void reach()
    {
        std::lock_guard<std::mutex> lck(g_mtx);
        cout << "thread(id= " <<std::this_thread::get_id() <<") react"<< endl;
    }
    
    //检测任务(可处理多个反应任务)
    void detech() 
    {
        std::promise<void> pr;
        std::shared_future<void> sf = pr.get_future().share();
        std::vector<std::thread> vec; //反应任务的容器
    
        for (int i = 0; i < 10; ++i)
        {
            vec.emplace_back([sf]   //在sf按值传递,在其副本上wait
            {
                    sf.wait();
                    reach();
            });
        }
    
        //...      //注意,如果此处抛出异常,则detech会失去响应
    
        pr.set_value();  //让所有线程取消暂停
    
        for (auto& t : vec) {
            t.join(); 
        } 
    }
    
    //3. gui消息处理(在线程间传递任务,而不是消息)
    class MessageManager
    {
        std::queue<std::shared_ptr<std::function<void()>>> mQueue;
        std::mutex mtx;
        bool bShutdown = false;
    public:
        void shutDown() { bShutdown = true; }
    
        //将任务包装成package_task
        template<typename Fn, typename... Args>
        std::future<std::result_of_t<Fn && (Args&& ...)>> //postTask函数的返回值类型,future配合package_task使用
        postTask(Fn&& fn, Args&& ...args)
        {
            using Ret = std::result_of_t <Fn && (Args && ...)>; //Fn函数的返回值类型
    
            std::lock_guard<std::mutex> lck(mtx);
    
            ////将任务包装成package_task类型(注意,由于package_task为只移动类型,不能复制。这里在堆上创建)
            auto ptrPA = std::make_shared<std::packaged_task<Ret()>>(std::bind(std::forward<Fn>(fn), std::forward<Args>(args)...));
            auto fut = ptrPA->get_future();
    
            //利用lambda将“std::packaged_task<Ret()>”重新包装成queue队列所需的元素类型:std::function<void()>共享指针类型
            auto pTask = std::make_shared<std::function<void()>>([ptrPA]()->void {(*ptrPA)(); });  
    
            mQueue.push(pTask);
    
            return fut;
        }
    
        void guiThread()
        {
            while (!bShutdown)
            {
                //...     //处理其它gui界面消息
    
                //获取并执行用户任务
                std::shared_ptr<std::function<void()>> task;
                {
                    std::lock_guard<std::mutex> lk(mtx);
                    if (mQueue.empty())
                        continue;
    
                    task = mQueue.front(); //取出队列中的用户任务
                    mQueue.pop();
                }
                (*task)();  //执行任务
            }
        }
    };
    
    int main()
    {
        cout << "main thread running..." << endl;
    
        //1. 以暂停状态启动的线程
        MyThread th([](int x, int y) {
            int res = x + y;
    
            cout << x << " + " << y << " = " << res << endl;
            return res;
        }, 10, 20);
    
        th.start();
    
        th.join();
    
        //2.处理多反应任务(在shared_future上等待)
        detech();
    
        //3. 在线程间传递任务
        MessageManager mm;
        std::thread guiThread(&MessageManager::guiThread,&mm);
    
        auto fut1 = mm.postTask([](int x, int y)->int { return x + y; }, 1, 2);
        auto fut2 = mm.postTask([](int x, int y, int z) {return x * y * z; }, 10, 20, 30);
        auto fut3 = mm.postTask([](const std::string& str) {return str; }, "SantaClaus");
        cout << fut1.get() << endl;  //3
        cout << fut2.get() << endl;  //6000
        cout << fut3.get() << endl;  //SantaClaus
    
        mm.shutDown();
    
        guiThread.join();
    
        return 0;
    }
    /*输出结果
    main thread running...
    10 + 20 = 30
    thread(id= 8792) react
    thread(id= 2600) react
    thread(id= 2604) react
    thread(id= 13384) react
    thread(id= 14864) react
    thread(id= 14884) react
    thread(id= 13588) react
    thread(id= 13516) react
    thread(id= 11956) react
    thread(id= 13540) react
    3
    6000
    SantaClaus
    */
  • 相关阅读:
    Python网络爬虫——bs4基本用法
    Python网络爬虫——requests模块(1)
    yii gii配置ip限制使用gii
    openfire连接数据库mysql
    js 提示条
    jquery滚动条平滑滑动
    yii2.0 添加组件baidu ueditor
    yii添加验证码 和重复密码
    css图标库 font-awesome.min.css
    yii配置访问路由权限配置
  • 原文地址:https://www.cnblogs.com/5iedu/p/11815582.html
Copyright © 2011-2022 走看看