zoukankan      html  css  js  c++  java
  • Pulsar-Producer实现简介

    “Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.”

    Pulsar是pub-sub模式的分布式消息平台,拥有灵活的消息模型和直观的客户端API。

    Pulsar由雅虎开发并开源的下一代消息系统,目前是Apache软件基金会的孵化器项目。

    本片文章简单介绍Pulsar的Producer,包含以下内容:

    • Producer的设计
    • 消息发送的实现

    1. Producer设计

    1.1 创建Producer

    以上是Pulsar官网上创建一个Producer的示例代码。

    创建的过程如下:

    1. 指定serviceUrl创建PulsarClient
    2. 指定Producer发送消息的Topic,通过PulsarClient创建Producer

    通过上述的创建代码可以推测:

    1. serviceUrl应该是用于做服务发现的,通过serviceUrl查找Broker的信息
    2. Producer指定了Topic,那么一个Producer只能往特定的Topic发送消息

    1.2 Producer API

    Pulsar中,发送相关的接口为Producer,如上图所示:

    • Producer定义了发送接口
    • ProducerBase作为抽象类,提供了基础实现
    • ProducerImpl则是真正的实现类
    • PartitionedProducerImpl看着和分区相关,这个之后再看

    Producer 接口具体如下:

    public interface Producer<T> extends Closeable {
        /**
         * 返回Producer发送消息的Topic
         */
        String getTopic();
        /**
         * Producer的名称
         */
        String getProducerName();
        /**
         * 同步发送消息
         */
        MessageId send(T message) throws PulsarClientException;
        /**
         * 有发送消息
         */
        CompletableFuture<MessageId> sendAsync(T message);
        /**
         * Flush客户端完成中的消息并等待所有消息成功持久化
         * @since 2.1.0
         * @see #flushAsync()
         */
        void flush() throws PulsarClientException;
        /**
         * 异步Flush客户端完成中的消息并等待所有消息成功持久化
         * @since 2.1.0
         * @see #flush()
         */
        CompletableFuture<Void> flushAsync();
        /**
         * 创建TypedMessageBuilder,用于构建消息
         */
        TypedMessageBuilder<T> newMessage();
        /**
         * 同步发送消息,已经被弃用
         */
        @Deprecated
        MessageId send(Message<T> message) throws PulsarClientException;
        /**
         * 异步发送消息,已经被弃用
         */
        @Deprecated
        CompletableFuture<MessageId> sendAsync(Message<T> message);
        /**
         * 获取Producer发送的最后一个序列号
         */
        long getLastSequenceId();
        /**
         * 获取Producer的统计信息
         */
        ProducerStats getStats();
        /**
         * 异步关闭Producer并且释放资源
         */
        CompletableFuture<Void> closeAsync();
        /**
         * 返回Producer是否连接到Broker上
         */
        boolean isConnected();
    }

    通过Producer接口可以看出Pulsar Producer提供的能力:

    • 同步发送消息
    • 异步发送消息
    • 一个Producer只能向一个特定的Topic发送消息(Producer#topic()返回了一个Topic,说明Producer会绑定到一个Topic上)
    • 批量发送(flush方法说明了应该是支持批量的,消息会在客户端内存中保存)
    • 包含了sequenceId是否可以做幂等之类的事情?
    • 统计能力

    1.3 ProducerBase

    ProducerBase作为抽象类,实现了Producer接口。

    ProducerBase包含四个属性:

    • producerCreatedFuture:异步创建Producer的Future
    • conf:Producer的配置
    • schema:消息相关的Schema信息
    • interceptors:Producer的拦截器,在发送前后插入一些操作

    producerCreatedFuture

    重命名上看这个属性是用于异步创建Producer。

    但是在一个基类中提供异步创建实体的Future显得比较难理解。一般的编程思路会在基类中定义一些基础的公共的属性,用于保存状态或者配置,比如conf。这里的producerCreatedFuture实际用于PartitionedProducerImpl异步创建多个Producer,这个后续再看。

    conf

    ProducerConfigurationData提供了Producer相关的配置信息,包含是否批量发送、内存缓存消息的大小、发送的Timeout等。

    schema

    Schema指明了消息的格式,通过Schema完成对消息的encode和decode。

    interceptors

    ProducerInterceptor是Producer提供的拦截器,包含两个方法:beforeSend、onSendAcknowledgement,分别用于在发送前和发送后插入行为。

    1.4 ProducerImpl

    ProducerImpl继承了ProducerBase,是Producer接口的核心实现。

    ProducerImpl在ProducerBase的基础上增加了大量的属性,包含:

    • producerId:通过AtomicLong生成的进程内唯一的Producer ID
    • msgIdGenerator:消息ID
    • pendingMessages:内存中缓存的消息
    • pendingCallbacks:内存中缓存的消息对应的Callback
    • timeout:发送的超时配置
    • batchMessageContainer:批量消息的容器
    • producerName:全局唯一的Producer名称
    • 等等...(在后续发送实现中介绍相关的属性)

    ProducerImpl实现了具体的发送行为,比如同步发送、异步发送(后续在消息发送的实现部分介绍)。

    1.5 PartitionedProducerImpl

    Producer提供的发送相关的API定义,ProducerBase提供了基础实现,ProducerImpl提供了具体的实现,那么PartitionedProducerImpl做什么?

    通过PartitionedProducerImpl的属性可以看到内部包含了一个ProducerImpl列表,那么可以PartitionedProducerImpl和ProducerImpl是一个组合的关系。

    通过start方法可以看出,PartitionedProducerImpl根据对应的topicMetadata的分区数创建了对应数量的ProducerImpl实例(这里也说明了ProducerBase中producerCreatedFuture的用途)。

    为什么在PartitionedProducerImpl中需要创建一组ProducerImpl实例?

    PartitionedProducerImpl另外增加了一个routerPolicy属性,其接口为:

    public interface MessageRouter extends Serializable {
    
        @Deprecated
        default int choosePartition(Message<?> msg) {
            throw new UnsupportedOperationException("Use #choosePartition(Message, TopicMetadata) instead");
        }
    
        default int choosePartition(Message<?> msg, TopicMetadata metadata) {
            return choosePartition(msg);
        }
    
    }

    通过接口的定义不难理解MessageRouter接口实现Message和Partition的映射。

    通过internalSendAsync方法的实现可以看出,发送消息时通过routerPolicy将消息映射到Partition,通过Partition选择对应的Producer执行发送,那么久解释了为什么在PartitionedProducerImpl会创建和对应Topic的分区数相同的ProducerImpl。

    通过以上内容,能总结出Producer模块的各个类的职责:

    • Producer:定义发送接口,用户使用的核心API
    • ProducerBase:Producer接口的基础实现
    • ProducerImpl:实现具体的发送行为,一个ProducerImpl只能向一个Topic写入消息
    • PartitionedProducerImpl:整合多个ProducerImpl,用于向多分区发送消息的场景

    2. 消息发送的实现

    在对Producer模块有个整体的认识后,后续内容具体阐述一条消息的发送流程。

    在消息系统中,从Producer的视角看,一条消息写入过程一般包含:

    1. 消息校验
    2. 消息属性增强(添加一些必要的系统属性)
    3. 消息路由(选择目标分区)
    4. 消息序列化
    5. 消息数据写入网络
    6. 等待写入结果响应
    7. 返回写入结果

    下面将通过ProducerImpl的实现来了解Pulsar的Producer发送消息的过程。

    2.1 寻址

    要发送一条消息,除了校验消息是否合法,首先要这条消息的写入目标(通过路由找到消息目标的Partition)。

    在ProducerImpl的构造方法最后一行调用了grabCnx()方法创建了链接(构建了链接的上下文)。

    grabCnx方法通过PulsarClient创建Connection,而PubsarClient内部则通过LookupService接口来完成Topic到Broker的映射并建立链接。

    LookupService接口提供了BinaryProtoLookupService和HttpLookupService实现,通过LookupService用户也可以实现自己的服务发现模块。

    2.2 消息发送

    发送消息的调用链如上图所示,最终通过ProducerImpl的internalSendAsync将消息发送出去。无论同步发送还是异步发送,最终都会通过异步的方式执行发送(同时只是在异步的基础上等待发送结果),这里可以看到Pulsar Producer在API实现上比较注重代码的复用性即API的最小功能原则。

    以单挑消息发送为例,sendAsync的具体实现如下:

    1. 在必要的校验后,将消息包装成OpSendMsg对象(包含异步发送完成后的Callback)
    2. 将消息添加到pendingMessages
    3. 通过Connection的EventLoop执行发送操作

    ProducerImpl将在ackReceived方法中处理服务端对写入消息的响应,通过消息的sequenceId来识别对应的OpSendMsg,并调用对应Callback来执行回调逻辑。实际在Callback完成了响应用户的操作及发送行为的一些统计。

    ProducerImpl只会建立一个链接,且发送和ACK都是通过synchronized执行的,所以中间通过pendingMessages来完成消息发送和响应的对应,以及超时的处理。这块具体可以看一下代码实现。

    总结

    本文介绍了Pulsar Producer模块的设计,包含各个类的职责,并简单介绍了消息的发送过程。Puslar Producer在设计上和RocketMQ的思想差异还是比较大的,比如Puslar Producer会将Producer对应到分区上,每个分区有自己的Producer,这样可以比较容易完成一些幂等之类的操作。

  • 相关阅读:
    并发编程知识点剖析
    JavaScript 实现留言框
    JavaScript 实现简单的 弹出框关闭框
    网络编程知识点剖析
    css清除浮动的方法
    css盒模型
    CSS的继承性和层叠性
    转载《ionic 热更新 cordova-hot-code-push》
    转《js闭包与内存泄漏》
    前端存储loaclForage
  • 原文地址:https://www.cnblogs.com/hzmark/p/pulsar-producer.html
Copyright © 2011-2022 走看看