zoukankan      html  css  js  c++  java
  • 从新冠疫情出发,漫谈 Gossip 协议

    众所周知周知,疫情仍然在全球各地肆虐。据最新数据统计,截至北京时间 2020-05-28,全球累计确诊 5698703 例,累计死亡 352282 例,累计治愈 2415237 例。

    从上面的统计数据,我们可以看出,新冠病毒在人与人之间的传播是极其高效的,且影响范围广。如果我们把「新冠病毒」想象成一小段数据,将「人与人之间传播」想象成数据交换,那么,我们可以得出结论,在不考虑免疫系统和人为干预等一些因素,经过反复迭代,数据(新冠病毒)可以被发送(感染)到每个节点(人)上。

    这个就是今天要介绍的 Gossip 协议,该协议早在 1987 年就被发表在 ACM 上的论文《Epidemic Algorithms for Replicated Database Maintenance》中。当时主要用在分布式数据库系统中各个副本节点间同步数据。

    Gossip 协议简介

    Gossip 协议分为 Push-based 和 Pull-based 两种模式,具体工作流程如下:

    Push-based 的 Gossip 协议:

    • 网络中的某个节点随机选择N个节点作为数据接收对象

    • 该节点向其选中的N个节点传输相应数据

    • 接收到数据的节点对数据进行存储

    • 接收到数据的节点再从第一步开始周期性执行

    Pull-based 的 Gossip 协议,正好相反:

    • 集群内的所有节点,随机选择其它 k 个节点询问有没有新数据

    • 接收到请求的节点,返回新数据

    如何实现 Gossip

    这边简单分析下 HashiCorp 公司的 Serf 的核心库 Memberlist。这家公司研发了 Consul(基于 raft 实现的分布式存储)、Vagrant(声明式虚拟机编排)等优秀的产品。最近由于中美矛盾升级,也陷入到了舆论的漩涡中,爆出禁止在中国使用他们的产品的传闻。不过,这是题外话。

    Memberlist 这个 Golang 的代码库,基于 Gossip 协议,实现了集群内节点发现、 节点失效探测、节点故障转移、节点状态同步等。

    其核心实现的大致如下:

    • newMemberlist():初始化 Memberlist 对象,根据配置监听 TCP/UDP 端口,用于之后通信。这边需要注意一点,虽然是基于 Gossip 协议实现的,但是并不是所有信息都采用 Gossip 进行数据交换。比如节点加入集群的时候,为了尽快的让集群内所有节点感知到,采用遍历当前已知的所有节点并通过 TCP 连接发送并接收数据的方式,来确保跟所有节点完成数据交换。

    • gossip():Memberlist 对象启动之后,会定期使用 Gossip 协议,随机选择集群内的节点,采用 UDP 传输方式发送当前节点状态以及用户自定义的数据。

    • pushPull():还会定期随机选择一个节点,通过 TCP 传输方式与其做全量数据交换,加速集群内数据一致性收敛。

    • probe():还会定期轮训集群内的一个节点,通过 UDP 方式发送心跳探测包,做到节点感知。

    深入 Gossip 核心代码

    发送端处理流程:

    • 周期性地随机选择 m.config.GossipNodes 个节点,然后广播正在等待发送的信息

    // Create a gossip ticker if needed
    if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
    t := time.NewTicker(m.config.GossipInterval)
    go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
    m.tickers = append(m.tickers, t)
    }

    // gossip is invoked every GossipInterval period to broadcast our gossip
    // messages to a few random nodes.
    func (m *Memberlist) gossip() {
    defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())

    // Get some random live, suspect, or recently dead nodes
    m.nodeLock.RLock()
    kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
        if n.Name == m.config.Name {
            return true
        }
        switch n.State {
        case StateAlive, StateSuspect:
            return false
        case StateDead:
            return time.Since(n.StateChange) > m.config.GossipToTheDeadTime
        default:
            return true
        }
    })
    m.nodeLock.RUnlock()
    
    // ...
    
    for _, node := range kNodes {
        // Get any pending broadcasts
        msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
        if len(msgs) == 0 {
            return
        }
    
        addr := node.Address()
        if len(msgs) == 1 {
            // Send single message as is
            if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, msgs[0]); err != nil {
                m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
            }
        } else {
            // Otherwise create and send a compound message
            compound := makeCompoundMessage(msgs)
            if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil {
                m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
            }
        }
    }
    

    }
    接收端:

    • 接收数据报文,然后解析报文信息,并将信息记录下来

    // packetListen is a long running goroutine that pulls packets out of the
    // transport and hands them off for processing.
    func (m *Memberlist) packetListen() {
    for {
    select {
    case packet := <-m.transport.PacketCh():
    m.ingestPacket(packet.Buf, packet.From, packet.Timestamp)

        case <-m.shutdownCh:
            return
        }
    }
    

    }

    func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time) {
    // ...

    // See if there's a checksum included to verify the contents of the message
    if len(buf) >= 5 && messageType(buf[0]) == hasCrcMsg {
        crc := crc32.ChecksumIEEE(buf[5:])
        expected := binary.BigEndian.Uint32(buf[1:5])
        if crc != expected {
            m.logger.Printf("[WARN] memberlist: Got invalid checksum for UDP packet: %x, %x", crc, expected)
            return
        }
        m.handleCommand(buf[5:], from, timestamp)
    } else {
        m.handleCommand(buf, from, timestamp)
    }
    

    }

    Gossip 协议的优缺点

    看了 Memberlist 的实现,难免会有这样的疑问,为什么要使用 Gossip 协议,直接在集群内广播不香么?接下来,我们可以通过 Gossip 协议的优缺点来分析,使用 Gossip 协议的意义。

    优点:

    • 协议简单,实现起来很方便

    • 扩展性强,可以允许集群内节点任意增加或者减少,新增节点最终会与其他节点一致

    • 去中心化,节点之间是完全对等的

    • 最终一致性

    缺点:

    • 数据同步延迟,因为只保证最终一致性,所以会出现某个时间点,部分节点数据不同步的情况

    • 传输数据冗余,相同数据在节点间会反复被传输

    今天对 Gossip 的协议就简单介绍到这里,如果有同学对内容感兴趣,可以回复评论,我们私下多多探讨和交流。

    参考资料

    https://en.wikipedia.org/wiki/Gossip_protocol

    https://github.com/hashicorp/serf

    https://github.com/hashicorp/memberlist

    https://zhuanlan.zhihu.com/p/41228196

    https://www.jianshu.com/p/de7b026f4997

    往期技术文章都在微信同步分享,可以添加微信:upyun0001.png

    推荐阅读

    容器化技术在数据中心的实践

    大型网课翻车现场!原因竟是……

  • 相关阅读:
    Python装饰器学习(九步入门)
    jQuery练习题
    JavaScript 练习题
    Apache的安装与下载
    非常好用的CSS样式重置表
    表单练习
    shell命令lsof
    IndentationError: unindent does not match any outer indentation level
    Zabbix监控mysql主从复制状态
    Zabbix监控php-fpm status
  • 原文地址:https://www.cnblogs.com/upyun/p/13153159.html
Copyright © 2011-2022 走看看