zoukankan      html  css  js  c++  java
  • Muduo 多线程模型对比

      本文主要对比Muduo多线程模型方案8 和方案9 。

      方案8:reactor + thread pool ,有一个线程来充当reactor 接受连接分发事件,将要处理的事件分配给thread pool中的线程,由thread pool 来完成事件处理。实例代码见:examples/sudoku/server_threadpool.cc

      这里截取关键部分代码进行说明。

    class SudokuServer
    {
     public :
      SudokuServer(EventLoop* loop, const InetAddress& listenAddr, int numThreads)
        : loop_(loop),
          server_(loop, listenAddr, "SudokuServer"),
          numThreads_(numThreads),
          startTime_(Timestamp::now())
      {
        server_.setConnectionCallback(
            boost::bind(&SudokuServer::onConnection, this, _1));
        server_.setMessageCallback(
            boost::bind(&SudokuServer::onMessage, this, _1, _2, _3));
      }
     
      void start()
      {
        LOG_INFO << "starting " << numThreads_ << " threads.";
        threadPool_.start(numThreads_); // 注意这里,threadPool 的类型是: ThreadPool,且位置在start 里面
        server_.start();
      }
     
     private :
      void onConnection(const TcpConnectionPtr& conn)
      {
        LOG_TRACE << conn->peerAddress().toIpPort() << " -> "
            << conn->localAddress().toIpPort() << " is "
            << (conn->connected() ? "UP" : "DOWN");
      }
     
      void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
      {
    ...
            if (!processRequest(conn, request)) // 封装计算任务执行方法
            {
              conn->send( "Bad Request! ");
              conn->shutdown();
              break;
            }
          }
    ...
        }
      }
     
      bool processRequest(const TcpConnectionPtr& conn, const string& request)
      {
    ...
     
        if (puzzle.size() == implicit_cast<size_t>(kCells))
        {
          threadPool_.run(boost::bind(&solve, conn, puzzle, id));// 将计算任务转移到 threadPool 线程
        }
        else
        {
          goodRequest = false;
        }
        return goodRequest;
      }
     
      static void solve(const TcpConnectionPtr& conn,
                        const string& puzzle,
                        const string& id)
      {
        LOG_DEBUG << conn->name();
        string result = solveSudoku(puzzle); // solveSudou 是一个pure function, 是可重入的 
        if (id.empty())
        {
          conn->send(result+ " ");
        }
        else
        {
          conn->send(id+ ":"+result+ " ");
        }
      }
     
      EventLoop* loop_;
      TcpServer server_;
      ThreadPool threadPool_; // 注意类型,方案8, reactor + threadpool
      int numThreads_;
      Timestamp startTime_;
    };
     
    void ThreadPool::start( int numThreads)  // 创建 thread pool,具体thread 调度这里暂时不分析
    {
      assert(threads_.empty());
      running_ = true;
      threads_.reserve(numThreads);
      for (int i = 0; i < numThreads; ++i)
      {
        char id[32];
        snprintf(id, sizeof id, "%d", i);
        threads_.push_back( new muduo::Thread(
              boost::bind(&ThreadPool::runInThread, this), name_+id));
        threads_[i].start();
      }
    }
     
    方案9:main-reactor + subreactors, one loop per thread, 有一个主线程来扮演main-reactor 专门语句 accept 连接,其它线程负责读写文件描述符(socket)
     
    class SudokuServer
    {
     public :
      SudokuServer(EventLoop* loop, const InetAddress& listenAddr, int numThreads)
        : loop_(loop),
          server_(loop, listenAddr, "SudokuServer"),
          numThreads_(numThreads),
          startTime_(Timestamp::now())
      {
        server_.setConnectionCallback(
            boost::bind(&SudokuServer::onConnection, this, _1));
        server_.setMessageCallback(
            boost::bind(&SudokuServer::onMessage, this, _1, _2, _3));
        server_.setThreadNum(numThreads); // 设置 EventLoopThreadPool里面的thread数量
      }
     
      void start()
      {
        LOG_INFO << "starting " << numThreads_ << " threads.";
        server_.start();
      }
     
     private :
      void onConnection(const TcpConnectionPtr& conn)
      {
        LOG_TRACE << conn->peerAddress().toIpPort() << " -> "
            << conn->localAddress().toIpPort() << " is "
            << (conn->connected() ? "UP" : "DOWN");
      }
     
      void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
      {
    ...
            if (!processRequest(conn, request)) //准备计算
            {
              conn->send( "Bad Request! ");
              conn->shutdown();
              break;
            }
    ...
        }
      }
     
      bool processRequest(const TcpConnectionPtr& conn, const string& request)
      {
    ...
        if (puzzle.size() == implicit_cast<size_t>(kCells))
        {
          LOG_DEBUG << conn->name();
          string result = solveSudoku(puzzle); // 计算在当前线程完成
          if (id.empty())
          {
            conn->send(result+ " ");
          }
    ...
      }
      // 注意这里没有类型为ThreadPool的 threadPool_成员,整个类使用Muduo默认线程模型的EventLoopThreadPool,TcpServer 聚合了EventLoopThreadPool
      EventLoop* loop_;
      TcpServer server_;
      int numThreads_;
      Timestamp startTime_;
    };
     
     
    void TcpServer::setThreadNum( int numThreads)
    {
      assert(0 <= numThreads);
      threadPool_->setThreadNum(numThreads); // 设置了 EventLoopThreadPool 里面的线程个数,为后面的threadPool_->start()服务
    }
     
    void TcpServer::start()
    {
      if (!started_)
      {
        started_ = true;
        threadPool_->start(threadInitCallback_); // TcpServer 中的 threadPool 类型是 EventLoopThreadPool
      }
     
      if (!acceptor_->listenning())
      {
        loop_->runInLoop(
            boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
      }
    }
     
    void EventLoopThreadPool::start( const ThreadInitCallback& cb) // 开启线程的方式是使用EventLoopThread,这个类将EventLoop 和 Thread 封装在一起实现 one loop per thread
    {
      assert(!started_);
      baseLoop_->assertInLoopThread();
     
      started_ = true;
     
      for (int i = 0; i < numThreads_; ++i)
      {
        EventLoopThread* t = new EventLoopThread(cb); // 设置线程的 callback
        threads_.push_back(t); 
        loops_.push_back(t->startLoop()); // 保存loop方便管理和分配任务,任务分配其实是通过EventLoop::runInLoop() 来进行的
      }
      if (numThreads_ == 0 && cb)
      {
        cb(baseLoop_);
      }
    }
     
      总结一下,这里所谓的Reactor就是持有Poller的结构(稍微有点狭隘,这里先就这样理解),Poller负责事件监听和分发。持有EventLoop的结构就持有Poller。
      对于方案8只有一个类持有EventLoop,也就是只创建了一个EventLoop,这个Loop就是reactor,其它的Thread 是通过ThreadPool来实现的,因此只有reactor所在的线程来完成I/O,其它线程用于完成计算任务,所以说这个模型适合于计算密集型而不是I/O密集型。
      对于方案9,存在多个Reactor,其中main reactor 持有Acceptor,专门用于监听三个半事件中的连接建立,消息到达和连接断开以及消息发送事件都让sub reactor来完成。由于main reactor 只关心连接建立事件,能够适应高并发的IO请求,多个subreactor的存在也能兼顾I/O与计算,因此被认为是一个比较好的方案。
       后面还会深入学习Muduo网络库相关的内容,包括Reactor结构的简化,线程池的实现,现代C++的编写方式,使用C++11进行重写等。现在看来C++11 thread library 提供的接口基本可以替换 posix thread library,虽然底层也许是通过posix thread实现的,毕竟Linux内核针对NPTL进行过修改。C++11 提供了 thread_local 来描述 线程局部存储,但是没有pthread_key_create() 提供 destructor那样的功能,或者遇到需要使用TLS的地方转过来使用posix 提供的接口。
     
    Muduo 多线程 线程池 reactor
  • 相关阅读:
    Zabbix的前台SQL注射漏洞利用
    LeetCode OJ--Permutation Sequence *
    LeetCode OJ--Next Permutation *
    LeetCode OJ--Permutations II
    LeetCode OJ--Permutations *
    小算法-计算下一个排列
    LeetCode OJ--Gas Station**
    LeetCode OJ--Insert Interval **
    LeetCode OJ--Search Insert Position
    Ruby自动化测试(操作符的坑)
  • 原文地址:https://www.cnblogs.com/zhuyp1015/p/4412985.html
Copyright © 2011-2022 走看看