zoukankan      html  css  js  c++  java
  • [Apache Pulsar] 企业级分布式消息系统-Pulsar快速上手

    Pulsar快速上手

    前言

    如果你还不了解Pulsar消息系统,可以先看上一篇文章 企业级分布式消息系统-Pulsar入门基础
    Pulsar客户端支持多个语言,包括Java,Go,Pytho和C++,本篇文章只讲述Java客户端。
    Pulsar Java客户端既可用于创建消息的producers、consumers和readers ,也可用于执行管理任务。Java 客户端的当前版本为 2.4.0。

    1. 安装

    最新版本的Pulsar Java 客户端库可通过 Maven中央仓库 使用。 要使用最新版本, 请将 pulsar-client 库添加到构建配置中。

    1.1 Maven

    如果你使用maven,添加以下内容到你的 pom.xml 中:

    <!-- 在你的 <properties> 部分-->
    <pulsar.version>2.4.0</pulsar.version>
    
    <!-- 在你的 <dependencies> 部分-->
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>${pulsar.version}</version>
    </dependency>

     

    1.2 Gradle

    如果你使用Gradle,添加以下内容到你的 build.gradle 中:

    def pulsarVersion = '2.4.0'
    
    dependencies {
    compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
    }  

     

    1.3 本地安装Pulsar

    Pulsar目前只支持MacOS和Linux系统,JDK版本1.8及以上。
    下载地址见下载说明及配置,Windows的小伙伴们就不用下载了。

    2.连接URL

    要使用客户端连接到Pulsar,你需要指定Pulsar 协议URL。
    Pulsar协议URL分配给特定的集群,使用pulsar scheme ,默认端口6650。以下是本地主机的示例:

    pulsar://localhost:6650

    如果有多个broker,那么URL如下:

    pulsar://localhost:6550,localhost:6651,localhost:6652

    生产环境的Pulsar 集群URL如下:

    pulsar://pulsar.us-west.example.com:6650

    如果需要TLS认证,URL如下:

    pulsar+ssl://pulsar.us-west.example.com:6651

     

    3.客户端配置

    你可以用一个URL来实例化一个连接到指定的Pulsar 集群的 PulsarClient 对象,像这样:

    PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

    如果有多个brokers,实例化客户端如下:

    PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652")
        .build();

    默认的broker URL是单机集群。 如果你使用单机模式运行一个集群,broker将默认使用pulsar://localhost:6650

    3.1 生产者

    在Pulsar中,生产者写消息到topic中。 一旦你实例化一个Pulsar Client对象,你可以创建一个Producer 用于特定的topic。

    Producer<byte[]> producer = client.newProducer()
        .topic("my-topic")
        .create();
    
    // 然后你就可以发送消息到指定的broker 和topic上:
    producer.send("My message".getBytes());

    默认情况下,生产者生成由字节数组组成的消息。当然,你也可以指定消息类型,例如下面的String类型:

    Producer<String> stringProducer = client.newProducer(Schema.STRING)
        .topic("my-topic")
        .create();
    stringProducer.send("My message");

    在不再使用时,你需要确保关闭生产者、消费者和客户端
    producer.close(); consumer.close(); client.close();

    关闭操作也可以是异步的:
    
    //...业务代码
    
    producer.closeAsync()
       .thenRun(() -> System.out.println("Producer closed"));
       .exceptionally((ex) -> {
           System.err.println("Failed to close producer: " + ex);
           return ex;
       });
     
    3.1.1 生产者配置

    如果实例化生产者对象时仅指定topic名称 (如上面的示例所示), 则生产者将使用默认配置。 要使用非默认配置, 你可以设置多种可配置的参数。详情见ProducerBuilder的文档说明,下面是一个示例:

    Producer<byte[]> producer = client.newProducer()
    .topic("my-topic")                                  //主题名称
    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) //最大发布延迟时间
    .sendTimeout(10, TimeUnit.SECONDS)                  //超时时间
    .blockIfQueueFull(true)                             //队列满了,是否阻塞
    .create();
     
    3.1.2 消息路由 #####

    使用分区主题时,当你使用生产者发布消息时你可以指定路由模式。

    3.1.3 异步发送

    你可以使用Java客户端异步发布消息。 使用异步发送,生产者将消息放入阻塞队列并立即返回。 然后,客户端将在后台将消息发送给broker。 如果队列已满(配置的最大值),则在调用API时,生产者可能会被阻塞或立即失败,具体取决于传递给生产者的参数。
    以下是异步发送操作的示例:

    producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
        System.out.printf("Message with ID %s successfully sent", msgId);
    });
     
    3.1.4 消息配置

    除了value之外, 还可以在特定消息上设置其他选项:

    producer.newMessage()
    .key("my-message-key")                      //消息的key
    .value("my-async-message".getBytes())       //消息内容的字节数组
    .property("my-key", "my-value")             //自定义的key/value
    .property("my-other-key", "my-other-value")
    .send();
     
    3.2 消费者

    在Pulsar中,消费者订阅topic并处理生产者发布到这些topic的消息。 你可以首先实例化一个PulsarClient对象并传给他一个borker URL(和生产样的一样)来实例化一个消费者。
    一旦实例化一个PulsarClient 对象,你可以指定一个主题和一个订阅来创建一个 Consumer 消费者。

    Consumer consumer = client.newConsumer()
        .topic("my-topic")                      //生产者定义的topic
        .subscriptionName("my-subscription")    //消费者自定义的订阅名称
        .subscribe();

    subscribe()方法将自动将订阅消费者指定的主题, 一种让消费者监听主题的方法是使用while循环,示例如下:

    while (true) {
    
      // 等待一个消息
      Message msg = consumer.receive();
    
      try {
          // 对这个消息的处理(业务)
          System.out.printf("Message received: %s", new String(msg.getData()));
    
          // 消费者确认消息已消费,同时broker删除该消息
          consumer.acknowledge(msg);
    
      } catch (Exception e) {
    
          // 消息处理失败,否定确认,该消息稍后会重发
          consumer.negativeAcknowledge(msg);
      }
    }
     
    3.2.1 消费者配置

    如果实例化 消费者对象, 仅指定主题和订阅名称, 如上面的示例所示, 消费者将采用默认配置。 要使用非默认配置, 你可以设置多种可配置的参数。详情见ConsumerBuilder的说明,下面是一个示例:

    Consumer consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .ackTimeout(10, TimeUnit.SECONDS)               //确认超时时间
        .subscriptionType(SubscriptionType.Exclusive)   //订阅模式
        .subscribe();
     
    3.2.2 异步接收

    receive方法将异步接受消息(消费者处理器将被阻塞,直到有消息到达)。 你也可以使用异步接收方法,这将在一个新消息到达时立即返回一个CompletableFuture对象。示例如下:

    CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
     
    3.2.3 多主题订阅

    消费者除了订阅单个Pulsar主题外,你还可以使用多主题订阅订阅多个主题。 若要使用多主题订阅, 可以提供一个topic正则表达式 (regex) 或 主题List 。 如果通过 regex 选择主题, 则所有主题都必须位于同一Pulsar命名空间中。
    下面是一些示例:

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.PulsarClient;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;
    
    ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
            .subscriptionName(subscription);
    
    // 订阅命名空间中的所有主题
    Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
    Consumer allTopicsConsumer = consumerBuilder
            .topicsPattern(allTopicsInNamespace)
            .subscribe();
    
    // 使用regex订阅命名空间中的主题子集
    Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
    Consumer allTopicsConsumer = consumerBuilder
            .topicsPattern(someTopicsInNamespace)
            .subscribe();

    你还可以订阅明确的主题列表 (可跨命名空间):

    List<String> topics = Arrays.asList(
        "topic-1",
        "topic-2",
        "topic-3"
    );
    
    Consumer multiTopicConsumer = consumerBuilder
            .topics(topics)
            .subscribe();
    
    // 或者:
    Consumer multiTopicConsumer = consumerBuilder
            .topics(
                "topic-1",
                "topic-2",
                "topic-3"
            )
            .subscribe();

    你也可以使用subscribeAsync 方法异步订阅多主题,下面是一个示例:

    Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default.*");
    consumerBuilder
            .topics(topics)
            .subscribeAsync()
            .thenAccept(this::receiveMessageFromConsumer);
    
    private void receiveMessageFromConsumer(Consumer consumer) {
        consumer.receiveAsync().thenAccept(message -> {
                    // 业务处理
                    receiveMessageFromConsumer(consumer);
                });
    }
    3.2.4 订阅模型

    Pulsar有多种订阅模型来适用不同的场景,订阅模型见Pulsar基础概念,下面讲述如何使用。
    为了更好的描述他们之间的不同,假设你创建了一个topic,命名为"my-topic",生产者发布了10条消息,示例如下:

    //创建生产者
    Producer<String> producer = client.newProducer(Schema.STRING)
        .topic("my-topic")
        .enableBatch(false)
        .create();
    
    // "key-1"的消息有3条
    // "key-2"的消息有3条
    // "key-3"的消息有2条
    // "key-4"的消息有2条
    producer.newMessage().key("key-1").value("message-1-1").send();
    producer.newMessage().key("key-1").value("message-1-2").send();
    producer.newMessage().key("key-1").value("message-1-3").send();
    producer.newMessage().key("key-2").value("message-2-1").send();
    producer.newMessage().key("key-2").value("message-2-2").send();
    producer.newMessage().key("key-2").value("message-2-3").send();
    producer.newMessage().key("key-3").value("message-3-1").send();
    producer.newMessage().key("key-3").value("message-3-2").send();
    producer.newMessage().key("key-4").value("message-4-1").send();
    producer.newMessage().key("key-4").value("message-4-2").send();

    Exclusive(独占模式):
    创建一个消费者,以Exclusive模式订阅消息,代码如下:

    Consumer consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Exclusive)   //独占模式
        .subscribe()

    只有第一个消费者可以订阅,其他消费者订阅会报错。这就意味着第一个消费者可以收到所有的10条消息,消息消费的顺序和生产的顺序是一样的。
    Failover(灾备):
    创建一个消费者,以Exclusive模式订阅消息,代码如下:

    //创建消费者1
    Consumer consumer1 = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Failover)    //灾备模式
        .subscribe()
    
    //创建消费者2
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)    //灾备模式
            .subscribe()

    conumser1是起作用的消费者, consumer2是备用消费者。假设consumer1收到的5条消息后突然崩了, 那么consumer2接替,成了起作用的消费者。
    当然多个消费者都可以订阅,但是只有第一个是可用,第一个消费者断开连接后,下一个备用的消费者就起作用了。
    Shared(共享):
    创建一个消费者,以Exclusive模式订阅消息,代码如下:

    Consumer consumer1 = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Shared)  //共享模式
        .subscribe()
    
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .subscribe()
    
    //这两个消费者都是可用的

    在共享模式,多个消费者都可以订阅,消息在多个消费者之间是以轮询的方式分发。
    如果broke同一时间只发送一个消息,那么consume1收到5条消息:

    ("key-1", "message-1-1")
    ("key-1", "message-1-3")
    ("key-2", "message-2-2")
    ("key-3", "message-3-1")
    ("key-4", "message-4-1")

    消费者2收到另外5条消息。
    总之,共享模式和其他两种模式不同,共享模式有更好的灵活性,但是不能保证消息的顺序。
    Key_share
    这是2.4.0版本后新出的订阅模式,代码如下:

    Consumer consumer1 = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Key_Shared) //key共享模式
        .subscribe()
    
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe()

    KeyShared和Shared模式类似,区别在于KeyShared模式下,具有相同key的消息分发到同一个消费者。
    消费者1最后收到5条消息:

    ("key-1", "message-1-1")
    ("key-1", "message-1-2")
    ("key-1", "message-1-3")
    ("key-3", "message-3-1")
    ("key-3", "message-3-2")

    消费者2收到另外5条。

    如果该模式下消息的key没有指定,那么所有的消息默认分发到同一消费者。

    3.2.5 Reader接口

    使用 reader 接口, Pulsar客户可以在topic中“手动定位”,从指定的消息开始向前读取所有消息。Pulsar Java API 可以创建Reader对象,同时指定一个 topic, 一个MessageId ,和ReaderConfiguration。
    下面是一个示例:

    ReaderConfiguration conf = new ReaderConfiguration();
    
    byte[] msgIdBytes = // 一些消息ID 的字节数组
    MessageId id = MessageId.fromByteArray(msgIdBytes);
    
    Reader reader = pulsarClient.newReader()
            .topic(topic)
            .startMessageId(id)
            .create();
    
    while (true) {
        Message message = reader.readNext();
        // 处理消息
    }

    在上面的示例中,实例化一个Reader对象指定的主题和消息(ID); reader将遍历主题中msgIdBytes(取值方式取决于应用程序) 之后的消息。
    上面的示例代码展示了Reader对象指向特定的消息(ID),但你也可以使用MessageId.earliest来指向topic上最早可用的消息,使用MessageId.latest指向最新的消息。

    3.3 Schema

    在Pulsar中,所有的消息数据都在字节数组中,消息schema允许在构造和处理消息时使用其他类型的数据(从简单类型(如String)到更复杂的类型)。如果在不指定schema的情况下构造生产者,则生产者只能生成类型为 byte[]的消息。 下面是一个示例:

    Producer<byte[]> producer = client.newProducer()
        .topic(topic)
        .create();

    以下schema格式目前可用于 Java:

    • 无schema 或者字节数组schema(使用Schema.BYTES) 
      Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES) .topic("some-raw-bytes-topic") .create();
    • String,UTF-8编码,使用Schema.STRING 
      Producer<String> stringProducer = client.newProducer(Schema.STRING) .topic("some-string-topic") .create();
    • JSON 模式,创建POJO 
      Schema<MyPojo> pojoSchema = JSONSchema.of(MyPojo.class); Producer<MyPojo> pojoProducer = client.newProducer(pojoSchema) .topic("some-pojo-topic") .create();

    结语

    Pulsar的特性还有很多,这里重点介绍了Java客户端的快速上手教程,后面有时间的话会继续更新Pulsar系列。

  • 相关阅读:
    Two strings CodeForces
    Dasha and Photos CodeForces
    Largest Beautiful Number CodeForces
    Timetable CodeForces
    Financiers Game CodeForces
    AC日记——整理药名 openjudge 1.7 15
    AC日记——大小写字母互换 openjudge 1.7 14
    AC日记——将字符串中的小写字母换成大写字母 openjudge 1.7 13
    AC日记——加密的病历单 openjudge 1.7 12
    AC日记——潜伏着 openjudge 1.7 11
  • 原文地址:https://www.cnblogs.com/iceblow/p/11327466.html
Copyright © 2011-2022 走看看