zoukankan      html  css  js  c++  java
  • Pulsar 学习记录

    一、简介

    Apache Pulsar is a cloud-native, distributed messaging and streaming platform originally created at Yahoo! and now a top-level Apache Software Foundation project。

    Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体。

    采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。

    在消息领域,Pulsar 是第一个将存储计算分离云原生架构落地的开源项目。

    该系统源于 Yahoo,最初在 Yahoo 内部开发和部署,支持 Yahoo 应用服务平台 140 万个主题,日处理超过 1000 亿条消息。

    Pulsar 于 2016 年由 Yahoo 开源并捐赠给 Apache 软件基金会进行孵化,2018 年成为 Apache 软件基金会顶级项目。

    Apache Pulsar 提供了统一的消费模型,支持 Stream(如 Kafka)和 Queue(如 RabbitMQ)两种消费模型, 支持 exclusive、failover 和 shared 三种消费模式。

    同时,Pulsar 提供和 Kafka 兼容的 API,以及 Kafka-On-Pulsar(KoP) 组件来兼容 Kafka 的应用程序,

    KoP 在 Pulsar Broker 中解析 Kafka 协议,用户不用改动客户端的任何 Kafka 代码就能直接使用 Pulsar。

    目前,Apache Pulsar 已经应用部署在国内外众多大型互联网公司和传统行业公司,

    案例分布在人工智能、金融、电信运营商、直播与短视频、物联网、零售与电子商务、在线教育等多个行业,

    如美国有线电视网络巨头Comcast、Yahoo!、腾讯、中国电信、中国移动、BIGO、VIPKID 等。

    目前 Apache Pulsar 项目原生核心贡献者已组成创业公司 StreamNative,进一步为 Apache Pulsar 提供更好的企业级服务支持与生态建设。

    二、核心概念

    instance:实例

    cluster:集群

    tenant:租户

    namespace:命名空间

    topic:主题

    partitioned topic:可分区主题

    broker:Pulsar集群计算节点

    bookie:BookKeeper存储节点

    pub-sub:发布-订阅模型

    producer:生产者

    subscription:订阅(相当于 consumer group),一个Topic可以添加多个订阅。订阅不包含消息的数据,只包含元数据和游标。

    consumer:消费者

    Pulsar链接:

    Pulsar 2.0版本之前:{persistent|non-persistent}://property/cluster/namespace/topic
    Pulsar 2.0版本之后:{persistent|non-persistent}://tenant/namespace/topic

    与kafka中概念对应关系:

    三、核心组件

    broker:计算层,整个消息层的生产和消费,无存储状态。

    Apache BookKeeper:存储层,数据持久化保存的节点,有存储状态。

    RocksDB:内嵌在 BookKeeper 中的数据库,存储每个条目的位置索引。它只是将(LedgerId,EntryId) 映射到 (EntryLogId,文件中的偏移量)。

      <(LedgerID, EntryID), EntryLogID> 

      LedgerID 即 segmentID,EntryID 即是 Log Message 的逻辑 ID,EntryLogId 就是 Log消息在 Pulsar Fragment文件的物理 Offset。

    Apache ZooKeeper:元数据管理,存储 Pulsar 和 BookKeeper 元数据 和 节点状态,以及服务发现(发现 broker ,发现bookie)。在 Pulsar 里的作用是

    存储 Pulsar 系统里元数据的存储和集群的管理以及节点的发现等,节点发现是指发现集群里有多少个 broker,有多少 bookie。

    一个Pulsar实例由一个或多个Pulsar集群组成。实例中的集群可以在它们之间复制数据。集群间可以通过 跨地域复制(Geo-Replication) 进行消息同步。

    单个 Pulsar 集群由以下三部分组成:

    • 一个或者多个 broker 负责处理和负载均衡(loadmananger) producer 发出的消息,并将这些消息分派给 consumer;Broker 与 Pulsar 配置存储交互来处理相应的任务,并将消息存储在 BookKeeper 实例中(又称 bookies);Broker 依赖 ZooKeeper 集群处理特定的任务,等等。
    • 包含一个或多个 bookie 的 BookKeeper 集群负责消息的持久化存储。
    • 一个Zookeeper集群,用来处理多个Pulsar集群之间的协调任务。

    brokers 和 bookies交互示意图

     

    集群示意图

    1、Broker:负责负载均衡和消息的读取、写入等,负责与业务系统进行通信,承担协议转换,序列化和反序列化、选主等功能。

    Broker 主要由四个模块组成。

    • Dispatcher:调度分发模块,承担协议转换、序列化反序列化等。
    • Load balancer:负载均衡模块,对访问流量进行控制管理。
    • Global replicators:跨集群复制模块,承担异步的跨集群消息同步功能。
    • Service discovery:服务发现模块,为每个 topic 选择无状态的主节点。

    2、Bookie:存储设备

    3、ZooKeeper:存储元数据,集群配置,协调

    • local zk负责Pulsar Cluster内部的配置等
    • global zk则用于Pulsar Cluster之间的数据复制等。

      Configuration Store(老版本中称为 Global ZooKeeper),它存储的是集群复制信息,让集群之间互相了解各自的地址。同时还包括一些 clients 或 namespace 的相关配置信息。这样做的目的可以简化操作,只需一步即可将各个集群信息一同更新。

    三、架构

    计算与存储分离架构

    分层分片存储架构

    分层:

    • 计算层、存储层。
    • 层级化存储。

    分片:segment。

    • Pulsar 将 topic 分区划分为分片,然后将这些分片存储在 Apache BookKeeper 的存储节点上,以提高性能、可伸缩性和可用性。
    • 分片架构将消息流数据的存储粒度从分区拉低到了分片,以及相应的层级化存储。
    • Pulsar 的无限分布式日志以分片为中心,借助扩展日志存储(通过 Apache BookKeeper)实现,内置分层存储支持,因此分片可以均匀地分布在存储节点上。

    由于与任一给定 topic 相关的数据都不会与特定存储节点进行捆绑,因此很容易替换存储节点或缩扩容。另外,集群中最小或最慢的节点也不会成为存储或带宽的短板。

    • Pulsar 的架构无分区,也没有重平衡,保证了及时可伸缩性和高可用性。

    这两个重要特性使 Pulsar 尤其适用于构建与关键任务相关的服务,如金融用例的计费平台,电子商务和零售商的交易处理系统,金融机构的实时风险控制系统等。

    • 通过利用性能强大的 Netty 架构,数据从 producers 到 broker,再到 bookie 的转移都是零拷贝,都不会生成副本。

    这一特性对所有流用例都非常友好,因为数据直接通过网络或磁盘进行传输,没有任何性能损失。

    隔离架构

    保证了 Pulsar 的优良性能,主要体现在以下几个方面:

    • IO 隔离:写入、追尾读和追赶读隔离。
    • 利用网络流入带宽和磁盘顺序写入的特性实现高吞吐写:传统磁盘在顺序写入时,带宽很高,零散读写导致磁盘带宽降低,采取顺序写入方式可以提升性能。
    • 利用网络流出带宽和多个磁盘共同提供的 IOPS 处理能力实现高吞吐读:收到数据后,写到性能较好的 SSD 盘里,进行一级缓存,然后再使用异步线程,将数据写入到传统的 HDD 硬盘中,降低存储成本。
    • 利用各级缓存机制实现低延迟投递:生产者发送消息时,将消息写入 broker 缓存中;实时消费时(追尾读),首先从 broker 缓存中读取数据,避免从持久层 bookie 中读取,从而降低投递延迟。读取历史消息(追赶读)场景中,bookie 会将磁盘消息读入 bookie 读缓存中,从而避免每次都读取磁盘数据,降低读取延时。

    消息模型

     

    消息分发模型

    • 设置了 msg key,消息会基于 key 做 hash,将消息分发到不同的 partitions 中。
    • 未设置 msg key,消息会以 round robin 的形式,分发到不同的 partitions 中。

    订阅模型

    Exclusive:独占型,一个订阅只能有一个消息者消费消息。

    Failover:灾备型,一个订阅同时只有一个消费者,可以有多个备份消费者。一旦主消费者故障则备份消费者接管。不会出现同时有两个活跃的消费者。

    Shared:共享型,一个订阅中同时可以有多个消费者,多个消费者共享Topic中的消息。

    Key_Shared:键共享型。

    Ack机制

    单条 Ack(AckIndividual):是指 Consumer 可以根据消息的 messageID 来针对某一个特定的消息进行 Ack 操作。

    批量 Ack(AckCumulative):是指一次 Ack 多条消息。

    存储模型

    topic(partition):主题(分区),Pulsar 中的分区也是Topic。Pulsar中的概念。

    ledger:账簿。Ledger是一个只追加的数据结构,并且只有一个写入器,这个写入器负责多个BookKeeper存储节点(就是Bookies)的写入。 Ledger的条目会被复制到多个bookies。

                可以将 ledger 理解为 一个具体的 segment, (entry log 和 index 文件)

    entry(record):记录,存储一条消息,或者一批消息

    msg:消息

    batch msg:批量消息

    ========================================

    backlog:积压,用来表述 Topic 中没有被消费的数据。

    • Topic Backlog: 最慢的那个订阅的 Backlog 的集合
    • Subscription Backlog: 指针对单个订阅级别的没有消费的数据的集合
    • backlogSize:记录的是所有没有被 Ack 的消息的大小
    • msgBacklog: 记录的是所有未被 Ack 的 entries 的集合

    cursor:

    是用来存储一个订阅下 Consumer 的消费状态(位置)。等价于 offset(kafka)+ individualDeletes。

    每个Subscription都存储一个Cursor。Cursor是日志中的当前偏移量。Subscription将其Cursor存储至BookKeeper的Ledger中。这使Cursor跟踪可以像Topic一样进行扩展。

    LAC(Last Add Confirmed ID) :记下最后一条写成功的消息的 ID。

    LAP(Last-Add-Pushed):已经发送给 Bookie 但是尚未收到 Ack 的日志条目,其原理类似于 TCP 发送端的滑动窗口机制。

    strorage size:表述当前消息占用的存储空间的大小。

    分层存储(Tiered Storage)

      在一些流数据用例场景中,用户希望将数据长时间存储在流中。

    虽然 Apache Pulsar 对 topic backlog 的大小没有限制,但将所有数据存储在 Pulsar 中较长时间,存储成本比较大。

    Apache Pulsar 的分层存储特性(在 2.1 及之后的版本可用),分层存储支持在不影响终端用户的条件下,将较旧的数据移动到长期存储中。

      Pulsar 允许用户存储任意大小的 topic backlog。当集群将要耗尽空间时,用户只需添加新的存储节点,系统将会自动重新平衡数据。

    但是,这样的操作运行一段时间后,运维成本十分昂贵。

      Pulsar 通过提供分层存储(Apache Pulsar 2.1 起新增的特性)减少了成本/大小的损失。分层存储为用户提供大小不受限制的 backlog,且无需添加存储节点;

    卸载较旧的 topic 数据到长期存储中,长期存储的成本比在 Pulsar 集群中存储的成本低一个数量级。

    对于终端用户来说,消费存储在 Pulsar 集群或分层存储中的 topic 数据没有明显差别。位于 Pulsar 集群和分层存储中的 topic 生产和消费消息的方式也完全相同。

      Pulsar 通过分片架构实现了分层存储。Pulsar topic 的消息日志由一系列分片组成。序列中的最后一个分片是 Pulsar 当前写入的分片。

    当前序列之前的所有分片都已封装,也就是说,这些分片中的数据不可变。由于数据不可变,因此可以轻易地将数据复制到另一个存储系统,而不必担心一致性的问题。

    复制完成后,可以立即更新消息日志元数据中的数据指针,并且可以删除 Pulsar 在 Apache BookKeeper 中存储的数据副本。

    Pulsar 支持通过 Amazon S3、GCS(Google Cloud Storage)、Filesystem 进行长期存储

    主题分配 Topic  Assignment 

    namespace -> topic -> namespace hash ring -> bundle -> broker

    topic、bundle、broker 的映射关系存储在zookeeper中。

    由 broker 中的 loadmanager 进行分配 并 负载均衡

    注意:这里谈论的 Topic 是无分区的 Topic 或者 Partition Topic 中的一个 Partition。 

    主题查找 Topic Discovery

    producer | consumer -> topic | topic partition -> broker ->  topic | topic partition owner(broker)。

    注:因为任意一个broker 都可以通过 zookeeper 拿到 topic 和 broker 的对应关系。

    所以,broker 前端可以配置 proxy (loadbalance、dns、iplist)。

    消息读写流程

    首先进行 Topic Discovery,定位broker;

    然后,发送读取:producer | consumer -> topic partition owner(broker)。

    消息生命周期

    Retention 机制

    BookKeeper 消息保留机制。

    默认情况下,持久化的机制是关闭的。即消息被 Ack 之后,就会进入删除的逻辑。

    配置 Retention 策略时,有如下两个参数可以指定:

    • size:指持久化大小的阈值。0 代表不配置 Retention 大小策略,-1 代表设置的大小无限大
    • time:指持久化时间的阈值。0 代表不配置 Retention 时间策略,-1 代表时间无限大

    TTL机制

    没有消费者消费订阅的消息的处理。

    跨机房复制 (geo replication)(full mesh replication)

    • 双向复制
    • broker 区域去重机制
    • 异步复制

    Bookie异常恢复

    AutoRecovery 进程,自动处理,将挂掉的Bookie中的数据重新存储到其他 Bookie 中。

    AutoRecovery 任务是由若干个 worker 线程构成的线程池执行的,每个 worker 线程从由自己负责的 zookeeper path 上找到要恢复数据的 Ledger 进行数据复制。

    如果集群发生扩容,则由 Auditor 线程负责 Segment 数据的迁移复制。

    Pulsar 负载均衡的几个问题


    1、broker 是否是对等的?
    【答】是对等的。

    2、每一个 broker 实例中包含 loadbalance 模块?
    【答】是
    那做负载均衡的时候由哪个 broker 的 loadbalance 来做呢?
    【答】每一个broker都可能来发起这个操作。
    在运行过程中,当某一个broker发现负载达到阈值时,此时就由该broker发起 loadbalance 。

    3、负载均衡后,重新给topic 分配 了 ownership (broker),保存在 zk 中?
    下次如何 根据hash环来定位到 这个新的 ownership 呢?

    topic discovery 由 service discovery模块来做吗?
    【答】由 broker 内部的特定逻辑来处理。


    4、负载均衡
    broker 负载均衡 :由 loadbalance 来做。
    bookie 负载均衡 :由 bookie 自身的负载均衡机制来做。【条带化写】

    5、broker如何选定bookie?

    【答】随机,因为 bookie 是条带化写入,大部分时间是出于比较均衡的状态,所以可以随机给定。

    疑问

    1、资源池化

    云计算和Web数据中心业务要求计算中心的CPU,内存,存储和网络资源能作为一个全局的资源池进行动态,灵活的调配和绑定。

    服务器设备资源池化是对硬件的重构,而服务器的硬件重构关乎大规模数据中心的效率和成本问题,

    目标是降低服务器硬件购置成本、提高服务器硬件资源的利用率、降低服务器运维成本,降低综合TCO。

    服务器资源池化的几个资源为:CPU池、内存池、存储池、I/O池

    2、消息消费

    Pulsar 的消费模型采用了流拉取的方式。流拉取是长轮询的改进版,不仅实现了单个调用和请求之间的零等待,还可以提供双向消息流。

    通过流拉取模型,Pulsar 实现了比所有现有长轮询消息系统(如 Kafka)都低的端到端延迟。

    3、自动负载均衡功能

    Pulsar 的自动负载均衡功能可以自动并立即使用集群中新加的计算和存储能力。

    这使得 broker 之间可以迁移 topic 来平衡负载,新 bookie 节点可以立即接受新数据分片的写入流量,而无需手动重新平衡或管理 broker。

    4、消息路由

    通过 Pulsar IO、Pulsar Functions、Pulsar Protocol Handler,Pulsar 具有全面路由的功能。Pulsar 的路由功能包括基于内容的路由、消息转换,和消息扩充。

    5、BookKeeper API

    【低阶】Ledger API: 底层API,可以直接操作ledger,比较灵活。

    【高阶】Stream API: higher-level面向流的API,通过Apache DistributedLog提供,直接操作流,不用关心与ledger交互的复杂性。

    7、消息删除

    在 BookKeeper 中,允许操作的最小的单元是一个 segment,所以在具体的 msg(entry)级别,是没办法针对一条消息进行删除的,删除操作需要针对一个 segment 来进行操作。

    8、消息顺序

    逻辑全局有序(客户端保持顺序),物理存储无序。

    9、broker是对等的吗

    broker是对等的,负载均衡由 loadmanager 执行,进行topic卸载操作,以便平衡broker流量负载。 

    相关

    AMQP:高级消息队列协议

    AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

    基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

    MQTT:轻量级物联网消息推送协议

    MQTT是机器对机器(M2M)/物联网(IoT)连接协议。它被设计为一个极其轻量级的发布/订阅消息传输协议。

    对于需要较小代码占用空间和/或网络带宽非常宝贵的远程连接非常有用,是专为受限设备和低带宽、高延迟或不可靠的网络而设计。

    这些原则也使该协议成为新兴的“机器到机器”(M2M)或物联网(IoT)世界的连接设备,以及带宽和电池功率非常高的移动应用的理想选择。

    例如,它已被用于通过卫星链路与代理通信的传感器、与医疗服务提供者的拨号连接,以及一系列家庭自动化和小型设备场景。

    它也是移动应用的理想选择,因为它体积小,功耗低,数据包最小,并且可以有效地将信息分配给一个或多个接收器。

    MQTT on Pulsar (MoP)

    MQTT-on-Pulsar (aka MoP) is developed to support MQTT protocol natively on Apache Pulsar.

    KoP(Kakfa on Pulsar)

    AoP(AMQP on Pulsar)

    ANSI标准

    ANSI:美国国家标准学会(AMERICAN NATIONAL STANDARDS INSTITUTE: ANSI)成立于1918年。

    什么是条带化(striping)

      当多个进程同时访问一个磁盘时,可能会出现磁盘冲突。大多数磁盘系统都对访问次数(每秒的I/O操作,IOPS)和数据传输率(每秒传输的数据量,TPS)有限制。

    当达到这些限制时,后面需要访问磁盘的进程就需要等待,这时就是所谓的磁盘冲突。

      避免磁盘冲突是优化I/O 性能的一个重要目标,而 I/O 性能的优化与其他资源(如CPU和内存)的优化有着很大的区别 ,I/O 优化最有效的手段是将I/O 最大限度的进行平衡。

      条带化技术就是一种自动的将I/O 的负载均衡到多个物理磁盘上的技术,条带化技术就是将一块连续的数据分成很多小部分并把他们分别存储到不同磁盘上去。

    这就能使多个进程同时访问数据的多个不同部分而不会造成磁盘冲突,而且在需要对这种数据进行顺序访问的时候可以获得最大程度上的I/O 并行能力,从而获得非常好的性能。

    很多操作系统、磁盘设备供应商、各种第三方软件都能做到条带化。

      图1 描述的是一个未经条带化处理的连续数据的分布,图 2 描述的是一个已经被条带化处理的连续数据的分布,

    从中比较,我们可以发现图 2中对连续数据的读写都有最大的并发能力。

    图 1. 未经条带化处理的连续数据

    图 2. 已经被条带化处理的连续数据

    由于条带化在I/O 性能问题上的优越表现,以致于在应用系统所在的计算环境中的多个层次或平台都涉及到了条带化的技术,如操作系统和存储系统这两个层次中都可能使用条带化技术。

    其他:

    Kafka 采用单片架构模型,将服务与存储相结合。即 一个Kafka broker节点。

    参考资料

    官网

    github

    tgip-cn

    年货礼包 | 资料大全: Apache Pulsar 从入门到实践

    假期充电包 | Apache Pulsar 从入门到实践

    Pulsar负载均衡

    思否开源项目推介丨Apache Pulsar:下一代云原生分布式消息流平台

    Pulsar 与 Kafka 全方位对比(上篇):功能、性能、用例

    博文推荐|多图详解 Apache Pulsar 消息存储模型

    博文推荐 | 一文带你看懂 Pulsar 的消息保留和过期策略

    译文|简明指南:Apache Pulsar 的分层存储

    理解Apache Pulsar工作原理

    Apache Pulsar指北

    Pulsar笔记(好)

    Pulsar学习笔记之 编译Jar包、构建镜像、部署集群

    Apache Pulsar 源码走读(一)启动

    Apache Pulsar 源码走读(二)二进制协议

    Apache Pulsar 源码走读(三)TopicLookup 请求处理(一)

    Apache Pulsar 源码走读(四)TopicLookup请求处理(二)

    理解磁盘条带化

  • 相关阅读:
    Element Form表单验证
    layui table中记住当前页
    Mysql定时任务
    Mysql存储过程
    StringRedisTemplate与redistemplate
    vue路由传值
    背景色渐变(兼容各浏览器)
    用onclick点击框架跳转
    美化滚动条
    图片无缝滚动
  • 原文地址:https://www.cnblogs.com/wangwangfei/p/14649166.html
Copyright © 2011-2022 走看看