zoukankan      html  css  js  c++  java
  • KafkaSpout分析:配置

    public KafkaSpout(SpoutConfig spoutConf) {
            _spoutConfig = spoutConf;
    }

    基于0.93版本的Storm

    SpoutConfig继承自KafkaConfig。由于SpoutConfig和KafkaConfig所有的instance field全是public, 因此在使用构造方法后,可以直接设置各个域的值。

    public class SpoutConfig extends KafkaConfig implements Serializable {
        public List<String> zkServers = null; //记录Spout读取进度所用的zookeeper的host
        public Integer zkPort = null;//记录进度用的zookeeper的端口
        public String zkRoot = null;//进度信息记录于zookeeper的哪个路径下
        public String id = null;//进度记录的id,想要一个新的Spout读取之前的记录,应把它的id设为跟之前的一样。
        public long stateUpdateIntervalMs = 2000;//多久往Zookeeper记录一次进度
    
        public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
            super(hosts, topic);
            this.zkRoot = zkRoot;
            this.id = id;
        }
    }
    public class KafkaConfig implements Serializable {
    
        public final BrokerHosts hosts; //用以获取Kafka broker和partition的信息
        public final String topic;//从哪个topic读取消息
        public final String clientId; // SimpleConsumer所用的client id
    
        public int fetchSizeBytes = 1024 * 1024; //发给Kafka的每个FetchRequest中,用此指定想要的response中总的消息的大小
        public int socketTimeoutMs = 10000;//与Kafka broker的连接的socket超时时间
        public int fetchMaxWait = 10000;   //当服务器没有新消息时,消费者会等待这些时间
        public int bufferSizeBytes = 1024 * 1024;//SimpleConsumer所使用的SocketChannel的读缓冲区大小
        public MultiScheme scheme = new RawMultiScheme();//从Kafka中取出的byte[],该如何反序列化
        public boolean forceFromStart = false;//是否强制从Kafka中offset最小的开始读起
        public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//从何时的offset时间开始读,默认为最旧的offset
        public long maxOffsetBehind = Long.MAX_VALUE;//KafkaSpout读取的进度与目标进度相差多少,相差太多,Spout会丢弃中间的消息 
       public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所请求的offset对应的消息在Kafka中不存在,是否使用startOffsetTime
       public int metricsTimeBucketSizeInSecs = 60;//多长时间统计一次metrics
       public KafkaConfig(BrokerHosts hosts, String topic) {
        this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
      }
      
    public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
        this.hosts = hosts;
        
    this.topic = topic;
        
    this.clientId = clientId;
       }
    }

     对Zookeeper的使用

    KafkaSpout的配置中有两个地方可以用到Zookeeper

    1. 用Zookeeper来记录KafkaSpout的处理进度,在topology重新提交或者task重启后继续之前的处理进度。在SpoutConfig中的zkServers, zkPort和zkRoot与此相关。如果zkServer和zkPort没有设置,那么KafkaSpout会使用Storm集群所用的Zookeeper记录这些信息。
    2. 用Zookeeper来获取Kafka中一个topic的所有partition,和每个partition的leader。这需要实现BrokerHosts的子类ZkHosts.但是,这个Zookeepr是可选的。如果使用BrokerHosts的另一个子类StaticHosts,把partition和leader的对应关系硬编码,则不需要Zookeeper来提供此功能。KafkaSpout会从Kafka集群使用的Zookeeper中提取partition和leader的对应关系。而且:
      • 如果使用StatisHosts,那么KafkaSpout会使用StaticCoordinator,这个coordinator不能响应partition leader的变化。
      • 如果使用ZkHosts,那么KafkaSpout会使用ZkCoordinator, 当其refresh()方法被调用后,这个cooridnator会检查发生leader变更的partition,并为之生成新的PartitionManager.从而能够在leader变更后,继续读取消息。

    影响初始读取进度的配置项

    在一个topology上线后,它从哪个offset开始读取消息呢?有一些配置项对此有影响:

    1. SpoutConfig中的id字段。如果想要一个topology从另一个topology之前的处理进度继续处理,它们需要有相同的id。
    2. KafkaConfig的forceFromStart字段。如果此字段设为true, 那么它一个topology上线后,它会忽略之前相同id的topology的进度,并且从Kafka中最早的消息开始处理。
    3. KafkaConfig的startOffsetTime字段。默认为kafka.api.OffsetRequest.EarliestTime()开始读,也就是从Kafka中最早的消息开始处理。也可以设成kafka.api.OffsetRequest.LatestOffset,也就是最早的消息开始读。也可以自己指定具体的值。
    4. KafkaConfig的maxOffsetBehind字段。这个字段对于KafkaSpout的多个处理流程都有影响。当提交一个新topology时,如果没有forceFromStart, 当KafkaSpout对某个partition的处理进度落后startOffsetTime对应的offset多于此值时,KafkaSpout会丢弃中间的消息,从而强制赶上目标进度.比如,如果startOffsetTime设成了lastestTime,那么如果进度落后超过maxOffsetBehind,KafkaSpout会直接从latestTime对应的offset开始处理。如果设成了froceFromStart,则在提交新任务时,始终会从EarliestTime开始读。
    5. KafkaSpout的userStartOffsetTimeIfOffsetOutOfRange字段。如果设成true,那么当fetch消息时出错,且FetchResponse显示的出错原因是OFFSET_OUT_OF_RANGE,那么就会尝试从KafkaSpout指定的startOffsetTime对应的消息开始读。例如,如果有一批消息因为超过了保存期限被Kafka删除,并且zk里记录的消息在这批被删除的消息里。如果KafkaSpout试图从zk的记录继续读,那么就会出现OFFSET_OUT_OF_RANGE的错误,从而触发这个配置。

    实际上maxOffsetBehind有时候有点名不符实。当startOffsetTime为A, zk里的进度为B, A - B > maxOffsetBehind时,应该从A - maxOffsetBehind除开始读或许更好一些,而不是直接跳到startOffsetTime。此处的逻辑参见PartitionManager的实现。

    附:其中KafkaConfig的maxWait的意义请参见这篇文章 《卡夫卡的炼狱》

    实际上,KafkaSpout的一些行为可能会比较诡异,特别是与maxOffsetBehind有关的部分。这些行为由PartitionManager决定,参见对PartitionManager的分析这篇文章。

  • 相关阅读:
    VMware克隆虚拟机
    3种Redis分布式锁的对比
    高并发下缓存与数据库双写不一致解决方案
    30分钟Maven入门到精通
    Linux安装rabbitmq (解决guest无法登陆问题)
    RocketMQ高性能原理(pushConsumer,CommitLog,ZeroCopy)
    XA 分布式事务原理(转)
    SpringBoot学习(八)RestTemplate/WebClient 调用REST服务、Validation校验和发邮件
    @Valid和@Validated的总结区分(转)
    Dubbo 同步调用原理(转)
  • 原文地址:https://www.cnblogs.com/devos/p/4335302.html
Copyright © 2011-2022 走看看