zoukankan      html  css  js  c++  java
  • boost.asio系列——io_service

    IO模型

    io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。

        asio::io_service io_service;
        asio::ip::tcp::socket socket(io_service);

    在asio框架中,同步的io主要流程如下:

        

    1. 应用程序调用IO对象成员函数执行IO操作
    2. IO对象向io_service 提出请求.
    3. io_service 调用操作系统的功能执行连接操作.
    4. 操作系统向io_service 返回执行结果.
    5. io_service将错误的操作结果翻译为boost::system::error_code类型,再传递给IO对象.
    6. 如果操作失败,IO对象抛出boost::system::system_error类型的异常.

    而异步IO的处理流程则有些不同:

        

    1. 应用程序调用IO对象成员函数执行IO操作
    2. IO对象请求io_service的服务
    3. io_service 通知操作系统其需要开始一个异步连接.
    4. 操作系统指示连接操作完成, io_service从队列中获取操作结果
    5. 应用程序必须调用io_service::run()以便于接收结果
    6. 调用io_service::run()后,io_service返回一个操作结果,并将其翻译为error_code,传递到事件回调函数中

    io_service对象

    io_servuce的作用: io_servie 实现了一个任务队列,这里的任务就是void(void)的函数。Io_servie最常用的两个接口是post和run,post向任务队列中投递任务,run是执行队列中的任务,直到全部执行完毕,并且run可以被N个线程调用。Io_service是完全线程安全的队列。

    io_service对象提供的接口有run、run_one、poll、poll_one、stop、reset、dispatch、post,最常用的是run、post、stop:

    1. post用于发布io事件,如timer,socket读写等,一般由asio框架相应对象调用,无需我们显式调用。
    2. run用于监听io事件响应,并执行响应回调,对于异步io操作需要在代码中显式调用,对于同步io操作则由io对象隐式调用(并不是run函数,不过也是等待io事件)。

    可见,io_service提供的是一个生产者消费者模型。在异步io操作中需要我们手动控制消费者,调用run函数,它的基本工作模式如下:

    1. 等待io事件响应,如果所有io事件响应完成则退出
    2. 等待到io事件响应后,执行其对应的回调
    3. 继续等待下一个io事件,重复1-2

    Io_servie 实现代码的基本类结构:

    l Io_servie是接口类,为实现跨平台,采用了策略模式,所有接口均有impl_type实现。根据平台不同impl_type分为

    n win_iocp_io_service Win版本的实现,这里主要分析Linux版本。

    n task_io_service 非win平台下的实现,其代码结构为:

    u detail/task_io_service_fwd.hpp 简单声明task_io_service名称

    u detail/task_io_service.hpp 声明task_io_service的方法和属性

    u detail/impl/task_io_service.ipp 具体实现文件

    u 队列中的任务类型为opertioan,原型其实是typedef task_io_service_operation operation,其实现文件在detail/task_io_service_operation.hpp中,当队列中的任务被执行时,就是task_io_service_operation:: complete被调用的时候。

    Io_servie::Post方法的实现

    Post向队列中投递任务,然后激活空闲线程执行任务。其实现流程如下:

    l Post接收handler作为参数,实际上是个仿函数,通过此仿函数构造出completion_handler对象,completion_handler继承自operation。然后调用post_immediate_completion。

    l post_immediate_completion首先将outstanding_work_增加,然后调用post_deferred_completion。

    l post_deferred_completion首先加锁将任务入列,然后调用wake_one_thread_and_unlock

    l wake_one_thread_and_unlock尝试唤醒当前空闲的线程,其实现中特别之处在于,若没有空闲线程,但是有线程在执行task->run,即阻塞在epoll_wait上,那么先中断epoll_wait执行任务队列完成后再执行epoll_wait。

    l first_idle_thread_维护了所有当前空闲线程,实际上使用了Leader/Follower模式,每次唤醒时只唤醒空闲线程的第一个。

    Io_servie::run方法的实现

    Run方法执行队列中的所有任务,直到任务执行完毕。

    l run方法首先构造一个idle_thread_info,和first_idle_thread_类型相同,即通过first_idle_thread_将所有线程串联起来,它这个串联不是立即串联的,当该线程无任务可做是加入到first_idle_thread_的首部,有任务执行时,从first_idle_thread_中断开。这很正常,因为first_idle_thread_维护的是当前空闲线程。

    l 加锁,循环执行do_one方法,直到do_one返回false

    l do_one每次执行一个任务。首先检查队列是否为空,若空将此线程追加到first_idle_thread_的首部,然后阻塞在条件变量上,直到被唤醒。

    l 当被唤醒或是首次执行,若stopped_为true(即此时stop方法被调用了),返回0

    l 队列非空,pop出一个任务,检查队列无任务那么简单的解锁,若仍有,调用wake_one_thread_and_unlock尝试唤醒其他空闲线程执行。然后执行该任务,返回1.

    l 实际上在执行队列任务时有一个特别的判断if (o == &task_operation_),那么将会执行task_->run,task_变量类型为reactor,在linux平台实现为epoll_reactor,实现代码文件为detail/impl/epoll_reactor.ipp,run方法实际上执行的是epoll_wait,run阻塞在epoll_wait上等待事件到来,并且处理完事件后将需要回调的函数push到io_servie的任务队列中,虽然epoll_wait是阻塞的,但是它提供了interrupt函数,该interrupt是如何实现的呢,它向epoll_wait添加一个文件描述符,该文件描述符中有8个字节可读,这个文件描述符是专用于中断epoll_wait的,他被封装到select_interrupter中,select_interrupter实际上实现是eventfd_select_interrupter,在构造的时候通过pipe系统调用创建两个文件描述符,然后预先通过write_fd写8个字节,这8个字节一直保留。在添加到epoll_wait中采用EPOLLET水平触发,这样,只要select_interrupter的读文件描述符添加到epoll_wait中,立即中断epoll_wait。很是巧妙。!!!实际上就是因为有了这个reactor,它才叫io_servie,否则就是一个纯的任务队列了。

    l Run方法的原则是:

    n 有任务立即执行任务,尽量使所有的线程一起执行任务

    n 若没有任务,阻塞在epoll_wait上等待io事件

    n 若有新任务到来,并且没有空闲线程,那么先中断epoll_wait,先执行任务

    n 若队列中有任务,并且也需要epoll_wait监听事件,那么非阻塞调用epoll_wait(timeout字段设置为0),待任务执行完毕在阻塞在epoll_wait上。

    n 几乎对线程的使用上达到了极致。

    n 从这个函数中可以知道,在使用ASIO时,io_servie应该尽量多,这样可以使其epoll_wait占用的时间片最多,这样可以最大限度的响应IO事件,降低响应时延。但是每个io_servie::run占用一个线程,所以io_servie最佳应该和CPU的核数相同。

    Io_servie::stop的实现

    l 加锁,调用stop_all_threads

    l 设置stopped_变量为true,遍历所有的空闲线程,依次唤醒

    l task_interrupted_设置为true,调用task_的interrupt方法

    l task_的类型为reactor,在run方法中已经做了分析

    从中可以看出,io_service是一个工作队列的模型。在使用过程中一般有如下几个需要注意的地方:

    1. run函数在io事件完成后会退出,导致后续基于该对象的异步io任务无法执行

    由于io_service并不会主动常见调度线程,需要我们手动分配,常见的方式是给其分配一个线程,然后执行run函数。但run函数在io事件完成后会退出,线程会终止,后续基于该对象的异步io任务无法得到调度。

    解决这个问题的方法是通过一个asio::io_service::work对象来守护io_service。这样,即使所有io任务都执行完成,也不会退出,继续等待新的io任务。

        boost::asio::io_service io;
        boost::asio::io_service::work work(io);
        io.run();

    2. 回调在run函数的线程中同步执行,当回调处理时间较长时阻塞后续io响应

    解决这个问题的方法有两种:1. 启动多线程执行run函数(run函数是线程安全的),2. 新启动一个线程(或通过线程池)来执行回调函数。一般来讲,如果回调处理事件不是特别短,应该使用在线程池中处理回调的方式。

    3. 回调在run函数的线程中同步执行,io事件较多的时候得不到及时响应

    这个其实是性能问题了,在多核cpu上可以通过在多个线程中执行run函数来解决这一问题。这种方式也只能充分利用cpu性能,本身性能问题就不是光靠软件就能解决的。

    .net中的异步io调度方式

    和io_service这种手动控制的方式比起来,.net则是纯粹的自动档了。IO调度由CLR托管了,无需手动控制。回调也是在线程池中执行,无需担心影响后续IO响应。

    正是由于CLR的托管,在.net 的异步IO框架中,就没有类似io_service的调度对象存在,这也符合.net的一贯简洁做法。

    ◆boost::asio::io_service使用时的注意事项:

    ①请让boost::asio::io_service和boost::asio::io_service::work搭配使用。

    ②想让event按照进入(strand)时的顺序被执行,需要boost::asio::io_service要和boost::asio::io_service::strand搭配使用

    ③一般情况下,io_service的成员函数的使用顺序:

    boost::asio::io_service构造,
    boost::asio::io_service::run(),
    boost::asio::io_service::stop(),
    boost::asio::io_service::reset(),
    boost::asio::io_service::run(),
    ......
    boost::asio::io_service析构,

    ④不论有没有使用io_service::work,run()都会执行完io_service里面的event,(若没有用work,run就会退出)。
    ⑤一个新创建的io_service不需要执行reset()函数。
    ⑥在调用stop()后,在调用run()之前,请先调用reset()函数。
    ⑦函数stop()和reset()并不能清除掉io_service里面尚未执行的event。
    我个人认为,也只有析构掉io_service,才能清空它里面的那些尚未执行的event了。(可以用智能指针)。

    ⑧函数stop(),stopped(),reset(),很简单,请单步调试,以明白它在函数里做了什么。

    ⑨boost的.hpp文件里面(一般情况下)有各个函数的使用说明,你可以随时查看。

    ◆下面是boost::asio::io_service的stop()和reset()函数的注释的翻译:

    void boost::asio::io_service::stop();
    BOOST_ASIO_DECL void stop();
    /// Stop the io_service object's event processing loop.
    /// 停止io_service对象的事件处理循环。
    /**
     * This function does not block, but instead simply signals the io_service to
     * stop. All invocations of its run() or run_one() member functions should
     * return as soon as possible. Subsequent calls to run(), run_one(), poll()
     * or poll_one() will return immediately until reset() is called.
     */
     /**
     这个函数不阻塞,而是仅仅表示io_service停止了。
     它的run()或run_one()成员函数的调用应当尽快返回。
     对run()、run_one()、poll()、poll_one()的随后的调用将会立即返回直到reset()函数被调用了。
     */
    void boost::asio::io_service::reset();
    BOOST_ASIO_DECL void reset();
    /// Reset the io_service in preparation for a subsequent run() invocation.
    /// 重置io_service对象,为随后的run()调用做准备。
    /**
     * This function must be called prior to any second or later set of
     * invocations of the run(), run_one(), poll() or poll_one() functions when a
     * previous invocation of these functions returned due to the io_service
     * being stopped or running out of work. After a call to reset(), the
     * io_service object's stopped() function will return @c false.
     *
     * This function must not be called while there are any unfinished calls to
     * the run(), run_one(), poll() or poll_one() functions.
     */
     /**
     io_service被停止,或者执行完handler而缺乏工作时,run()、run_one()、poll()、poll_one()函数的调用会被返回。
     这些函数在被调用之前,必须先调用reset函数。
     在reset函数被调用后,io_service对象的stopped函数将会返回false。
     当run()、run_one()、poll()、poll_one()函数的任何的调用未结束时,这个函数一定不能被调用。
     */

    ◆对stop()和reset()函数的一点说明(是我单步调试时看到的):

    在Windows下,boost::asio::io_service类里面有一个数据成员为"stopped_"(Flag to indicate whether the event loop has been stopped.)。它是一个标志,它标志着事件循环是不是被stopped了。而boost::asio::io_service::reset()函数仅仅是赋值"stopped_=0"。boost::asio::io_service::stopped()函数仅仅是判断"0!=stopped_"的真假。你单步调试一下,就什么都知道了。

    ◆下面是我验证boost::asio::io_service的一个例子:

     1 #include <boost/asio.hpp>  
     2 #include <boost/thread.hpp>  
     3 #include <boost/atomic.hpp>  
     4 #include <boost/shared_ptr.hpp>  
     5 #include <boost/date_time/posix_time/ptime.hpp>  
     6 #include <boost/date_time.hpp>//boost::posix_time::to_iso_extended_string()需要此头文件。  
     7   
     8 //boost::atomic_bool coutFlag = false;  
     9 //error C2440: 'initializing' : cannot convert from 'bool' to 'boost::atomics::atomic<bool>'  
    10 //故意写错,可以根据错误信息知道某类型的详细信息。  
    11 boost::atomic_bool g_coutFlag(false);  
    12 boost::atomic_int g_numIn(0);  
    13 boost::atomic_int g_numOut(0);  
    14   
    15 boost::thread_group g_thgp;  
    16 boost::asio::io_service g_io;  
    17 boost::shared_ptr<boost::asio::io_service::work> g_pWork =   
    18 boost::shared_ptr<boost::asio::io_service::work>(new boost::asio::io_service::work(g_io));  
    19 boost::asio::io_service::strand g_strand(g_io);  
    20 std::vector<boost::posix_time::ptime> g_vecTimes;  
    21   
    22 void my_run_4_io_service(boost::asio::io_service& _io, int _idx)  
    23 {  
    24     _io.run();  
    25     //想得到boost::asio::io_service::run()退出时的时刻,只能对io_service进行封装了。  
    26     g_vecTimes[_idx] = boost::posix_time::microsec_clock::local_time();  
    27 }  
    28   
    29 void outFun(int idx)  
    30 {// io_service执行的handler。  
    31     ++g_numOut;  
    32     if (g_coutFlag.load())  
    33         std::cout << "outFun: index=" << idx << std::endl;  
    34     boost::this_thread::sleep_for(boost::chrono::milliseconds(500));  
    35 }  
    36   
    37 void inFun()  
    38 {  
    39     for (int i = 1; i <= 10; ++i)  
    40     {  
    41         g_strand.post(boost::bind(outFun, i));  
    42         ++g_numIn;  
    43         boost::this_thread::sleep_for(boost::chrono::milliseconds(100));  
    44     }  
    45     g_coutFlag = true;  
    46     g_io.stop();//调用它后,不论io_service有没有使用io_service::work类,各个线程的run()都会立即返回。  
    47     g_vecTimes[0] = boost::posix_time::microsec_clock::local_time();  
    48     int numDelta = g_numIn - g_numOut;  
    49     std::cout << "inFun: numDelta=" << numDelta << std::endl;//还剩多少event没有被执行。  
    50 }  
    51   
    52 int main()  
    53 {  
    54     int vecNum = 5;  
    55     g_vecTimes.reserve(vecNum); g_vecTimes.resize(vecNum);  
    56     //一个容纳 void fun(int i) 函数的 function对象。  
    57     boost::function<void(int)> my_lambda_function_object = [vecNum](int secs)  
    58     {  
    59         boost::this_thread::sleep_for(boost::chrono::microseconds(1000 * 1000 * secs));  
    60         std::cout << "now, time is " << boost::posix_time::  
    61             to_iso_extended_string(boost::posix_time::microsec_clock::local_time()) << std::endl;  
    62         for (int i = 0; i < vecNum; ++i)  
    63             std::cout << i << " : " << boost::posix_time::to_iso_extended_string(g_vecTimes[i]) << std::endl;  
    64     };  
    65   
    66     for (int i = 1; i < vecNum; ++i)  
    67         g_thgp.create_thread(boost::bind(my_run_4_io_service, boost::ref(g_io), i));  
    68     g_thgp.create_thread(inFun);  
    69     //等待5秒,确保执行完毕我设计的那些操作。  
    70     my_lambda_function_object(5);  
    71     //析构掉io_service对应的io_service::work对象,此时io_service里面还有event。  
    72     g_pWork = nullptr;  
    73     boost::this_thread::sleep_for(boost::chrono::milliseconds(1000 * 1));  
    74     g_io.reset();  
    75     boost::this_thread::sleep_for(boost::chrono::seconds(1));  
    76     //因为work被析构掉了,所以启动的那些线程在执行完event后,都自行退出了。  
    77     for (int i = 1; i < vecNum; ++i)  
    78         g_thgp.create_thread(boost::bind(my_run_4_io_service, boost::ref(g_io), i));  
    79     //等待6秒,确保io_service中剩余的event被执行完毕。  
    80     my_lambda_function_object(6);  
    81     std::cout << "done." << std::endl;  
    82     int cmd_val = getchar();  
    83     return 0;  
    84 }  
     
  • 相关阅读:
    LeetCode.5-最长回文子串(Longest Palindromic Substring)
    LeetCode.3-最长无重复字符子串(Longest Substring Without Repeating Characters)
    2013 最新的 play web framework 版本 1.2.3 框架学习文档整理
    play framework学习笔记之 模板引擎
    C# 枚举、字符串、值的相互转换
    styleCop
    配置VS代码生成工具ReSharper快捷键
    StackOverflow Update: 560M Pageviews A Month, 25 Servers, And It's All About Performance
    开启Windows Server 2008 R2上帝模式
    微信支付实战(与支付宝和其他一些支付有些不一样)
  • 原文地址:https://www.cnblogs.com/fnlingnzb-learner/p/10402232.html
Copyright © 2011-2022 走看看