zoukankan      html  css  js  c++  java
  • boost.asio系列——io_service

    IO模型

    io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。

        asio::io_service io_service;
        asio::ip::tcp::socket socket(io_service);

    在asio框架中,同步的io主要流程如下:

        

    1. 应用程序调用IO对象成员函数执行IO操作
    2. IO对象向io_service 提出请求.
    3. io_service 调用操作系统的功能执行连接操作.
    4. 操作系统向io_service 返回执行结果.
    5. io_service将错误的操作结果翻译为boost::system::error_code类型,再传递给IO对象.
    6. 如果操作失败,IO对象抛出boost::system::system_error类型的异常.

    而异步IO的处理流程则有些不同:

        

    1. 应用程序调用IO对象成员函数执行IO操作
    2. IO对象请求io_service的服务
    3. io_service 通知操作系统其需要开始一个异步连接.
    4. 操作系统指示连接操作完成, io_service从队列中获取操作结果
    5. 应用程序必须调用io_service::run()以便于接收结果
    6. 调用io_service::run()后,io_service返回一个操作结果,并将其翻译为error_code,传递到事件回调函数中

    io_service对象

    io_servuce的作用: io_servie 实现了一个任务队列,这里的任务就是void(void)的函数。Io_servie最常用的两个接口是post和run,post向任务队列中投递任务,run是执行队列中的任务,直到全部执行完毕,并且run可以被N个线程调用。Io_service是完全线程安全的队列。

    io_service对象提供的接口有run、run_one、poll、poll_one、stop、reset、dispatch、post,最常用的是run、post、stop:

    1. post用于发布io事件,如timer,socket读写等,一般由asio框架相应对象调用,无需我们显式调用。
    2. run用于监听io事件响应,并执行响应回调,对于异步io操作需要在代码中显式调用,对于同步io操作则由io对象隐式调用(并不是run函数,不过也是等待io事件)。

    可见,io_service提供的是一个生产者消费者模型。在异步io操作中需要我们手动控制消费者,调用run函数,它的基本工作模式如下:

    1. 等待io事件响应,如果所有io事件响应完成则退出
    2. 等待到io事件响应后,执行其对应的回调
    3. 继续等待下一个io事件,重复1-2

    Io_servie 实现代码的基本类结构:

    l Io_servie是接口类,为实现跨平台,采用了策略模式,所有接口均有impl_type实现。根据平台不同impl_type分为

    n win_iocp_io_service Win版本的实现,这里主要分析Linux版本。

    n task_io_service 非win平台下的实现,其代码结构为:

    u detail/task_io_service_fwd.hpp 简单声明task_io_service名称

    u detail/task_io_service.hpp 声明task_io_service的方法和属性

    u detail/impl/task_io_service.ipp 具体实现文件

    u 队列中的任务类型为opertioan,原型其实是typedef task_io_service_operation operation,其实现文件在detail/task_io_service_operation.hpp中,当队列中的任务被执行时,就是task_io_service_operation:: complete被调用的时候。

    Io_servie::Post方法的实现

    Post向队列中投递任务,然后激活空闲线程执行任务。其实现流程如下:

    l Post接收handler作为参数,实际上是个仿函数,通过此仿函数构造出completion_handler对象,completion_handler继承自operation。然后调用post_immediate_completion。

    l post_immediate_completion首先将outstanding_work_增加,然后调用post_deferred_completion。

    l post_deferred_completion首先加锁将任务入列,然后调用wake_one_thread_and_unlock

    l wake_one_thread_and_unlock尝试唤醒当前空闲的线程,其实现中特别之处在于,若没有空闲线程,但是有线程在执行task->run,即阻塞在epoll_wait上,那么先中断epoll_wait执行任务队列完成后再执行epoll_wait。

    l first_idle_thread_维护了所有当前空闲线程,实际上使用了Leader/Follower模式,每次唤醒时只唤醒空闲线程的第一个。

    Io_servie::run方法的实现

    Run方法执行队列中的所有任务,直到任务执行完毕。

    l run方法首先构造一个idle_thread_info,和first_idle_thread_类型相同,即通过first_idle_thread_将所有线程串联起来,它这个串联不是立即串联的,当该线程无任务可做是加入到first_idle_thread_的首部,有任务执行时,从first_idle_thread_中断开。这很正常,因为first_idle_thread_维护的是当前空闲线程。

    l 加锁,循环执行do_one方法,直到do_one返回false

    l do_one每次执行一个任务。首先检查队列是否为空,若空将此线程追加到first_idle_thread_的首部,然后阻塞在条件变量上,直到被唤醒。

    l 当被唤醒或是首次执行,若stopped_为true(即此时stop方法被调用了),返回0

    l 队列非空,pop出一个任务,检查队列无任务那么简单的解锁,若仍有,调用wake_one_thread_and_unlock尝试唤醒其他空闲线程执行。然后执行该任务,返回1.

    l 实际上在执行队列任务时有一个特别的判断if (o == &task_operation_),那么将会执行task_->run,task_变量类型为reactor,在linux平台实现为epoll_reactor,实现代码文件为detail/impl/epoll_reactor.ipp,run方法实际上执行的是epoll_wait,run阻塞在epoll_wait上等待事件到来,并且处理完事件后将需要回调的函数push到io_servie的任务队列中,虽然epoll_wait是阻塞的,但是它提供了interrupt函数,该interrupt是如何实现的呢,它向epoll_wait添加一个文件描述符,该文件描述符中有8个字节可读,这个文件描述符是专用于中断epoll_wait的,他被封装到select_interrupter中,select_interrupter实际上实现是eventfd_select_interrupter,在构造的时候通过pipe系统调用创建两个文件描述符,然后预先通过write_fd写8个字节,这8个字节一直保留。在添加到epoll_wait中采用EPOLLET水平触发,这样,只要select_interrupter的读文件描述符添加到epoll_wait中,立即中断epoll_wait。很是巧妙。!!!实际上就是因为有了这个reactor,它才叫io_servie,否则就是一个纯的任务队列了。

    l Run方法的原则是:

    n 有任务立即执行任务,尽量使所有的线程一起执行任务

    n 若没有任务,阻塞在epoll_wait上等待io事件

    n 若有新任务到来,并且没有空闲线程,那么先中断epoll_wait,先执行任务

    n 若队列中有任务,并且也需要epoll_wait监听事件,那么非阻塞调用epoll_wait(timeout字段设置为0),待任务执行完毕在阻塞在epoll_wait上。

    n 几乎对线程的使用上达到了极致。

    n 从这个函数中可以知道,在使用ASIO时,io_servie应该尽量多,这样可以使其epoll_wait占用的时间片最多,这样可以最大限度的响应IO事件,降低响应时延。但是每个io_servie::run占用一个线程,所以io_servie最佳应该和CPU的核数相同。

    Io_servie::stop的实现

    l 加锁,调用stop_all_threads

    l 设置stopped_变量为true,遍历所有的空闲线程,依次唤醒

    l task_interrupted_设置为true,调用task_的interrupt方法

    l task_的类型为reactor,在run方法中已经做了分析

    从中可以看出,io_service是一个工作队列的模型。在使用过程中一般有如下几个需要注意的地方:

    1. run函数在io事件完成后会退出,导致后续基于该对象的异步io任务无法执行

    由于io_service并不会主动常见调度线程,需要我们手动分配,常见的方式是给其分配一个线程,然后执行run函数。但run函数在io事件完成后会退出,线程会终止,后续基于该对象的异步io任务无法得到调度。

    解决这个问题的方法是通过一个asio::io_service::work对象来守护io_service。这样,即使所有io任务都执行完成,也不会退出,继续等待新的io任务。

        boost::asio::io_service io;
        boost::asio::io_service::work work(io);
        io.run();

    2. 回调在run函数的线程中同步执行,当回调处理时间较长时阻塞后续io响应

    解决这个问题的方法有两种:1. 启动多线程执行run函数(run函数是线程安全的),2. 新启动一个线程(或通过线程池)来执行回调函数。一般来讲,如果回调处理事件不是特别短,应该使用在线程池中处理回调的方式。

    3. 回调在run函数的线程中同步执行,io事件较多的时候得不到及时响应

    这个其实是性能问题了,在多核cpu上可以通过在多个线程中执行run函数来解决这一问题。这种方式也只能充分利用cpu性能,本身性能问题就不是光靠软件就能解决的。

    .net中的异步io调度方式

    和io_service这种手动控制的方式比起来,.net则是纯粹的自动档了。IO调度由CLR托管了,无需手动控制。回调也是在线程池中执行,无需担心影响后续IO响应。

    正是由于CLR的托管,在.net 的异步IO框架中,就没有类似io_service的调度对象存在,这也符合.net的一贯简洁做法。

    ◆boost::asio::io_service使用时的注意事项:

    ①请让boost::asio::io_service和boost::asio::io_service::work搭配使用。

    ②想让event按照进入(strand)时的顺序被执行,需要boost::asio::io_service要和boost::asio::io_service::strand搭配使用

    ③一般情况下,io_service的成员函数的使用顺序:

    boost::asio::io_service构造,
    boost::asio::io_service::run(),
    boost::asio::io_service::stop(),
    boost::asio::io_service::reset(),
    boost::asio::io_service::run(),
    ......
    boost::asio::io_service析构,

    ④不论有没有使用io_service::work,run()都会执行完io_service里面的event,(若没有用work,run就会退出)。
    ⑤一个新创建的io_service不需要执行reset()函数。
    ⑥在调用stop()后,在调用run()之前,请先调用reset()函数。
    ⑦函数stop()和reset()并不能清除掉io_service里面尚未执行的event。
    我个人认为,也只有析构掉io_service,才能清空它里面的那些尚未执行的event了。(可以用智能指针)。

    ⑧函数stop(),stopped(),reset(),很简单,请单步调试,以明白它在函数里做了什么。

    ⑨boost的.hpp文件里面(一般情况下)有各个函数的使用说明,你可以随时查看。

    ◆下面是boost::asio::io_service的stop()和reset()函数的注释的翻译:

    void boost::asio::io_service::stop();
    BOOST_ASIO_DECL void stop();
    /// Stop the io_service object's event processing loop.
    /// 停止io_service对象的事件处理循环。
    /**
     * This function does not block, but instead simply signals the io_service to
     * stop. All invocations of its run() or run_one() member functions should
     * return as soon as possible. Subsequent calls to run(), run_one(), poll()
     * or poll_one() will return immediately until reset() is called.
     */
     /**
     这个函数不阻塞,而是仅仅表示io_service停止了。
     它的run()或run_one()成员函数的调用应当尽快返回。
     对run()、run_one()、poll()、poll_one()的随后的调用将会立即返回直到reset()函数被调用了。
     */
    void boost::asio::io_service::reset();
    BOOST_ASIO_DECL void reset();
    /// Reset the io_service in preparation for a subsequent run() invocation.
    /// 重置io_service对象,为随后的run()调用做准备。
    /**
     * This function must be called prior to any second or later set of
     * invocations of the run(), run_one(), poll() or poll_one() functions when a
     * previous invocation of these functions returned due to the io_service
     * being stopped or running out of work. After a call to reset(), the
     * io_service object's stopped() function will return @c false.
     *
     * This function must not be called while there are any unfinished calls to
     * the run(), run_one(), poll() or poll_one() functions.
     */
     /**
     io_service被停止,或者执行完handler而缺乏工作时,run()、run_one()、poll()、poll_one()函数的调用会被返回。
     这些函数在被调用之前,必须先调用reset函数。
     在reset函数被调用后,io_service对象的stopped函数将会返回false。
     当run()、run_one()、poll()、poll_one()函数的任何的调用未结束时,这个函数一定不能被调用。
     */

    ◆对stop()和reset()函数的一点说明(是我单步调试时看到的):

    在Windows下,boost::asio::io_service类里面有一个数据成员为"stopped_"(Flag to indicate whether the event loop has been stopped.)。它是一个标志,它标志着事件循环是不是被stopped了。而boost::asio::io_service::reset()函数仅仅是赋值"stopped_=0"。boost::asio::io_service::stopped()函数仅仅是判断"0!=stopped_"的真假。你单步调试一下,就什么都知道了。

    ◆下面是我验证boost::asio::io_service的一个例子:

     1 #include <boost/asio.hpp>  
     2 #include <boost/thread.hpp>  
     3 #include <boost/atomic.hpp>  
     4 #include <boost/shared_ptr.hpp>  
     5 #include <boost/date_time/posix_time/ptime.hpp>  
     6 #include <boost/date_time.hpp>//boost::posix_time::to_iso_extended_string()需要此头文件。  
     7   
     8 //boost::atomic_bool coutFlag = false;  
     9 //error C2440: 'initializing' : cannot convert from 'bool' to 'boost::atomics::atomic<bool>'  
    10 //故意写错,可以根据错误信息知道某类型的详细信息。  
    11 boost::atomic_bool g_coutFlag(false);  
    12 boost::atomic_int g_numIn(0);  
    13 boost::atomic_int g_numOut(0);  
    14   
    15 boost::thread_group g_thgp;  
    16 boost::asio::io_service g_io;  
    17 boost::shared_ptr<boost::asio::io_service::work> g_pWork =   
    18 boost::shared_ptr<boost::asio::io_service::work>(new boost::asio::io_service::work(g_io));  
    19 boost::asio::io_service::strand g_strand(g_io);  
    20 std::vector<boost::posix_time::ptime> g_vecTimes;  
    21   
    22 void my_run_4_io_service(boost::asio::io_service& _io, int _idx)  
    23 {  
    24     _io.run();  
    25     //想得到boost::asio::io_service::run()退出时的时刻,只能对io_service进行封装了。  
    26     g_vecTimes[_idx] = boost::posix_time::microsec_clock::local_time();  
    27 }  
    28   
    29 void outFun(int idx)  
    30 {// io_service执行的handler。  
    31     ++g_numOut;  
    32     if (g_coutFlag.load())  
    33         std::cout << "outFun: index=" << idx << std::endl;  
    34     boost::this_thread::sleep_for(boost::chrono::milliseconds(500));  
    35 }  
    36   
    37 void inFun()  
    38 {  
    39     for (int i = 1; i <= 10; ++i)  
    40     {  
    41         g_strand.post(boost::bind(outFun, i));  
    42         ++g_numIn;  
    43         boost::this_thread::sleep_for(boost::chrono::milliseconds(100));  
    44     }  
    45     g_coutFlag = true;  
    46     g_io.stop();//调用它后,不论io_service有没有使用io_service::work类,各个线程的run()都会立即返回。  
    47     g_vecTimes[0] = boost::posix_time::microsec_clock::local_time();  
    48     int numDelta = g_numIn - g_numOut;  
    49     std::cout << "inFun: numDelta=" << numDelta << std::endl;//还剩多少event没有被执行。  
    50 }  
    51   
    52 int main()  
    53 {  
    54     int vecNum = 5;  
    55     g_vecTimes.reserve(vecNum); g_vecTimes.resize(vecNum);  
    56     //一个容纳 void fun(int i) 函数的 function对象。  
    57     boost::function<void(int)> my_lambda_function_object = [vecNum](int secs)  
    58     {  
    59         boost::this_thread::sleep_for(boost::chrono::microseconds(1000 * 1000 * secs));  
    60         std::cout << "now, time is " << boost::posix_time::  
    61             to_iso_extended_string(boost::posix_time::microsec_clock::local_time()) << std::endl;  
    62         for (int i = 0; i < vecNum; ++i)  
    63             std::cout << i << " : " << boost::posix_time::to_iso_extended_string(g_vecTimes[i]) << std::endl;  
    64     };  
    65   
    66     for (int i = 1; i < vecNum; ++i)  
    67         g_thgp.create_thread(boost::bind(my_run_4_io_service, boost::ref(g_io), i));  
    68     g_thgp.create_thread(inFun);  
    69     //等待5秒,确保执行完毕我设计的那些操作。  
    70     my_lambda_function_object(5);  
    71     //析构掉io_service对应的io_service::work对象,此时io_service里面还有event。  
    72     g_pWork = nullptr;  
    73     boost::this_thread::sleep_for(boost::chrono::milliseconds(1000 * 1));  
    74     g_io.reset();  
    75     boost::this_thread::sleep_for(boost::chrono::seconds(1));  
    76     //因为work被析构掉了,所以启动的那些线程在执行完event后,都自行退出了。  
    77     for (int i = 1; i < vecNum; ++i)  
    78         g_thgp.create_thread(boost::bind(my_run_4_io_service, boost::ref(g_io), i));  
    79     //等待6秒,确保io_service中剩余的event被执行完毕。  
    80     my_lambda_function_object(6);  
    81     std::cout << "done." << std::endl;  
    82     int cmd_val = getchar();  
    83     return 0;  
    84 }  
     
  • 相关阅读:
    无限维
    黎曼流形
    why we need virtual key word
    TOJ 4119 Split Equally
    TOJ 4003 Next Permutation
    TOJ 4002 Palindrome Generator
    TOJ 2749 Absent Substrings
    TOJ 2641 Gene
    TOJ 2861 Octal Fractions
    TOJ 4394 Rebuild Road
  • 原文地址:https://www.cnblogs.com/fnlingnzb-learner/p/10402232.html
Copyright © 2011-2022 走看看