zoukankan      html  css  js  c++  java
  • Thrift线程和状态机分析

    目录

    目录 1

    1. 工作线程和IO线程 1

    2. TNonblockingServer::TConnection::transition() 2

    3. RPC函数被调用过程 3

    4. 管道和任务队列 4

    5. 对象间关系 5

    6. 相关代码摘要 6

    1. 工作线程和IO线程

    启动Thrift时,可启动两类线程,一是TNonblockingIOThread,另一是Worker

     

    TNonblockingIOThread负责接受连接,和收发数据;而Worker负责回调服务端的用户函数。

    TNonblockingIOThread::registerEvents主要做了两件事:

    1) 注册TNonblockingIOThread::listenHandler(),这个是用来接受连接请求的;

    2) 注册TNonblockingIOThread::notifyHandler(),这个是用来监听管道的。

    TNonblockingIOThreadWorker两类线程间通过队列进行通讯,队列类型为std::queue<boost::shared_ptr<ThreadManager::Task> >

    class ThreadManager::Task: public Runnable

    {

    public:

        void run() 

        {

            // runnable_实际为TNonblockingServer::TConnection::Task

            runnable_->run();

        }

        

    private:

        // 这里的Runnable实际为TNonblockingServer::TConnection::Task

        // 在TNonblockingServer::TConnection::transition()中被push进来

        boost::shared_ptr<Runnable> runnable_;

    };

    2. TNonblockingServer::TConnection::transition()

    transition()为状态切换函数,状态有两种:一是socket的状态,另一是rpc会话的状态。APP开头的是rpc会话的状态,SOCKET开头的是socket的状态。

     

    APP_READ_REQUEST状态发生在IO线程中,addTask()会将任务转交给或工作线程,然后由工作线程回调服务端的函数。

    TNonblockingServer::TConnection::Task

    {

    public:

        void run()

        {        

            // 回调

            processor_->process(input_, output_, connectionContext_);

            // 回调完后通知,

            // 从工作线程重回到IO线程

            connection_->notifyIOThread(); // ioThread_->notify(this);

            // 这个将触发TNonblockingIOThread::notifyHandler()

        }

    };

    TNonblockingIOThread::notifyHandler()

    {

        // 从管道中取出connection的指针地址

        TNonblockingServer::TConnection* connection = NULL;

        int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);

        connection->transition(); // 进入状态转换函数

    }

    3. RPC函数被调用过程

    IO线程收到完整的RPC请求包时,以任务方式转给工作线程,然后由工作线程回调用户写的RPC函数。

     

    完成的调用过程如下图所示:

     

    任务从IO线程进入工作线程:

     

    4. 管道和任务队列

    IO线程以Task方式将连接交给工作线程,而工作线程在回调完后,以管道方式还回给IO线程。连接从IO线程进入到或工作线程后,会从libevent中删除,返回后再进入libevent

     

    5. 对象间关系

    class TNonblockingServer: public TServer

    {

    public:

        void serve() // 用户可以直接调用server(),但直接调用run()是更好的做法

        {

            // 创建socket监听

            // 创建TNonblockingIOThread

            // 通过Thread启动TNonblockingIOThread

        }

    };

    class TServer: public concurrency::Runnable

    {

    public:

        virtual void serve() = 0;

        virtual void run() // 用户也可以直接调用run()

        {

            serve();

        }

    };

     

    6. 相关代码摘要

    // 线程

    // thrift支持原生posix线程和boost线程

    void PthreadThread::start()

    {

        // PthreadThread是一个Posix线程类

        pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef);

    }

    static void* PthreadThread::threadMain(void* arg)

    {

        thread->runnable()->run();

    }

    // 以下为IO线程

    /// Three states for sockets: recv frame size, recv data, and send mode

    enum TSocketState

    {

      SOCKET_RECV_FRAMING,

      SOCKET_RECV,

      SOCKET_SEND

    };

    /**

     * Five states for the nonblocking server:

     *  1) initialize

     *  2) read 4 byte frame size

     *  3) read frame of data

     *  4) send back data (if any)

     *  5) force immediate connection close

     */

    enum TAppState

    {

      APP_INIT,            // 初始化

      APP_READ_FRAME_SIZE, // 接收包大小

      APP_READ_REQUEST,    // 接收包数据

      APP_WAIT_TASK,       

      APP_SEND_RESULT,     // 发送数据

      APP_CLOSE_CONNECTION // 关闭连接

    };

    // 启动监听和IO线程

    void TNonblockingServer::serve()

    {

        createAndListenOnSocket();

        for (uint32_t id = 0; id < numIOThreads_; ++id)

        {

            // TNonblockingIOThread是一个Runnable

            // 以委托方式被运行在PthreadThread中

            thread = new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_);

            ioThreads_.push_back(thread);

        }

        for (uint32_t i = 1; i < ioThreads_.size(); ++i)

        {

            // PthreadThread thread;

            thread->start();

        }

        

        ioThreads_[0]->run(); // 这将阻塞调用线程

        for (uint32_t i = 0; i < ioThreads_.size(); ++i)

        {

            ioThreads_[i]->join();

        }

    }

    void TNonblockingIOThread::run()

    {

        eventBase_ = event_base_new();

        

        // IO线程在启动时会调用registerEvents()

        // 在registerEvents()中完成两个回调函数的注册:listenHandler和notifyHandler

        // listenHandler回调负责接受请求,并创建连接对象

        registerEvents();

        event_base_loop(eventBase_, 0); // libevent

    }

    void TNonblockingIOThread::registerEvents()

    {

        // listenHandler和socket关联

        event_set(&serverEvent_, listenSocket_, EV_READ | EV_PERSIST,

            TNonblockingIOThread::listenHandler, server_);

        // notifyHandler和pipe关联

        event_set(¬ificationEvent_, getNotificationRecvFD(), EV_READ | EV_PERSIST,

            TNonblockingIOThread::notifyHandler, this);

    }

    static void listenHandler(evutil_socket_t fd, short which, void* v)

    {

        ((TNonblockingServer*)v)->handleEvent(fd, which);

    }

    void TNonblockingServer::handleEvent(int fd, short which)

    {

        accept();

        createConnection();

    }

    TNonblockingServer::TConnection* TNonblockingServer::createConnection()

    {

        // 会将自己绑定到一个线程

        // 采用轮询的方式选择线程

        // int selectedThreadIdx = nextIOThread_;

        // nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();

        // std::stack<TConnection*> connectionStack_;

        // 使用了内存池connectionStack_    

        // App状态:APP_INIT

        // Socket状态:SOCKET_RECV_FRAMING

    }

    static void eventHandler(evutil_socket_t fd, short /* which */, void* v)

    {

        assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());

        ((TConnection*)v)->workSocket();

    }

    void TNonblockingServer::TConnection::setFlags(short eventFlags)

    {

        event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);

    }

    void TNonblockingServer::TConnection::setRead()

    {

        setFlags(EV_READ | EV_PERSIST);

    }

    void TNonblockingServer::TConnection::setWrite()

    {

        setFlags(EV_WRITE | EV_PERSIST);

    }

    void TNonblockingServer::TConnection::setIdle()

    {

        setFlags(0);

    }

    void TNonblockingServer::TConnection::workSocket()

    {

        case SOCKET_RECV_FRAMING:

            TSocket::read(); // 接收包大小

            transition();

        case SOCKET_RECV:

            TSocket::read(); // 接收包数据

            transition();

        case SOCKET_SEND:

            TSocket::write_partial(); // 发送数据(非阻塞的)

            transition();

    }

    void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v)

    {

        recv();

        connection->transition();

    }

    // transition()为状态迁移函数

    void TNonblockingServer::TConnection::transition()

    {

        case APP_INIT:

            setRead();

        case APP_WAIT_TASK:

            setWrite();

        case APP_READ_REQUEST:

            setIdle();

    }

    TNonblockingServer::TConnection::Task

    {

    public:

        void run()

        {        

            // 回调

            processor_->process(input_, output_, connectionContext_);

            // 回调完后通知,

            // 从工作线程重回到IO线程

            // connection_的指针地址将通过管道传给工作线程

            connection_->notifyIOThread(); // ioThread_->notify(this);

        }

    };

    TNonblockingIOThread::notifyHandler()

    {

        // 从管道中取出connection的指针地址

        TNonblockingServer::TConnection* connection = NULL;

        int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);

        connection->transition(); // 进入状态转换函数

    }

    // 以下为工作线程

    class ThreadManager::Impl : public ThreadManager;

    class SimpleThreadManager : public ThreadManager::Impl;

    class ThreadManager::Worker: public Runnable;

    class ThreadManager::Task : public Runnable;

    void SimpleThreadManager::start()

    {

        // workerCount_为工作线程数

        addWorker(workerCount_);

    }

    void ThreadManager::Impl::addWorker(size_t value)

    {

        for (size_t ix = 0; ix < value; ix++)

        {

            worker = new ThreadManager::Worker(this);

            

            // thread为PthreadThread

            // 调用了worker->run();

            thread->start();

        }

    }

    void ThreadManager::Worker::run() 

    {

        ThreadManager::Task task;

        task->run();

    }

    class ThreadManager::Task: public Runnable

    {

    public:

        void run() 

        {

            // runnable_实际为TNonblockingServer::TConnection::Task

            runnable_->run();

        }

        

    private:

        // 这里的Runnable实际为TNonblockingServer::TConnection::Task

        // 在TNonblockingServer::TConnection::transition()中被push进来

        boost::shared_ptr<Runnable> runnable_;

    };

    void ThreadManager::Impl::add(shared_ptr<Runnable> value)

    {

        // std::queue<shared_ptr<Task> > tasks_;

        task = new ThreadManager::Task(value, expiration);

        tasks_.push(task);

    }

    // 两者关系

    class TNonblockingServer: public TServer

    {

    public:

        TNonblockingServer(const boost::shared_ptr<ThreadManager>& threadManager);

    private:

        // TNonblockingServer关联了ThreadManager

        boost::shared_ptr<ThreadManager> threadManager_;

    };

    // 工作线程将回调TNonblockingServer::TConnection::Task

    class TNonblockingServer::TConnection::Task: public Runnable

    {

    };

    // task为TNonblockingServer::TConnection::Task

    void TNonblockingServer::addTask(boost::shared_ptr<Runnable> task)

    {

        // 将任务交给工作线程

        // threadManager_为SimpleThreadManager

        threadManager_->add(task, 0LL, taskExpireTime_);

    }

    void TNonblockingServer::TConnection::transition()

    {

        case APP_READ_REQUEST:

            if (server_->isThreadPoolProcessing())

            {

                boost::shared_ptr<Runnable> task =

                    new TNonblockingServer::TConnection::Task(

                        processor_, inputProtocol_, outputProtocol_, this);

                // server_为TNonblockingServer

                // 回调交给工作线程,IO线程不做这个工作

                server_->addTask(task); // server_为TNonblockingServer

            }

            else

            {

                // 调用TNonblockingServer的构造函数时,

                // 如果没有指定参数ThreadManager,则会走这条分支

                // 这种情况下,isThreadPoolProcessing()返回false

                processor_->process(inputProtocol_, outputProtocol_, connectionContext_);

            }

    }

    void TNonblockingServer::TConnection::Task::run()

    {

        // 回调

        processor_->process(input_, output_, connectionContext_);

    }

    内嵌关系:

    1) TNonblockingServer内嵌了类TConnection,而TConnection又内嵌了类Task

    2) ThreadManager内嵌了类Impl、类Worker和类Task(注意区分于TConnection内嵌的Task),而Impl又是ThreadManager的子类,而Task是对Runnable的实现

    class TNonblockingServer: public TServer

    {

    public:

        void serve() // 用户可以直接调用server(),但直接调用run()是更好的做法

        {

            // 创建socket监听

            // 创建TNonblockingIOThread

            // 通过Thread启动TNonblockingIOThread

        }

    };

    class TServer: public concurrency::Runnable

    {

    public:

        virtual void serve() = 0;

        virtual void run() // 用户也可以直接调用run()

        {

            serve();

        }

    };

  • 相关阅读:
    [原]【实例化需求】1.FitNesse工具应用简介
    [原][问题解决]常见问题的5种解决办法(由Jenkins问题解决谈起)
    [原][自动化测试]Robot Framework Selenium基本使用
    [原]好玩的Linux,关于时间cal命令
    [原][问题解决]Romote Control 图形化界面显示问题解决办法
    【SBE】由需求管理谈起
    [Robot]关于robot的几个学习网站
    coolite中在UserControl中使用Coolite.AjaxMethods方法
    javascript实现页面加载进度条(转)
    coolite 在前台更新Window中控件的值
  • 原文地址:https://www.cnblogs.com/aquester/p/9891578.html
Copyright © 2011-2022 走看看