zoukankan      html  css  js  c++  java
  • nsq源码阅读笔记之nsqd(二)——Topic

    Topic相关的代码主要位于nsqd/nsqd.gonsqd/topic.go中。

    Topic的获取

    Topic通过GetTopic函数获取

    GetTopic函数用于获取topic对象,首先先尝试从topicMap表中获取,如果指定的topic存在,则直接返回topic对象。

    当topic不存在时需要新建一个topic,加入到topicMap中, 
    如果启用了nsqlookupd则需要从lookupd中获取该topic的所有channel,在去除#ephemeral结尾的临时channel后加入到topic中。

    其中锁的使用值得学习:在调用完nsqd的变量后转而进行topic操作,这时候程序转而使用topic的小粒度的锁,释放了nsqd全局的大粒度锁, 
    在保证线程安全的同时减少了效率上的损失。

    在创建新的topic后需要向channelUpdateChan发送消息来更新topic中的channel,而channelUpdateChan是一个阻塞的go channel, 
    所以此处使用了select,并同时监听了exitChan。如果此时exitChan收到信号则可以正常退出select。 
    如果没有case <-t.exitChan这句话, 
    则可能接收channelUpdateChan的go channel已经退出,但是发送端却还在阻塞中。当然,可以通过退出主go channel来结束程序, 
    但这样做可能造成部分析构的代码没有得到执行,而且在部分场景下, 
    只是程序的一个go channel结束运行(在nsqd的这个例子中是topic被删除)而非整个程序退出。 
    为了避免这个问题,nsqd许多向go channal发送消息的地方都使用了这种机制。

    以下是这种机制的一个示例, 
    可以通过The Go Playground来运行:

    运行程序,得到以下运行结果:

    Task1和Task2是两个生产者,它们都向workerChan发送消息,其中Task2立即发送,Task1有一定延时,workerChan是一个阻塞的go channel。 
    同时,有一个go channel发送结束信号(关闭exitChan)。随后开启消费者,接收workerChan的消息, 
    Task1和Task2的区别是Task2在select中多了一个对exitChan的监听。

    从结果可以看出,当exitChan被关闭时,Task2结束对workerChan的阻塞,取消了像worker发送信号,同时结束了自身。 
    而没有监听exitChan的Task1依然在阻塞,直到被读取后才退出。

    示例说明了可以通过对exitChan的使用来结束对阻塞go channel的等待。需要说明的是,在真实场景中, 
    消费者在发出结束的意图后可能并不会去处理尚未被处理的消息,所以像示例中的Task1是无法正常结束的。

    Topic的创建

    GetTopic未在已存在的topic中找到指定topic时,就会使用NewTopic函数新建一个Topic对象。 
    TopicNewTopic都位于nsqd/topic.go中。

    NewTopic函数首先创建了一个Topic结构,然后判断该topic是不是一个临时topic。topic中有个名为backend的变量, 
    其类型为Backend接口。对于临时topic, 
    消息只储存在内存中,因此backend变量使用newDummyBackendQueue函数初始化。该函数生成一个无任何功能的dummyBackendQueue结构; 
    对于永久的topic,backend使用newDiskQueue函数返回diskQueue类型赋值,并开启新的goroutine来进行数据的持久化。 
    dummyBackendQueuediskQueue都实现了Backend接口,因此,在之后可以使用backend统一处理。

    随后,NewTopic函数开启一个新的goroutine来执行messagePump函数,该函数负责消息循环,将进入topic中的消息投递到channel中。

    最后,NewTopic函数执行t.ctx.nsqd.Notify(t),该函数在topic和channel创建、停止的时候调用, 
    Notify函数通过执行PersistMetadata函数,将topic和channel的信息写到文件中。

    Notify函数的实现时,首先考虑了数据持久化的时机,如果当前nsqd尚在初始化,则不需要立即持久化数据,因为nsqd在初始化后会进行一次统一的持久化工作,

    Notify在进行数据持久化的时候采用了异步的方式。使得topic和channel能以同步的方式来调用Nofity而不阻塞。在异步运行的过程中, 
    通过waitGroup和监听exitChan的使用保证了结束程序时goroutine能正常退出。

    在执行持久化之前,case n.notifyChan <- v:语句向notifyChan传递消息,触发lookupLoop函数(nsqd/lookup.go中)接收notifyChan消息的部分, 
    从而实现向loopupd注册/取消注册响应的topic或channel。

    消息进入topic

    客户端通过nsqd的HTTP API或TCP API向特定topic发送消息,nsqd的HTTP或TCP模块通过调用对应topic的PutMessagePutMessages函数, 
    将消息投递到topic中。PutMessagePutMessages函数都通过topic的私有函数put进行消息的投递,两个函数的区别仅在PutMessage只调用一次put, 
    PutMessages遍历所有要投递的消息,对每条消息使用put函数进行投递。

    带缓冲的Go channel的特性

    put函数使用了一个带缓冲的go channel的特性:如果case里的go channel阻塞了,那么就会跳过该case语句,执行default分支。即,如果当前memoryMsgChan还有足够缓冲空间, 
    则消息被投入memoryMsgChan,如果当前memoryMsgChan的缓冲区已满,则将执行default分支,从而将消息保存到backend中。

    对于临时topic,由于backend不进行任何操作,这就意味着消息在内存的缓存满了之后会被直接丢弃,对于永久的channel,则backend会将该消息持久化到磁盘的文件中。

    put函数使用了Golang的channel特性,大大简化了实现这个逻辑的代码量,以下通过一个简单的示例看看Golang的带缓冲的channel的这一特性, 
    示例可以通过The Golang Playground运行:

    运行结果如下:

    示例程序中有3个go channel,workerChanworker2Chan用于处理消息,exitChan用于程序的退出。 
    当消费者go channel启动后,启动一个生产者go channel向workerChan连续发送3个消息, 
    time.After模拟了消费者在处理workerChan的消息时出现的延迟,而workerChan的缓冲区只有3, 
    因此当消费者向workerChan发送第4个消息的时候会被阻塞,从运行结果看,没有消息被投向worker2Chan, 
    程序在遇到阻塞时进入了default分支,打印出Channel queue full。特定场景下合理使用这一特性能够大幅减少程序的复杂度。

    put函数对消息的持久化

    以上部分来自函数的default分支,用于将消息持久化到磁盘文件中,过程如下:

    1. 通过bufferPoolGet函数从buffer池中获取一个buffer,bufferPoolGet及以下bufferPoolPut函数是对sync.Pool的简单包装。 
      两个函数位于nsqd/buffer_pool.go中。
    2. 调用writeMessageToBackend函数将消息写入磁盘。
    3. 通过bufferPoolPut函数将buffer归还buffer池。
    4. 调用SetHealth函数将writeMessageToBackend的返回值写入errValue变量。 
      该变量衍生出IsHealthyGetErrorGetHealth3个函数,主要用于测试以及从HTTP API获取nsqd的运行情况(是否发生错误)

    其中writeMessageToBackend函数重新初始化缓存,将Message类型的消息序列化到缓存中,最后将缓存写入backend

    Topic消息循环

    messagePump函数负责Topic的消息循环,该函数在创建新的topic时通过waitGroup在新的goroutine中运行。

    messagePump函数初始化时先获取当前存在的channel数组,设置memoryMsgChanbackendChan,随后进入消息循环, 
    在循环中主要处理四种消息:

    1. 接收来自memoryMsgChanbackendChan两个go channel进入的消息,并向当前的channal数组中的channel进行投递
    2. 处理当前topic下channel的更新
    3. 处理当前topic的暂停和恢复
    4. 监听当前topic的删除

    消息投递

    这两个case语句处理进入topic的消息,关于两个go channel的区别会在后续的博客中分析。 
    memoryMsgChanbackendChan读取到的消息是*Message类型,而从backendChan读取到的消息是byte数组的。 
    因此取出backendChan的消息后海需要调用decodeMessage函数对byte数组进行解码,返回*Message类型的消息。 
    二者都保存在msg变量中。

    随后是将消息投到每个channel中,首先先对消息进行复制操作,这里有个优化,对于第一次循环, 
    直接使用原消息进行发送以减少复制对象的开销,此后的循环将对消息进行复制。对于即时的消息, 
    直接调用channel的PutMessage函数进行投递,对于延迟的消息, 
    调用channel的StartDeferredTimeout函数进行投递。对于这两个函数的投递细节,后续博文中会详细分析。

    Topic下Channel的更新

    Channel的更新比较简单,从channelMap中取出每个channel,构成channel的数组以便后续进行消息的投递。 
    并且根据当前是否有channel以及该topic是否处于暂停状态来决定memoryMsgChanbackendChan是否为空。

    Topic的暂停和恢复

    这个case既处理topic的暂停也处理topic的恢复,pause变量决定其究竟是哪一种操作。 
    Topic的暂停和恢复其实和topic的更新很像,根据是否暂停以及是否有channel来决定是否分配memoryMsgChanbackendChan

    messagePump函数的退出

    messagePump通过监听exitChan来获知topic是否被删除,当topic的删除时,跳转到函数的最后,输出日志后退出消息循环。

    Topic的关闭和删除

    Topic关闭和删除的实现都是调用exit函数,只是传递的参数不同,删除时调用exit(true),关闭时调用exit(false)。 
    exit函数进入时通过atomic.CompareAndSwapInt32函数判断当前是否正在退出,如果不是,则设置退出标记,对于已经在退出的topic,不再重复执行退出函数。 
    接着对于关闭操作,使用Notify函数通知lookupd以便其他nsqd获知该消息。

    随后,exit函数调用close(t.exitChan)t.waitGroup.Wait()通知其他正在运行goroutine当前topic已经停止,并等待waitGroup中的goroutine结束运行。

    最后,对于删除和关闭两种操作,执行不同的逻辑来完成最后的清理工作:

    • 对于删除操作,需要清空channelMap并删除所有channel,然后删除内存和磁盘中所有未投递的消息。最后关闭backend管理的的磁盘文件。

    • 对于关闭操作,不清空channelMap,只是关闭所有的channel,使用flush函数将所有memoryMsgChan中未投递的消息用writeMessageToBackend保存到磁盘中。最后关闭backend管理的的磁盘文件。

    flush函数也使用到了default分支来检测是否已经处理完全部消息。 
    由于此时已经没有生产者向memoryMsgChan提供消息,因此如果出现阻塞就表示消息已经处理完毕。

    在删除topic时用到的Empty函数跟flush处理逻辑类似,只不过Empty只释放memoryMsgChan消息,而不保存它们。

    其他函数

    Depth函数

    Depth函数用于获取当前topic尚未投递的消息数,是memoryMsgChan缓冲区的长度加上backend里消息的个数。

    PauseUnPause函数

    对于很多相似的处理逻辑,nsqd在对外使用不同的函数,但在内部实现上通常把它们合并为一个函数来处理,只是传递的参数不同而已, 
    比如前面提到的CloseDeletePauseUnPause同样也使用这种方式,通过传递不同的参数调用doPause函数来执行不同操作。 
    doPause设置paused标志并向pauseChan发送消息,随后由messagePump在消息循环中暂停topic。

    AggregateChannelE2eProcessingLatency函数

    此函数用于性能统计,在nsqd/statd.go中调用,客户端可以通过HTTP的/stats API看到统计结果。具体细节将在后续博文分析。

    与channel相关的函数

    GetChannelgetOrCreateChannelGetExistingChannelDeleteExistingChannel这些函数是与channel相关的函数,将在后续的博文中分析。

  • 相关阅读:
    LeetCode 88. Merge Sorted Array
    LeetCode 75. Sort Colors
    LeetCode 581. Shortest Unsorted Continuous Subarray
    LeetCode 20. Valid Parentheses
    LeetCode 53. Maximum Subarray
    LeetCode 461. Hamming Distance
    LeetCode 448. Find All Numbers Disappeared in an Array
    LeetCode 976. Largest Perimeter Triangle
    LeetCode 1295. Find Numbers with Even Number of Digits
    如何自学并且系统学习计算机网络?(知乎问答)
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7457069.html
Copyright © 2011-2022 走看看