zoukankan      html  css  js  c++  java
  • TCP网络库:Acceptor、TcpServer、TcpConnection

    Acceptor类:用于接收新的TCP连接,该类是内部class,供TcpServer使用,生命期由TcpServer控制

    类成员:

    class Acceptor : boost::noncopyable
    {
     public:
      typedef boost::function<void (int sockfd,
                                    const InetAddress&)> NewConnectionCallback;
    
      Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);
      ~Acceptor();
    
      void setNewConnectionCallback(const NewConnectionCallback& cb)
      { newConnectionCallback_ = cb; }
    
      bool listenning() const { return listenning_; }
      void listen();
    
     private:
      //调用accept()接受新连接,并回调用户callback
      void handleRead();
      //acceptChannel_所属loop对象
      EventLoop* loop_;
      //此socket是listen socket
      Socket acceptSocket_;
      //channel对象监测上述socket上的readable事件,并在channel对象的hanleEvent方法中回调handleRead(),handleRead会调用accept来接受新连接
      Channel acceptChannel_;
      NewConnectionCallback newConnectionCallback_;
      bool listenning_;
      int idleFd_;
    };
    //构造函数调用socket() 、bind(),即创建TCP服务端的传统步骤
    //socket() bind() listen()任何一个步骤出错都会造成程序终止,故没有错误处理
    //sockets::createNonblockingOrDie创建非阻塞的socket
    Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
      : loop_(loop),
        acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
        acceptChannel_(loop, acceptSocket_.fd()),
        listenning_(false),
        idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
    {
      assert(idleFd_ >= 0);
      acceptSocket_.setReuseAddr(true);
      acceptSocket_.setReusePort(reuseport);
      acceptSocket_.bindAddress(listenAddr);
      acceptChannel_.setReadCallback(
          boost::bind(&Acceptor::handleRead, this));
    }
    //调用listen(),监听listen_fd,当有新连接到达时,acceptChannel_会处理
    void Acceptor::listen()
    {
      loop_->assertInLoopThread();
      listenning_ = true;
      acceptSocket_.listen();
      acceptChannel_.enableReading();
    }
    //accept策略
    //参考acceptable strategies for improving web server performance
    //回调函数,在acceptChannel_的handleEvent方法中被调用,接受客户端连接 void Acceptor::handleRead() { loop_->assertInLoopThread(); InetAddress peerAddr; //FIXME loop until no more int connfd = acceptSocket_.accept(&peerAddr); if (connfd >= 0) { // string hostport = peerAddr.toIpPort(); // LOG_TRACE << "Accepts of " << hostport;
    //当在handleRead建立新的客户连接后,会调用这个回调函数,我觉得它的作用是处理客户的业务逻辑,问题是什么时候设置这个回调函数(在TcpServer的构造函数中设置)
    if (newConnectionCallback_) { newConnectionCallback_(connfd, peerAddr); } else { sockets::close(connfd); } } else { LOG_SYSERR << "in Acceptor::handleRead"; // Read the section named "The special problem of // accept()ing when you can't" in libev's doc. // By Marc Lehmann, author of livev. //本进程的文件描述符已经达到上限,由于没有socket文件描述符来表示这个连接,就无法close它.若epoll_wait是LT,则每次调用都会立刻返回,因为新连接还等待处理 //准备一个空闲的文件描述符,在这种情况下,先关闭这个空闲的fd,然后accept拿到新socket连接的描述符,随后close它,再重新打开一个空闲文件给该空闲文件描述符 if (errno == EMFILE) { ::close(idleFd_); idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); ::close(idleFd_); idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); } } }

    TcpServer类:管理accept获得的tcp连接.TcpServer是供用户直接使用的,生命期由用户控制.

    ///TcpServer内部使用Acceptor来获得新连接的fd,它保存用户提供的connectionCallback和MessageCallback,在新建TcpConnection的
    ///时候会原样传给后者,TcpServer持有目前存活的TcpConnection的shared_ptr(定义为TcpConnectionPtr)
    ///在新连接到达时,Acceptor会回调newConnection(),后者会创建TcpConnection对象conn,把它加入ConnectionMap,设置好callback,再调用
    ///conn->connectEstablished(),其中会回调用户提供的ConnectionCallback.
    class TcpServer : boost::noncopyable
    {
     public:
      /// Starts the server if it's not listenning.
      ///
      /// It's harmless to call it multiple times.
      /// Thread safe.
      void start();
    
      /// Set connection callback.
      /// Not thread safe.
      void setConnectionCallback(const ConnectionCallback& cb)
      { connectionCallback_ = cb; }
    
      /// Set message callback.
      /// Not thread safe.
      void setMessageCallback(const MessageCallback& cb)
      { messageCallback_ = cb; }
    
      /// Set write complete callback.
      /// Not thread safe.
      void setWriteCompleteCallback(const WriteCompleteCallback& cb)
      { writeCompleteCallback_ = cb; }
    
     private:
      /// Not thread safe, but in loop
      void newConnection(int sockfd, const InetAddress& peerAddr);
      /// Thread safe.
      void removeConnection(const TcpConnectionPtr& conn);
      /// Not thread safe, but in loop
      void removeConnectionInLoop(const TcpConnectionPtr& conn);
      //key是TcpConnection对象的名字
      typedef std::map<string, TcpConnectionPtr> ConnectionMap;
    
      EventLoop* loop_;  // the acceptor loop
      const string ipPort_;
      const string name_;
      boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
      boost::shared_ptr<EventLoopThreadPool> threadPool_;
      ConnectionCallback connectionCallback_;
      MessageCallback messageCallback_;
      WriteCompleteCallback writeCompleteCallback_;
      ThreadInitCallback threadInitCallback_;
      AtomicInt32 started_;
      // always in loop thread
      int nextConnId_;
      ConnectionMap connections_;
    };
    //新的客户连接建立后,会调用该函数,sockfd是新连接的fd,peerAddr是客户地址
    //该函数会创建TcpConnection对象conn,建立对象名字到对象的映射,设置好conn上的回调函数,最后调用TcpConnection类中的connectEstablished方法
    void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
    {
        loop_->assertInLoopThread();
        EventLoop* ioLoop = threadPool_->getNextLoop();
        char buf[64];
        snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
        ++nextConnId_;
        string connName = name_ + buf;
    
        LOG_INFO << "TcpServer::newConnection [" << name_
                 << "] - new connection [" << connName
                 << "] from " << peerAddr.toIpPort();
        InetAddress localAddr(sockets::getLocalAddr(sockfd));
        // FIXME poll with zero timeout to double confirm the new connection
        // FIXME use make_shared if necessary
        TcpConnectionPtr conn(new TcpConnection(ioLoop,
            connName,
            sockfd,
            localAddr,
            peerAddr));
        connections_[connName] = conn;
        conn->setConnectionCallback(connectionCallback_);
        conn->setMessageCallback(messageCallback_);
        conn->setWriteCompleteCallback(writeCompleteCallback_);
        conn->setCloseCallback(
            boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
        ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
    }

    muduo尽量让依赖是单项的,TcpServer会用到Acceptor,但Acceptor并不知道TcpServer的存在。TcpServer会创建TcpConnection,但TcpConnection并不知道TcpServer的存在

    TcpConnection类:

     

    //作用是为刚建立的客户连接conn提供channel对象进行管理,TcpConnection使用Channel来获得socket上的IO事件
    void TcpConnection::connectEstablished()
    {
        loop_->assertInLoopThread();
        //当前状态得是未建立连接
        assert(state_ == kConnecting);
        //将当前状态设置为已建立连接
        setState(kConnected);
        channel_->tie(shared_from_this());
        channel_->enableReading();
    
        connectionCallback_(shared_from_this());
    }
    
    //TcpConnection断开连接的实现
    //handleRead检查read的返回值,根据返回值分别调用messageCallback_、handleClose、handleError
    void TcpConnection::handleRead(Timestamp receiveTime)
    {
        loop_->assertInLoopThread();
        int savedErrno = 0;
    ////使用buffer来读取数据 ssize_t n
    = inputBuffer_.readFd(channel_->fd(), &savedErrno); if (n > 0) { messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if (n == 0) { handleClose(); } else { errno = savedErrno; LOG_SYSERR << "TcpConnection::handleRead"; handleError(); } } //closeCallback_是TcpServer在newConnection函数中注册的,是TcpServer::removeConnection方法.TcpServer::removeConnection方法把当前TcpConnection从ConnectionMap中移除,然后调用TcpConnection::connectDestroyed //TcpConnection::connectDestroyed()设置当前TcpConnection的channel对象不再监听任何事件,然后移除该channel对象。 void TcpConnection::handleClose() { loop_->assertInLoopThread(); LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString(); assert(state_ == kConnected || state_ == kDisconnecting); // we don't close fd, leave it to dtor, so we can find leaks easily. setState(kDisconnected); channel_->disableAll(); TcpConnectionPtr guardThis(shared_from_this()); connectionCallback_(guardThis); // must be the last line closeCallback_(guardThis); } //设置当前TcpConnection的channel对象不再监听任何事件,然后移除该channel对象。 void TcpConnection::connectDestroyed() { loop_->assertInLoopThread(); if (state_ == kConnected) { setState(kDisconnected); channel_->disableAll(); connectionCallback_(shared_from_this()); } channel_->remove(); }

    http://www.ccvita.com/515.html

    使用Linuxepoll模型,水平触发模式;当socket可写时,会不停的触发socket可写的事件,如何处理?

    第一种最普遍的方式:
    需要向socket写数据的时候才把socket加入epoll,等待可写事件。
    接受到可写事件后,调用write或者send发送数据。
    当所有数据都写完后,把socket移出epoll。

    这种方式的缺点是,即使发送很少的数据,也要把socket加入epoll,写完后在移出epoll,有一定操作代价。

    一种改进的方式:
    开始不把socket加入epoll,需要向socket写数据的时候,直接调用write或者send发送数据。如果返回EAGAIN,把socket加入epoll,在epoll的驱动下写数据,全部数据发送完毕后,再移出epoll。

    这种方式的优点是:数据不多的时候可以避免epoll的事件处理,提高效率。

    muduo采用level trigger,因此我们只在需要时才关注writable事件,否则就会造成busy loop

    TcpConnection发送数据:

    两个难点:关注writable事件的时机、发送数据的速度高于对方接收数据的速度,会造成数据在本地内存中堆积

    第二个难点的解决方案:设置一个callback highWaterMarkCallback,如果输出缓冲的长度超过用户指定的大小,就会触发回调

    //sendInLoop会先尝试直接发送数据,如果一次发送完毕就不会启用WriteCallback,如果只发送了部分数据,则把剩余的数据放入outputBuffer_,并
    //开始关注writable事件,以后在handlerWrite()中发送剩余的数据
    void TcpConnection::sendInLoop(const void* data, size_t len)
    {
        loop_->assertInLoopThread();
        ssize_t nwrote = 0;
        size_t remaining = len;
        bool faultError = false;
        if (state_ == kDisconnected)
        {
            LOG_WARN << "disconnected, give up writing";
            return;
        }
        // if no thing in output queue, try writing directly
        if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
        {
            nwrote = sockets::write(channel_->fd(), data, len);
            if (nwrote >= 0)
            {
                remaining = len - nwrote;
                if (remaining == 0 && writeCompleteCallback_)
                {
                    loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
                }
            }
            else // nwrote < 0
            {
                nwrote = 0;
                if (errno != EWOULDBLOCK)
                {
                    LOG_SYSERR << "TcpConnection::sendInLoop";
                    if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
                    {
                        faultError = true;
                    }
                }
            }
        }
    
        assert(remaining <= len);
        if (!faultError && remaining > 0)
        {
            size_t oldLen = outputBuffer_.readableBytes();
            if (oldLen + remaining >= highWaterMark_
                && oldLen < highWaterMark_
                && highWaterMarkCallback_)
            {
                loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
            }
            outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);
            if (!channel_->isWriting())
            {
                channel_->enableWriting();
            }
        }
    }
    
    //当socket可写时,发送outputBuffer_中的数据,一旦发送完毕,立刻停止观察writable事件,避免busy loop
    void TcpConnection::handleWrite()
    {
        loop_->assertInLoopThread();
        if (channel_->isWriting())
        {
            ssize_t n = sockets::write(channel_->fd(),
                outputBuffer_.peek(),
                outputBuffer_.readableBytes());
            if (n > 0)
            {
                outputBuffer_.retrieve(n);
                if (outputBuffer_.readableBytes() == 0)
                {
                    channel_->disableWriting();
                    if (writeCompleteCallback_)
                    {
                        loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
                    }
                    if (state_ == kDisconnecting)
                    {
                        shutdownInLoop();
                    }
                }
            }
            else
            {
                LOG_SYSERR << "TcpConnection::handleWrite";
                // if (state_ == kDisconnecting)
                // {
                //   shutdownInLoop();
                // }
            }
        }
        else
        {
            LOG_TRACE << "Connection fd = " << channel_->fd()
                      << " is down, no more writing";
        }
    }

     http://blog.csdn.net/luojiaoqq/article/details/12780051

  • 相关阅读:
    hive表链接
    hive聚合函数和表生成函数
    hive条件函数
    hive日期函数
    hive之size函数和cast转换函数
    hive数学函数
    hive排序
    5G基站概述
    MEC边缘云平台
    ELK日志系统的架构
  • 原文地址:https://www.cnblogs.com/ljygoodgoodstudydaydayup/p/5666952.html
Copyright © 2011-2022 走看看