  • An analysis of asio's asynchrony and threading model

    Preface

    This article uses asio 1.76.0, obtained from GitHub. Older versions had a very important structure called io_service, which newer versions renamed to io_context. Below, boost.asio's threading model is analyzed through an asynchronous example under C++14. Its code structure is fairly complex and time is limited, so rather than a detailed walkthrough this is an analysis of the overall structure.

    Main text

    asio's threading model and asynchronous call flow are shown in the figure below:

    The program is built around an io_context, under which sits a scheduler object. The scheduler owns a task queue (op_queue_) and an epoll fd. When io_context.run() executes, the calling thread enters a loop that competes to consume the task queue. The queue holds two kinds of tasks: the epoll task, which performs one epoll_wait(), and callback tasks, which execute completion handlers. An asynchronous call essentially adds an event for an fd to the reactor, storing the callback through the event's data pointer (ev.data.ptr points at the descriptor state that holds it). When epoll_wait() reports that event, the fd's callback is pushed onto the task queue (op_queue_), where the threads compete to consume it.
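
    To make the "threads competing to consume the queue" part concrete, here is a minimal sketch of driving one io_context from several threads. This is illustrative user code, not asio internals; it assumes asio 1.76, where make_work_guard keeps run() from returning while the queue is still empty:

    #include <asio.hpp>
    #include <thread>
    #include <vector>

    int main()
    {
      asio::io_context io_context;

      // Keep run() from returning before any work has been posted.
      auto work = asio::make_work_guard(io_context);

      // Each thread enters scheduler::run() and competes for op_queue_.
      std::vector<std::thread> pool;
      for (int i = 0; i != 4; ++i)
        pool.emplace_back([&io_context] { io_context.run(); });

      // ... start async operations here, e.g. construct a server ...

      work.reset(); // let run() return once the queue drains
      for (auto& t : pool)
        t.join();
    }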

    Let's walk through this flow using the official example, which comes from asio/src/examples/cpp14/echo/async_tcp_echo_server.cpp.

    #include <cstdlib>
    #include <iostream>
    #include <memory>
    #include <utility>
    #include <asio/ts/buffer.hpp>
    #include <asio/ts/internet.hpp>
    
    using asio::ip::tcp;
    
    class session
      : public std::enable_shared_from_this<session>
    {
    public:
      session(tcp::socket socket)
        : socket_(std::move(socket))
      {
      }
    
      void start()
      {
        do_read();
      }
    
    private:
      void do_read()
      {
        auto self(shared_from_this());
        socket_.async_read_some(asio::buffer(data_, max_length),
            [this, self](std::error_code ec, std::size_t length)
            {
              if (!ec)
              {
                do_write(length);
              }
            });
      }
    
      void do_write(std::size_t length)
      {
        auto self(shared_from_this());
        asio::async_write(socket_, asio::buffer(data_, length),
            [this, self](std::error_code ec, std::size_t /*length*/)
            {
              if (!ec)
              {
                do_read();
              }
            });
      }
    
      tcp::socket socket_;
      enum { max_length = 1024 };
      char data_[max_length];
    };
    
    class server
    {
    public:
      server(asio::io_context& io_context, short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
          socket_(io_context)
      {
        do_accept();
      }
    
    private:
      void do_accept()
      {
        acceptor_.async_accept(socket_,
            [this](std::error_code ec)
            {
              if (!ec)
              {
                std::make_shared<session>(std::move(socket_))->start();
              }
    
              do_accept();
            });
      }
    
      tcp::acceptor acceptor_;
      tcp::socket socket_;
    };
    
    int main(int argc, char* argv[])
    {
      try
      {
        if (argc != 2)
        {
          std::cerr << "Usage: async_tcp_echo_server <port>
    ";
          return 1;
        }
    
        asio::io_context io_context;
    
        server s(io_context, std::atoi(argv[1]));
    
        io_context.run();
      }
      catch (std::exception& e)
      {
        std::cerr << "Exception: " << e.what() << "
    ";
      }
    
      return 0;
    }
    

    The code is long, so the analysis here focuses on io_context.run() and acceptor_.async_accept(), which together expose the threading model and the asynchronous machinery. main() creates the io_context and the server; the server creates the acceptor and a socket and then issues the asynchronous async_accept(), registering a lambda. When an accept event occurs, the new client socket is accepted and the lambda callback runs to handle it. Finally, io_context.run() enters the event-processing loop.

    The program's call graph is shown below:

    As highlighted in yellow in the diagram, there are three main parts:

    The first part is the creation of the acceptor and its registration with epoll. Construction of the acceptor starts in the server's constructor:

    class server
    {
    public:
      server(asio::io_context& io_context, short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)), // construct the acceptor
          socket_(io_context)
      {
        do_accept();
      }
    
    private:
    	......
    
      tcp::acceptor acceptor_;
      tcp::socket socket_;
    };
    

    But tcp::acceptor is actually a typedef: what gets constructed is class basic_socket_acceptor<tcp>. The typedef comes from asio/ip/tcp.hpp, abridged below.
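
    // asio/ip/tcp.hpp (abridged)
    class tcp
    {
    public:
      /// The TCP socket type.
      typedef basic_stream_socket<tcp> socket;

      /// The TCP acceptor type.
      typedef basic_socket_acceptor<tcp> acceptor;
      ...
    };

    In the basic_socket_acceptor constructor, besides the familiar bind and listen, the important call is open: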

      // /usr/local/include/asio/basic_socket_acceptor.hpp
      
      template <typename ExecutionContext>
      basic_socket_acceptor(ExecutionContext& context,
          const endpoint_type& endpoint, bool reuse_addr = true,
          typename constraint<
            is_convertible<ExecutionContext&, execution_context&>::value
          >::type = 0)
        : impl_(0, 0, context)
      {
        asio::error_code ec;
        const protocol_type protocol = endpoint.protocol();
        impl_.get_service().open(impl_.get_implementation(), protocol, ec); //open
        asio::detail::throw_error(ec, "open");
        if (reuse_addr)
        {
          impl_.get_service().set_option(impl_.get_implementation(),
              socket_base::reuse_address(true), ec);
          asio::detail::throw_error(ec, "set_option");
        }
        impl_.get_service().bind(impl_.get_implementation(), endpoint, ec);// bind
        asio::detail::throw_error(ec, "bind");
        impl_.get_service().listen(impl_.get_implementation(),			   // listen
            socket_base::max_listen_connections, ec);
        asio::detail::throw_error(ec, "listen");
      }
    

    open() is a cross-platform wrapper around do_open(); its job is to create a socket and add the corresponding epoll event:

      // Open a new socket implementation.
      asio::error_code open(implementation_type& impl,
          const protocol_type& protocol, asio::error_code& ec)
      {
        if (!do_open(impl, protocol.family(),
              protocol.type(), protocol.protocol(), ec))
          impl.protocol_ = protocol;
        return ec;
      }
    

    Below is do_open(). socket_holder creates the server-side socket, and register_descriptor() registers that socket with the epoll fd of this io_context:

    asio::error_code reactive_socket_service_base::do_open(
        reactive_socket_service_base::base_implementation_type& impl,
        int af, int type, int protocol, asio::error_code& ec)
    {
      if (is_open(impl))
      {
        ec = asio::error::already_open;
        return ec;
      }
    
      socket_holder sock(socket_ops::socket(af, type, protocol, ec)); // create the socket
      if (sock.get() == invalid_socket)
        return ec;
    
      // register the socket with epoll
      if (int err = reactor_.register_descriptor(sock.get(), impl.reactor_data_))
      {
        ec = asio::error_code(err,
            asio::error::get_system_category());
        return ec;
      }
    
      impl.socket_ = sock.release();
      switch (type)
      {
      case SOCK_STREAM: impl.state_ = socket_ops::stream_oriented; break;
      case SOCK_DGRAM: impl.state_ = socket_ops::datagram_oriented; break;
      default: impl.state_ = 0; break;
      }
      ec = asio::error_code();
      return ec;
    }
    

    At this point a complete server-side socket has been created, bound, put into the listening state, and added to epoll.
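
    For orientation, here is a minimal sketch of the raw Linux calls that asio wraps in this part. This is illustrative only (asio's real code goes through its socket_ops and epoll_reactor layers and handles errors); the edge-triggered event mask matches what register_descriptor() uses:

    #include <sys/socket.h>
    #include <sys/epoll.h>
    #include <netinet/in.h>

    int make_listener(int epoll_fd, unsigned short port)
    {
      int fd = ::socket(AF_INET, SOCK_STREAM, 0);   // socket_ops::socket()

      int on = 1;                                   // reuse_address(true)
      ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

      sockaddr_in addr{};                           // bind()
      addr.sin_family = AF_INET;
      addr.sin_addr.s_addr = htonl(INADDR_ANY);
      addr.sin_port = htons(port);
      ::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));

      ::listen(fd, SOMAXCONN);                      // listen()

      // register_descriptor(): add the fd to the shared epoll instance;
      // asio stores a descriptor_state* in ev.data.ptr.
      epoll_event ev{};
      ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;
      ev.data.ptr = nullptr;
      ::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
      return fd;
    }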


    The second important part is how the asynchronous async_accept() call runs.

    class server
    {
    	......
    private:
      void do_accept()
      {
        acceptor_.async_accept(socket_,  // initiate the asynchronous call
            [this](std::error_code ec)
            {
              if (!ec)
              {
                std::make_shared<session>(std::move(socket_))->start();
              }
    
              do_accept();
            });
      }
    
      tcp::acceptor acceptor_;
      tcp::socket socket_;
    };
    

    Calling async_accept() on the acceptor created in part 1 brings us to a complicated-looking template function:

      template <typename Protocol1, typename Executor1,
          ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code))
            AcceptHandler ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
      ASIO_INITFN_AUTO_RESULT_TYPE(AcceptHandler,
          void (asio::error_code))
      async_accept(basic_socket<Protocol1, Executor1>& peer,
          ASIO_MOVE_ARG(AcceptHandler) handler
            ASIO_DEFAULT_COMPLETION_TOKEN(executor_type),
          typename constraint<
            is_convertible<Protocol, Protocol1>::value
          >::type = 0)
      {
        return async_initiate<AcceptHandler, void (asio::error_code)>(
            initiate_async_accept(this), handler,
            &peer, static_cast<endpoint_type*>(0));
      }
    

    Don't dwell on the signature; look at what async_initiate() does:

    template <typename CompletionToken,
        ASIO_COMPLETION_SIGNATURE Signature,
        typename Initiation, typename... Args>
    inline typename constraint<
        !detail::async_result_has_initiate_memfn<CompletionToken, Signature>::value,
        ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)>::type
    async_initiate(ASIO_MOVE_ARG(Initiation) initiation,
        ASIO_NONDEDUCED_MOVE_ARG(CompletionToken) token,
        ASIO_MOVE_ARG(Args)... args)
    {
      async_completion<CompletionToken, Signature> completion(token);
    
      ASIO_MOVE_CAST(Initiation)(initiation)(
          ASIO_MOVE_CAST(ASIO_HANDLER_TYPE(CompletionToken,
            Signature))(completion.completion_handler),
          ASIO_MOVE_CAST(Args)(args)...);
    
      return completion.result.get();
    }
    

    This is an even more involved template function, but its body is only a few statements: it invokes operator() on the Initiation object, which here is class initiate_async_accept. The corresponding operator() is:

        template <typename AcceptHandler, typename Protocol1, typename Executor1>
        void operator()(ASIO_MOVE_ARG(AcceptHandler) handler,
            basic_socket<Protocol1, Executor1>* peer,
            endpoint_type* peer_endpoint) const
        {
          // If you get an error on the following line it means that your handler
          // does not meet the documented type requirements for a AcceptHandler.
          ASIO_ACCEPT_HANDLER_CHECK(AcceptHandler, handler) type_check;
    
          detail::non_const_lvalue<AcceptHandler> handler2(handler);
          self_->impl_.get_service().async_accept(
              self_->impl_.get_implementation(), *peer, peer_endpoint,
              handler2.value, self_->impl_.get_executor());
        }
    

    The core is the single line self_->impl_.get_service().async_accept(...). What impl_.get_service() returns is class reactive_socket_service, as can be seen in basic_socket_acceptor:

    class basic_socket_acceptor
      : public socket_base
    {
    ...
    detail::io_object_impl<detail::reactive_socket_service<Protocol>, Executor> impl_;
    }
    

    That leads to the implementation of async_accept(), shown below. It looks long, but the crux is the final start_accept_op():

      // Start an asynchronous accept. The peer and peer_endpoint objects must be
      // valid until the accept's handler is invoked.
      template <typename Socket, typename Handler, typename IoExecutor>
      void async_accept(implementation_type& impl, Socket& peer,
          endpoint_type* peer_endpoint, Handler& handler, const IoExecutor& io_ex)
      {
        bool is_continuation =
          asio_handler_cont_helpers::is_continuation(handler);
    
        // Allocate and construct an operation to wrap the handler.
        typedef reactive_socket_accept_op<Socket, Protocol, Handler, IoExecutor> op;
        typename op::ptr p = { asio::detail::addressof(handler),
          op::ptr::allocate(handler), 0 };
        p.p = new (p.v) op(success_ec_, impl.socket_, impl.state_,
            peer, impl.protocol_, peer_endpoint, handler, io_ex);
    
        ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
              &impl, impl.socket_, "async_accept"));
    
        start_accept_op(impl, p.p, is_continuation, peer.is_open()); // start the accept operation
        p.v = p.p = 0;
      }
    

    start_accept_op() calls start_op():

    void reactive_socket_service_base::start_accept_op(
        reactive_socket_service_base::base_implementation_type& impl,
        reactor_op* op, bool is_continuation, bool peer_is_open)
    {
      if (!peer_is_open)
        start_op(impl, reactor::read_op, op, is_continuation, true, false);
      else
      {
        op->ec_ = asio::error::already_open;
        reactor_.post_immediate_completion(op, is_continuation);
      }
    }
    

    start_op() in turn delegates to the reactor's start_op():

    void reactive_socket_service_base::start_op(
        reactive_socket_service_base::base_implementation_type& impl,
        int op_type, reactor_op* op, bool is_continuation,
        bool is_non_blocking, bool noop)
    {
      if (!noop)
      {
        if ((impl.state_ & socket_ops::non_blocking)
            || socket_ops::set_internal_non_blocking(
              impl.socket_, impl.state_, true, op->ec_))
        {
          reactor_.start_op(op_type, impl.socket_,
          impl.reactor_data_, op, is_continuation, is_non_blocking); // call the reactor's start_op()
          return;
        }
      }
    
      reactor_.post_immediate_completion(op, is_continuation);
    }
    

    reactor.start_op() is long; the key part is excerpted below. Depending on the op_type passed in (read_op = 0, write_op = 1, connect_op = 1, except_op = 2), it issues an epoll_ctl(EPOLL_CTL_MOD) to add the needed event to the descriptor's monitored set if it is not already registered, then pushes the operation (the wrapped callback) onto the descriptor's queue indexed by op_type, so it can be run when the event fires:

    void epoll_reactor::start_op(int op_type, socket_type descriptor,
        epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
        bool is_continuation, bool allow_speculative)
    {
      ...
      if (op_type == write_op)
      {
        if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
        {
          epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
          {
            descriptor_data->registered_events_ |= ev.events;
          }
          else
          {
            op->ec_ = asio::error_code(errno,
                asio::error::get_system_category());
            scheduler_.post_immediate_completion(op, is_continuation);
            return;
          }
        }
      }
      ...

      descriptor_data->op_queue_[op_type].push(op);
      scheduler_.work_started();
    }
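
    The per-descriptor bookkeeping this relies on looks roughly like the following. This is a condensed, illustrative sketch of asio/detail/epoll_reactor.hpp, not the exact declarations:

    class epoll_reactor
    {
      // The op_type values used as indexes below.
      enum op_types { read_op = 0, write_op = 1,
                      connect_op = 1, except_op = 2, max_ops = 3 };

      struct descriptor_state
      {
        socket_type descriptor_;       // the fd itself
        uint32_t registered_events_;   // events currently in the epoll set
        op_queue<reactor_op> op_queue_[max_ops]; // pending ops per event type
        // ev.data.ptr in the epoll set points back at this object.
      };

      int epoll_fd_;           // epoll instance shared by all descriptors
      scheduler& scheduler_;   // owns the global op_queue_ consumed by run()
    };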
    

    Taking this part as a whole, the chain of responsibility is roughly acceptor -> reactive_socket_service -> reactor.
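
    As an aside, the payoff of the async_initiate/completion-token machinery seen above is that the same initiation supports tokens other than plain callbacks. A minimal sketch (assuming asio 1.76; the port is arbitrary, and both accepts simply wait until clients connect):

    #include <asio.hpp>
    #include <asio/use_future.hpp>
    #include <future>

    using asio::ip::tcp;

    int main()
    {
      asio::io_context io_context;
      tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 12345));

      // 1) Callback token: the handler is wrapped in a reactor_op as above.
      tcp::socket peer(io_context);
      acceptor.async_accept(peer, [](std::error_code ec) { /* ... */ });

      // 2) use_future token: the same initiation path, but async_result
      //    produces a std::future instead of invoking a callback.
      std::future<tcp::socket> accepted = acceptor.async_accept(asio::use_future);

      io_context.run();
    }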


    The third part is how io_context.run() performs epoll() and processes events.

    Starting from main(), io_context.run() is called:

    int main(int argc, char* argv[])
    {
      try
      {
        if (argc != 2)
        {
          std::cerr << "Usage: async_tcp_echo_server <port>
    ";
          return 1;
        }
    
        asio::io_context io_context;
    
        server s(io_context, std::atoi(argv[1]));
    
        io_context.run(); // start the processing loop
      }
      catch (std::exception& e)
      {
        std::cerr << "Exception: " << e.what() << "
    ";
      }
    
      return 0;
    }
    
    

    This actually just executes impl_.run():

    io_context::count_type io_context::run()
    {
      asio::error_code ec;
      count_type s = impl_.run(ec);
      asio::detail::throw_error(ec);
      return s;
    }	
    

    And impl_ is the scheduler, so what really runs is scheduler::run(). Note the for loop that drives do_run_one():

    std::size_t scheduler::run(asio::error_code& ec)
    {
      ec = asio::error_code();
      if (outstanding_work_ == 0)
      {
        stop();
        return 0;
      }
    
      thread_info this_thread;
      this_thread.private_outstanding_work = 0;
      thread_call_stack::context ctx(this, this_thread);
    
      mutex::scoped_lock lock(mutex_);
    
      std::size_t n = 0;
      for (; do_run_one(lock, this_thread, ec); lock.lock()) // loop, one task per iteration
        if (n != (std::numeric_limits<std::size_t>::max)())
          ++n;
      return n;
    }
    

    do_run_one() is fairly long but easy to follow. It takes a task from the queue and checks whether it is the special task marker: if so, the thread needs to perform one epoll pass to harvest fd events for processing; if not, the task is a callback, and the thread simply runs the corresponding completion handler.

    Also note that when more handlers remain queued, it calls wakeup_event_.unlock_and_signal_one(lock) to wake other threads to help consume the queue. As for locking: the lock is held throughout the for loop above, but it is released just before a task is actually executed, and the RAII helpers task_cleanup / work_cleanup re-acquire it once the task completes (a condensed task_cleanup is shown after the listing):

    std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
        scheduler::thread_info& this_thread,
        const asio::error_code& ec)
    {
      while (!stopped_)
      {
        if (!op_queue_.empty())
        {
          // Prepare to execute first handler from queue.
          operation* o = op_queue_.front(); // take the next task from the queue
          op_queue_.pop();
          bool more_handlers = (!op_queue_.empty());
    
          if (o == &task_operation_) // the epoll task: do one epoll pass
          {
            task_interrupted_ = more_handlers;
    
            if (more_handlers && !one_thread_)
              wakeup_event_.unlock_and_signal_one(lock); // wake another thread
            else
              lock.unlock();
    
            task_cleanup on_exit = { this, &lock, &this_thread };
            (void)on_exit;
    
            // Run the task. May throw an exception. Only block if the operation
            // queue is empty and we're not polling, otherwise we want to return
            // as soon as possible.
            task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
          }
          else // a callback task: run the completion handler
          {
            std::size_t task_result = o->task_result_;
    
            if (more_handlers && !one_thread_)
              wake_one_thread_and_unlock(lock);
            else
              lock.unlock();
    
            // Ensure the count of outstanding work is decremented on block exit.
            work_cleanup on_exit = { this, &lock, &this_thread };
            (void)on_exit;
    
            // Complete the operation. May throw an exception. Deletes the object.
            o->complete(this, ec, task_result); // invoke the completion handler
            this_thread.rethrow_pending_exception();
    
            return 1;
          }
        }
        else
        {
          wakeup_event_.clear(lock);
          wakeup_event_.wait(lock);
        }
      }
    
      return 0;
    }
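
    For reference, here is task_cleanup, condensed from asio/detail/impl/scheduler.ipp (abridged; a work-accounting step is elided). Its destructor re-takes the lock, merges the privately harvested operations into the shared queue, and pushes task_operation_ back so that some thread will run the epoll task again:

    struct scheduler::task_cleanup
    {
      ~task_cleanup()
      {
        // ... transfer this thread's private work count to the scheduler ...

        // Enqueue the completed operations and reinsert the task at the
        // end of the operation queue.
        lock_->lock();
        scheduler_->task_interrupted_ = true;
        scheduler_->op_queue_.push(this_thread_->private_op_queue);
        scheduler_->op_queue_.push(&scheduler_->task_operation_);
      }

      scheduler* scheduler_;
      mutex::scoped_lock* lock_;
      thread_info* this_thread_;
    };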
    
    

    Let's look more closely at how task_->run() harvests tasks when it performs epoll. It saves the events returned in each epoll event into the descriptor data, so they can later be handed to the callback (this is the task_result passed when the handler is invoked above), and pushes the descriptor data (which carries both the callbacks and the results) onto the task queue. A question arises here: epoll_reactor::run() takes no lock, yet the epoll fd is shared, so multiple threads calling epoll_wait() at once would seem to need synchronization. The resolution is in the scheduler itself: task_operation_ is popped off op_queue_ before task_->run() is called and only pushed back afterwards by task_cleanup, so at most one thread is inside epoll_reactor::run() at any time (and concurrent epoll_wait() on one fd would in any case be safe at the kernel level).

    void epoll_reactor::run(long usec, op_queue<operation>& ops)
    {
      ...
      // Block on the epoll descriptor. (timeout is derived from usec.)
      epoll_event events[128];
      int num_events = epoll_wait(epoll_fd_, events, 128, timeout);

      // Dispatch the waiting events.
      for (int i = 0; i < num_events; ++i)
      {
        ...
        void* ptr = events[i].data.ptr;
        descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
        descriptor_data->set_ready_events(events[i].events); // save the ready events
        ops.push(descriptor_data); // push onto the task queue
        ...
      }
    }
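
    Finally, how does a descriptor_state pushed onto the queue turn into a user callback? descriptor_state is itself a scheduler operation, and its completion routine drains the per-op-type queues filled in by start_op(). The following is a condensed, illustrative sketch of that path, not asio's exact code:

    struct epoll_reactor::descriptor_state : operation // sits in the scheduler queue
    {
      // Invoked by scheduler::do_run_one() through o->complete(...).
      static void do_complete(void* owner, operation* base,
          const asio::error_code& ec, std::size_t bytes_transferred)
      {
        descriptor_state* self = static_cast<descriptor_state*>(base);

        // task_result_ smuggles the ready epoll events through the queue.
        uint32_t events = static_cast<uint32_t>(bytes_transferred);

        // perform_io() pops the matching reactor_ops (the saved callbacks)
        // from op_queue_[read_op / write_op / except_op] and runs the I/O.
        if (operation* op = self->perform_io(events))
          op->complete(owner, ec, 0); // run the user's completion handler
      }
    };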
    
  • Original article: https://www.cnblogs.com/ishen/p/14593598.html