zoukankan      html  css  js  c++  java
  • RocketMQ(4.8.0)——默认的两种消费者核心属性和方法

    RocketMQ(4.8.0)——默认的两种消费者核心属性和方法

      RocketMQ客户端有两个独立的消费者实现类:org.apache.rocketmq.client.consumer.DefaultMQPullConsumerorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer
      下面分别对这2种类进行介绍:

    1.1 DefaultMQPullConsumer

      该消费者使用时需要用户主动从 Broker 中 Pull 消息和消费消息,提交消费位点。DefaultMQPullConsumer 的类图继承关系如图所示:

      可以看到,DefaultMQPullConsumer 实现时包含消费者的操作和属性配置,这是一个典型的类对象设计。下面我们介绍一些核心属性和方法。

      namesrvAddr:继承自 ClientConfig,表示 RocketMQ 集群的 Namesrv 地址,如果是多个,则用分号分开。比如:127.0.0.1:9876;127.0.0.2:9876。

      clientIP:使用客户端的程序所在机器的 IP 地址,目前支持 IPv4 和 IPv6,同时排除了本地环回接口(127.0.x.x)和私有内网地址(192.168.x.x)。如果在 Docker 中运行,获取的 IP 地址是容器所在的 IP 地址,而非宿主主机的 IP 地址。

      instanceName:客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等)。假如要在同一个机器上部署多个程序进程,那么每个进程的实例名都必须不相同,否则程序会启动失败。

      vipChannelEnabled:这是一个 boolean 值,表示是否开启 VIP 通道。VIP 通道和非 VIP 通道的区别是使用不同的端口号进行通信。

      clientCallbackExecutorThreads:客户端回调线程数。该线程数等于 Netty 通信层回调线程的个数。默认值为 Runtime.getRuntime().availableProcessors(),表示当前有效的CPU个数。

      pollNameServerInterval:获取 Topic 路由信息间隔,单位为ms,默认为30 000ms。

      heartbeatBrokerInterval:客户端和 Broker 心跳间隔,单位为ms,默认为30 000ms。

      persistConsumerOffsetInterval:持久化消费位点时间间隔,单位为ms,默认为5000ms。

      defaultMQPullConsumerImpl:默认Pull消费者的具体实现。

      consumerGroup:消费者组名字。

      brokerSuspendMaxTimeMills:在长轮询模式下,Broker 的最大挂起请求时间,建议不要修改此值。

      consumerTimeoutMillsWhenSuspend:在长轮询模式下,消费者的最大请求超时时间,必须比 brokerSuspendMaxTimeMills 要大,不建议修改此值。

      consumerPullTimeoutMills:消费者 Pull 消息时 Socket 的超时时间。

      messageModel:消费模式,现在支持集群模式消费和广播模式消费。

      messageQueueListener:消息路由信息变化时回调处理监听器,一般在重新平衡时会被调用。

      offsetStore:位点存储模块。集群模式位点会持久化到 Broker 中,广播模式持久化到本地文件中,位点存储模块有两个实现类:RemoteBrokerOffsetStoreLocalFileOffsetStore

      allocateMessageQueueStrategy:消费Queue分配策略管理器。

      maxReconsumeTimes:最大重试次数,可以配置。

      下面介绍一些核心方法。由于生产者消费者都继承了 MQAdmin 接口,所以管理相关的接口都是一样的,不再赘述。

      registerMessageQueueListener():注册队列变化监听器,当队列发生变化时会被监听到。

      pull():从 Broker 中 Pull 消息,如果有PullCallback参数,则表示异步拉取。

      pullBlockIfNotFound():长轮询方式拉取。如果没有拉取到消息,那么 Broker 会将请求 Hold 住一段时间。

      updateConsumeOffset(final MessageQueue mq,final long offset):更新某一个 Queue 的消费位点。

      fetchConsumeOffset(final MessageQueue mq, final boolean fromStore):查找某个 Queue 的消费点。

      sendMessageBack(MessageExt msg, init delayLevel, String brokerName, String consumerGroup):如果消费发送失败,则可以将消息重新发回给 Broker,这个消息者组延迟一段时间后可以再消费(也就是重试)。

      fetchSubscribeMessageQueues(final String topic):获取一个 Topic 的全部 Queue 信息。

    1.2 DefaultMQPushConsumer

      DefaultMQPushConsumer的大部分属性、方法和DefaultMQPullConsumer是一样的,下面说下其核心属性和方法。

      DefaultMQPushConsumerImpl:默认的 Push 消费者具体实现类。

      consumeFromWhere:一个枚举,表示从什么位点开始消费。

    (1)CONSUME_FROM_LAST_OFFSET:从上次消费的位点开始消费,相当于断点继续。

    (2)CONSUME_FROM_FIRST_OFFSET:从 ConsumeQueue 的最小位点开始消费。

    (3)CONSUME_FROM_TIMESTAMP:从指定时间开始消费。

      consumeTimestamp:表示从哪一时刻开始消费,格式为 yyyyMMDDHHmmss,默认为半小时前。当 consumeFromWhereconsumeTimestamp 时,consumeTimestamp 设置的值才生效。

      allocateMessageQueueStrategy:消费者订阅 topic-queue 策略。

      subscription:订阅关系,表示当前消费者订阅了哪些 Topic 的哪些 Tag。

      messageListener:消息 Push 回调监听器。

      consumeThreadMin:最小消费线程数,必须小于 consumeThreadMax

      consumeThreadMax:最大消费线程数,必须大于 consumeThreadMin

      adjustThreadPoolNumsThreshold:动态调整消费线程池的线程数大小,开源版本不支持该功能。

      consumeConcurrentlyMaxSpan:并发消息的最大位点差。如果 Pull 消息的位点差超过该值,拉取变慢。

      pullThresholdForQueue:一个 Queue 能缓存的最大消息数。超过该值则采取拉取流控措施。

      pullThresholdSizeForQueue:一个 Queue 最大能缓存的消息字节数,单位是MB。

      pullThresholdForTopic:一个 Queue 最大能缓存的消息字节数,单位是MB。默认为-1,结合 pullThresholdForQueue 配置项生效,该配置项的优先级低于 pullThresholdForQueue。

      pullInterval:拉取间隔,单位为ms。

      consumeMessageBatchMaxSize:消费者每次批量消费时,最多消费多少条消息。

      pullBatchSize:一次最多拉取多少条消息。
      postSubscriptionWhenPull:每次拉取消息时是否更新订阅关系,该方法的返回值默认为False。

      maxReconsumeTimes:最大重试次数,该函数返回值默认为-1,表示默认最大重试次数为16。

      suspendCurrentQueueTimeMillis:为短轮询场景设置的挂起时间,比如顺序消息场景。

      consumeTimeout:消费超时时间,单位为min,默认值为15min。

  • 相关阅读:
    vi命令文件编辑
    Linux vi/vim编辑器常用命令与用法总结
    常用vi编辑器命令行
    在AspNetMvc中使用日志面板. Logdashboard 1.1beta
    Abp中使用可视化的日志面板
    使用logdashboard进行可视化的日志追踪
    可视化面板LogDashboard使用log4net源
    LogDashboard 1.0.4 版本发布
    什么是LogDashboard?
    使用logdashboard查看可视化日志
  • 原文地址:https://www.cnblogs.com/zuoyang/p/14411867.html
Copyright © 2011-2022 走看看