zoukankan      html  css  js  c++  java
  • muduo学习笔记(六) 多线程的TcpServer

    @


    前言

    前面文章介绍了muduo网络库的单线程设计方式,即一个EventLoop 处理所有的事件,包括链接的建立、IO、计算、以及链接的销毁,本文介绍下muduo中的多线程设计方法。

    多线程TcpServer

    EventLoopThreadPool

    多线程的muduo::TcpServer,主要通过添加一个EventLoopThreadPool 事件循环线程池实现,新建TcpConnection时从event loop pool里挑选一个loop给TcpConnection用。 也就是说多线程TcpServer自己的EventLoop只用来接受新连接, 而新连接会用其他EventLoop来执行IO。 (单线程TcpServer的EventLoop是与TcpConnection共享的。)

    EventLoopThreadPooll 按one loop per thread的思想实现多线程TcpServer, 此时主线程循环只负责TCP链接的建立,及任务的分配,需要让哪个线程干活, 就把timer或IO(如TCP连接) 注册到那个线程的循环里即可;对实时性有要求的connection可以单独用一个线程; 数据量大的connection可以独占一个线程;并把数据处理任务分摊到另几个计算线程中(用线程池);其他次要的辅助性connections共享一个线程。

    线程池设计模式

    池是一种设计模式,线程是是一种资源,线程池设计模式通常是事先申请一定的资源,当需要使用时,去资源池中申请资源,用完后再放回资源池中。EventLoopThreadPool 正是借鉴了这种设计模式,虽然使用的线程并不会放回资源池中,但是本身的一个EventLoop即一个Reactor,本身就具备等待机制了。

    muduo中的使用

    TcpServer每次新建一条TcpConnection就会通过EventLoopThreadPool::getNextLoop()方法来取一个EventLoop, 目前的getNextLoop()只是循环的从池中取一条loop,如果提供给每条TcpConncetion的是均等的服务,那么这样就能很均匀的分配系统的资源了。

    TcpServer的工作方式取决于EventLoopThreadPool中线程的创建数量。
    0 意味着所有的I/O 都在TcpServer的主事件循环中,不会创建新的线程。
    1 意味着所有的 I/O 在另一个线程中 ,TcpServer的主线程只负责建立连接。
    N 意味着新的连接会被循环的分配到N条线程中工作。

    连接的建立、消息、销毁

    Alt text

    on_connection

    1、对于新的连接new connection 会在 TcpServer的Loop 中由Acceptor建立,

    void Acceptor::handleRead()
    {
      LOG_TRACE << "Acceptor::handleRead()";
      p_loop->assertInLoopThread();
      InetAddress peerAddr;
      int connfd = m_acceptSocket.accept(&peerAddr);
    

    2、之后从loop_thread_pool中取一个loop将新的connection注册上去。

    void TcpServer::newConnetion(int sockfd, const InetAddress& peerAddr)
    {
    	InetAddress localAddr(sockets::getLocalAddr(sockfd));
        EventLoop* loop;
        loop = ex_event_loop_thread_pool->getNextLoop();
        TcpConnectionPtr conn(new TcpConnection(loop,
                   connName, sockfd, localAddr, peerAddr));
        conn->setConnectionCallBack(m_connectionCallBack);
        conn->setMessageCallBack(m_messageCallBack);
        conn->setCloseCallBack(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));
        loop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
    

    on_message

    1、message将在connection自己所处的loop中处理,无需多言。

    on_close

    1、连接的销毁稍微复杂一点,connection会在自己的loop中调用Tcp:Server::removeConnection,我们需要将他移动到TcpServer的Loop线程中先解除TcpServer对connection的使用,然后在回到connection自己的loop中销毁连接connectDestroyed()。

    void TcpServer::removeConnection(const TcpConnectionPtr& conn)
    {
      // FIXME: unsafe
      p_loop->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
    }
    
    void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
    {
      p_loop->assertInLoopThread();
      LOG_INFO << "TcpServer::removeConnectionInLoop [" << m_name
               << "] - connection " << conn->name();
      m_closeCallBack(conn);
      EventLoop* ioLoop = conn->getLoop();
      ioLoop->queueInLoop(
          std::bind(&TcpConnection::connectDestroyed, conn));
    }
    

    简单透传服务实现

    利用muduo多线程TcpServer 实现一个简单的透传服务,从connection中收到的消息直接转发给所有的其他connection。

    //ws_tcp_server.hpp
    #pragma once
    #include <async_logging>
    #include <muduo_server>
    
    namespace ws {
    
    extern std::unique_ptr<muduo::EventLoop> active_event_loop;
    
    class tcp_server{
    public:
      //! ctor
      tcp_server(void);
      //! dtor
      ~tcp_server(void) = default;
    
      //! copy ctor
      tcp_server(const tcp_server&) = delete;
      //! assignment operator
      tcp_server& operator=(const tcp_server&) = delete;
    
    public:
    
      //! listen_and_serve
      //! start server and listened on port
      void listen_and_serve(std::size_t port);
    
    private:
    
      void on_connection(const muduo::TcpConnectionPtr&);
    
      void on_message(const muduo::TcpConnectionPtr& , muduo::Buffer*, ssize_t );
    
      void on_close(const muduo::TcpConnectionPtr&);
    private:
    
      //! 
      //typedef std::vector<muduo::TcpConnectionPtr> connections_t;
    
      //! 
      typedef std::map<std::string, muduo::TcpConnectionPtr> connections_map_t;
    
    private:
    
      //! master connection
      //muduo::TcpConnectionPtr m_master_connection;
    
      //! slave connection
      //connections_t m_connections;
    
      //! muduo tcp server
      std::unique_ptr<muduo::TcpServer> m_tcp_server;
    
      //! connections map 
      connections_map_t m_connections_map;
    
      std::size_t cnt_connetions;
    };
    
    }
    
    
    //ws_tcp_server.cpp
    #include "ws_tcp_server.hpp"
    
    #include <assert.h>
    
    static muduo::EventLoopThread static_event_loop;
    std::unique_ptr<muduo::EventLoop> ws::active_event_loop = std::unique_ptr<muduo::EventLoop>(static_event_loop.startLoop());
    
    using namespace ws;
    
    tcp_server::tcp_server()
      :cnt_connetions(0)
    {
    
    }
    
    void tcp_server::listen_and_serve(std::size_t port)
    {
      assert(active_event_loop);
    
      muduo::ex_event_loop_thread_pool = std::unique_ptr<muduo::EventLoopThreadPool>(new muduo::EventLoopThreadPool(ws::active_event_loop.get(), "event_loop", 3));
      active_event_loop->runInLoop(std::bind(&muduo::EventLoopThreadPool::start, muduo::ex_event_loop_thread_pool.get()));
    
      m_tcp_server = std::unique_ptr<muduo::TcpServer>(new muduo::TcpServer(active_event_loop.get(), InetAddress(port)));
    
      m_tcp_server->setConnectionCallBack(std::bind(&tcp_server::on_connection, this, std::placeholders::_1));
      m_tcp_server->setMessageCallBack(std::bind(&tcp_server::on_message, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
      m_tcp_server->setCloseCallBack(std::bind(&tcp_server::on_close, this, std::placeholders::_1));
      m_tcp_server->start();
    }
    
    void tcp_server::on_connection(const muduo::TcpConnectionPtr& conn)
    {
      cnt_connetions++;
      std::string conn_name = "tcp connection " + std::to_string(cnt_connetions);
      conn->setConnectionName(conn_name);
    
      m_connections_map[conn_name] = std::move(conn);
    
    }
    
    void tcp_server::on_message(const muduo::TcpConnectionPtr& conn, muduo::Buffer* buf, ssize_t size)
    {
      connections_map_t::iterator iter;
      for(iter = m_connections_map.begin(); iter != m_connections_map.end(); iter++)
      {
        if(iter->second != conn){
          LOG_DEBUG << "con " << conn->name() << " send to " << iter->second->name();
          iter->second->send(buf->peek(), size);
        }
      }
    
      buf->retrieve(size);
    
    }
    
    void tcp_server::on_close(const muduo::TcpConnectionPtr& conn)
    {
      size_t n = m_connections_map.erase(conn->name());
      (void)n;
      assert(n == 1);
    }
    
    
    // main.cpp
    #include <async_logging>
    #include "ws_tcp_server.hpp"
    
    int main()
    {
      Logger::setLogLevel(Logger::DEBUG);
    
      ws::tcp_server tcp_server;
      tcp_server.listen_and_serve(8008);
    
      getchar();
    }
    
    ../bin/ws_server 
    2019-03-19 16:22:17.202112 [TRACE] [EventLoopThread.cpp:41] [startLoop] EventLoopThread::startLoop() wait()
    2019-03-19 16:22:17.203199 [TRACE] [TimerQueue.cpp:20] [createTimerfd] createTimerfd() fd : 4
    2019-03-19 16:22:17.203376 [TRACE] [Epoll.cpp:115] [updateChannel] fd= 4 events 3
    2019-03-19 16:22:17.203549 [TRACE] [EventLoop.cpp:23] [createEventfd] createEventfd() fd : 5
    2019-03-19 16:22:17.203711 [INFO ] [EventLoop.cpp:43] EventLoop Create 0x7F1E72371B30 in thread 44057
    2019-03-19 16:22:17.203872 [TRACE] [Epoll.cpp:115] [updateChannel] fd= 5 events 3
    2019-03-19 16:22:17.204188 [TRACE] [EventLoopThread.cpp:65] [threadFunc] EventLoopThread::threadFunc() notify()
    2019-03-19 16:22:17.204332 [TRACE] [EventLoop.cpp:74] [loop] EventLoop 0x7F1E72371B30 start loopig
    2019-03-19 16:22:17.204477 [TRACE] [Epoll.cpp:72] [poll] Epoll::poll() maxConcurrencySize 2
    2019-03-19 16:22:17.204794 [TRACE] [EventLoopThread.cpp:46] [startLoop] EventLoopThread::startLoop() wakeup
    2019-03-19 16:22:17.210324 [INFO ] [EventLoop.cpp:43] EventLoop Create 0x7F1E71B70B30 in thread 44058
    2019-03-19 16:22:17.212173 [INFO ] [EventLoop.cpp:43] EventLoop Create 0x7F1E7136FB30 in thread 44059
    2019-03-19 16:22:17.212725 [INFO ] [EventLoop.cpp:43] EventLoop Create 0x7F1E70B6EB30 in thread 44060
    2019-03-19 16:22:22.399790 [INFO ] [TcpServer.cpp:52] TcpServer::newConnetion() [Serv ] - new connection [Serv #1] from 127.0.0.1:51400
    2019-03-19 16:22:22.399833 [INFO ] [TcpServer.cpp:61] Loop 0x7F1E71B70B30
    2019-03-19 16:22:22.399842 [DEBUG] [TcpConnection.cpp:27] [TcpConnection] TcpConnection::ctor[Serv #1] at 0x7F1E6C0046F0 fd=17
    2019-03-19 16:22:24.901612 [INFO ] [TcpServer.cpp:52] TcpServer::newConnetion() [Serv ] - new connection [Serv #2] from 127.0.0.1:51402
    2019-03-19 16:22:24.901625 [INFO ] [TcpServer.cpp:61] Loop 0x7F1E7136FB30
    2019-03-19 16:22:24.901635 [DEBUG] [TcpConnection.cpp:27] [TcpConnection] TcpConnection::ctor[Serv #2] at 0x7F1E6C006A30 fd=18
    2019-03-19 16:22:25.903035 [DEBUG] [ws_tcp_server.cpp:58] [on_message] con tcp connection 2 send to tcp connection 1
    2019-03-19 16:22:32.499161 [INFO ] [TcpServer.cpp:52] TcpServer::newConnetion() [Serv ] - new connection [Serv #3] from 127.0.0.1:51404
    2019-03-19 16:22:32.499174 [INFO ] [TcpServer.cpp:61] Loop 0x7F1E70B6EB30
    2019-03-19 16:22:32.499183 [DEBUG] [TcpConnection.cpp:27] [TcpConnection] TcpConnection::ctor[Serv #3] at 0x7F1E6C008D80 fd=19
    2019-03-19 16:22:33.499744 [DEBUG] [ws_tcp_server.cpp:58] [on_message] con tcp connection 3 send to tcp connection 1
    2019-03-19 16:22:33.499771 [DEBUG] [ws_tcp_server.cpp:58] [on_message] con tcp connection 3 send to tcp connection 2
    2019-03-19 16:22:35.986563 [DEBUG] [ws_tcp_server.cpp:58] [on_message] con tcp connection 2 send to tcp connection 1
    2019-03-19 16:22:35.986670 [DEBUG] [ws_tcp_server.cpp:58] [on_message] con tcp connection 2 send to tcp connection 3
    2019-03-19 16:22:36.982713 [DEBUG] [ws_tcp_server.cpp:58] [on_message] con tcp connection 2 send to tcp connection 1
    2019-03-19 16:22:36.982759 [DEBUG] [ws_tcp_server.cpp:58] [on_message] con tcp connection 2 send to tcp connection 3
    2019-03-19 16:22:38.244229 [INFO ] [TcpServer.cpp:90] TcpServer::removeConnectionInLoop [Serv ] - connection tcp connection 2
    2019-03-19 16:22:38.244281 [DEBUG] [TcpConnection.cpp:36] [~TcpConnection] TcpConnection::dtor[tcp connection 2] at 0x7F1E6C006A30 fd=18 state=kDisConnected
    
    
  • 相关阅读:
    第三次作业
    第二次作业
    第一次作业
    软件工程第0次作业
    第四次作业
    第三次作业
    第二次作业
    第一次作业
    第零次作业
    第四次软件工程作业
  • 原文地址:https://www.cnblogs.com/ailumiyana/p/10563987.html
Copyright © 2011-2022 走看看