zoukankan      html  css  js  c++  java
  • Theron, a lightweight C++ concurrency library, 源码分析(二)

    Framework class

    Framework类在Theron应该可以看作是一个中枢神经系统。他管理着它内部各个actor之间的交互。

    我们知道,Windows是基于消息的系统。因此,它定义了消息队列,以及处理消息的基本流程:

       1: while (GetMessage(&msg, NULL, 0, 0))
       2: {
       3:     if (!TranslateAccelerator(msg.hwnd, hAccelTable, &msg))
       4:     {
       5:         TranslateMessage(&msg);
       6:         DispatchMessage(&msg);
       7:     }
       8: }

    Actor Model同样以消息为基础,那么也应该存在一个消息队列和消息派发的过程。

    这个过程就是在Framework中定义的。我们来看Framework::Initialize()(略去了部分非关键代码):

       1: void Framework::Initialize()
       2: {
       3:     mRunning = true;
       4:     mManagerThread.Start(ManagerThreadEntryPoint, this);
       5:  
       6:     uint32_t backoff(0);
       7:     while (mThreadCount.Load() < mTargetThreadCount.Load())
       8:     {
       9:         Detail::Utils::Backoff(backoff);
      10:     }
      11:  
      12:     mIndex = Detail::StaticDirectory<Framework>::Register(this);
      13:  
      14:     if (mName.IsNull())
      15:     {
      16:         mName = Detail::NameGenerator::Generate(mIndex);
      17:     }
      18: }

    有3个亮点:

    1. mRunning = true和ManagerThreadEntryPoint
    2. Utils::Backoff
    3. mIndex和mName

    mManagerThread是一个Thread类型,那么我们只有关注ManagerThreadEntryPoint就可以了。其实ManagerThreadEntryPoint很简单,不过它引入了另外一个函数,ManagerThreadProc

    Backoff是一个和Sleep(0)类似的东西,不过和Sleep有一点细微的差别。Backoff调用了YieldProcessor。大家可以自己google下YieldProcessor vs Sleep(0)。

    mIndex是给Framework分配了一个全局的索引值。

    Framework::ManagerThreadProc

    这个函数里,我们首先可以看到一个while循环:

    while (mRunning) { … }

    可以很明确地肯定,在Framework还没有销毁时,它就是个死循环。这基本上就对等了Windows里的那个while。

    while里面的循环稍微有点复杂了。我们需要分块一点一点来剥离。

       1: while (mThreadCount.Load() < mTargetThreadCount.Load())
       2: {
       3:     WorkerThreadStore* store = new WorkerThreadStore(…);
       4:     ThreadContext* threadContext = new ThreadContext(store);
       5:  
       6:     ThreadPool::CreateThread(threadContext);
       7:     ThreadPool::StartThread(threadContext, &mWorkQueue, mNodeMask, mProcessorMask);
       8:  
       9:     mThreadContexts.Insert(threadContext);
      10: }

    首先,这个函数是在Initialize里被调用的,因此必定要做一些初始化的操作。初始化操作的一个重头戏就是创建出足够多的线程。

    在前面已经说了,线程被启动后,就会一直运行。因此初始化操作会创建16个(默认mTargetThreadCount是16)不停运行的线程。这些线程都是从mWorkQueue(SafeThreadQueue<Mailbox>类型)中去取相应的Mailbox,然后把关联的消息地送给actor执行。

    所以,消息队列就是mWorkQueue。

    当然这个函数虽然是在Initialize里被调用起来的,但是作为消息泵,它还有其他的职责。从函数的实现细节来看,整个过程可以用下面几条来总结:

    • mManagerThread还在运行时:
      • 如果有空闲线程,那么启用空闲线程来执行任务
      • 若可用线程数还没有达到上限,创建出足够多的线程
      • 若线程数超过上限(线程数的上限可以动态调整),停止一些线程
    • mManagerThread停止运行时:
      • 如果mThreadContext非空,则销毁ThreadContext管理的线程对象和掌握的资源

    到这里,基本上把整个消息机制讲解完毕了。接下来我们要说一说Framework和Actor之间的关系。

     

    Framework and Actor

    Theron里的Framework有点像设计模式中的中介者模式。它管理着Actor的相关信息。怎么说呢,事实上和Actor对象关联的Mailbox和Address信息都是从Framework这里生产的。Framework::RegisterActor完成了这个工作。

       1: void Framework::RegisterActor(Actor* actor, const char* name)
       2: {
       3:     const uint32_t mailboxIndex(mMailboxes.Allocate());
       4:     Mailbox& mailbox(mMailboxes.GetEntry(mailboxIndex));
       5:  
       6:     String mailboxName(name);
       7:     mailbox.RegisterActor(actor);
       8:  
       9:     const Index index(mIndex, mailboxIndex);
      10:     const Address mailboxAddress(mailboxName, index);
      11:  
      12:     actor->mAddress = mailboxAddress;
      13: }

    所以,Framework像邮局,他知道他管辖的区域中有那些邮箱地址,而且也只有它才知道。不同的邮局管辖不同的区域,因此跨域通信必然是两个Framework之间协同完成的。

    在上面这段代码里有一个mIndex变量,这个是Framework的成员变量,用来在全局表征一个Framework的索引值。这个指会在后面在提到。

    到这里,我们基本可以把Framework和Actor之间的关系理个清楚了。接下来要说的就是actor和actor之间是如何发送消息的。

    为了能够把问题简单化,我们这里只讨论在同一个Framework中的不同actor之间通信的情况。这个过程要涉及2个要点:

    • Framework::Send函数
    • ProcessorContext结构

     

    Framework::Send

    Send函数的原型是:Framework::Send(const ValueType &value, const Address &from, const Address &address),是一个模板函数。这个函数的作用就是把消息投递到对应地址的mailbox中。

    Send函数串联了不同actor之间的交互。整个消息的发送过程其实是由MessageSender::Send(EndPoint* endPoint, ProcessorContext* processorContext, const uint32_t localFrameworkIndex, IMessage* message, const Address &address)来负责完成的。

    这个函数有很多参数,我们已经假定了只考虑同一个Framework中不同actors之间的通信。在这种情况下,我们可以不用考虑endPoint。另外,我们看到只有一个Address类型的参数了,要记得,发送者的address在message里。所以,这里的address是接收消息的actor的address。因此,Framework::Send中的value还不能算消息,只能算消息内容。因为前面说了,消息是要包含发送者的地址的。所以,在Send函数里,我们会看到它调用了MessageCreator::Create来创建一个消息结构,将value和发送消息的actor地址绑定在了一起。

    接下来要的是processorContext和localFrameworkIndex两个变量。

    ProcessorContext and Index

    ProcessorContext事实上就是保存了某个Framework的相关信息。在发送消息的函数里(MessageSender::Send)附带上这个参数,可以明确当前的消息是从哪个Framework中发出来的。

    localFrameworkIndex是Index类型的。在MessageSender::Send中就是指接收消息的Framework的Index。

    从RegisterActor函数里我们也可以看到,每一个Address对象都会关联一个Index,而这个Index又是和Framework::mIndex有一定的关联关系的。因此在MessageSender::Send中,我们可以通过address和localFrameworkIndex来判断当前的消息是在同一Framework下的actor之间相互沟通还是在不同Framework之间来回。

       1: bool MessageSender::DeliverByIndex(ProcessorContext* processorContext,
       2:                                    const uint32_t localFrameworkIndex,
       3:                                    IMessage* message,
       4:                                    const Address &address)
       5: {    
       6:     const Index index(address.mIndex);
       7:  
       8:     // Which framework is the addressed entity in?
       9:     const uint32_t targetFrameworkIndex(index.mComponents.mFramework);
      10:     if (targetFrameworkIndex == localFrameworkIndex)
      11:     {
      12:         // Is the message addressed to an actor in this framework?
      13:         delivered = DeliverToActorInThisFramework(processorContext,
      14:                                                   message,
      15:                                                   address);
      16:     }
      17:     else
      18:     {
      19:         delivered = DeliverToLocalMailbox(message, address);
      20:     }
      21: }

    DeliverToActorInThisFramework就不细说了,可以自己看。大致的逻辑就是从address中找出关联的mailbox对象,然后将message push到该mailbox中。如果当前的mailbox在填充消息前是空的,那么这个mailbox肯定不在该ProcessorContext的workQueue中。因此在放入消息后,要将它加入的workQueue中,让线程池在处理workQueue时,能处理到这条消息。

    Actor class

    Actor类其实是很简单的。在分析代码前,我们先把之前提到的和actor相关的概念再深化下。

    一个actor在响应消息的同时可以:

    • 发送有限量的消息给其他actor
    • 创建有线量的其他actor
    • 指定下一次收到消息时具体响应动作/行为

    也就是说在actor的消息处理函数里,actor可以给其他actor发送消息,可以创建新的actor,可以设置当前actor的一些属性。

    所以,Actor类必定要有一个发送消息的函数:Actor::Send。在开始分析Send函数之前,我们先看一眼Actor类的成员函数。

    • Address mAddress;
    • Framework* mFramework;
    • HandlerCollection mMessageHandlers;
    • DefaultHandlerCollection mDefaultHandlers;
    • ProcessorContext* mProcessorContext;

    Address和Framework暂时没啥好说的。mMessageHandlers就是当前Actor注册了的消息处理函数集合。

    那继续我们的成员函数Send。

    Actor::Send(const ValueType& value, const Address& address) const

    Send函数就是把消息(value)发送到指定的address那里去。和Windows里的PostMessage一样,是异步的。

    事实上,这个函数的内部实现也是调用MessageSender::Send来完成的。这种情况下,再回头来看ProcessorContext和localFrameworkIndex你应该会有更多的体会。对这个函数的细致分析,就不多补充了。

    基本到这里,该说的都说完了。

  • 相关阅读:
    吸取前人教训,每个程序员都曾犯过的经典错误,你可千万要避免
    程序员帮助妹妹攻破C语言,整理超详细规划,快来看!
    2020年总结:程序员爱用开发工具 Top 100
    程序员为了拿到N+1辞退金,使计让公司辞退自己,网友评论画风突变!
    回顾2020的爷青结:有哪些记忆里的应用正在消逝!
    菜鸟看过来!新的一年,这几点雷区别再踩了
    年龄从不是学习的界限,84岁高龄奶奶自学编程,受苹果公司CEO邀请参加全球开发者大会!
    TIOBE 2月编程语言排行榜出炉:C语言冠军宝座稳定,前十名有什么变化?
    用TensorRT和Numpy编写的Yolov5推理。在【Nvidia Jetson】设备上运行,无需依赖任何pytorch / torchvision
    Example app/api for exposing yolov5 model via flask
  • 原文地址:https://www.cnblogs.com/wpcockroach/p/2800507.html
Copyright © 2011-2022 走看看