Storm KafkaSpout Source Code Analysis

    I. Properties

    //org.apache.storm.spout.SpoutOutputCollector
    protected SpoutOutputCollector collector;

    //org.apache.storm.kafka.spout.KafkaSpoutConfig
    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;

    //factory that creates the KafkaConsumer
    private KafkaConsumerFactory kafkaConsumerFactory;

    //the consumer itself
    private transient KafkaConsumer<K, V> kafkaConsumer;

    //whether the consumer auto-commits offsets
    private transient boolean consumerAutoCommitMode;

    //strategy for where the first poll starts
    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;

    //retry service for failed tuples
    private transient KafkaSpoutRetryService retryService;

    //timer that paces offset commits
    private transient Timer commitTimer;

    //whether the spout has been initialized
    private transient boolean initialized;

    //per-partition offset bookkeeping
    private transient Map<TopicPartition, OffsetManager> offsetManagers;

    //ids of tuples that have been emitted but not yet acked
    private transient Set<KafkaSpoutMessageId> emitted;

    //records waiting to be emitted
    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;

    //current count of uncommitted offsets; compared against maxUncommittedOffsets to gate polling
    private transient long numUncommittedOffsets;

    //timer that paces refreshing the topic subscription
    private transient Timer refreshSubscriptionTimer;

    //topology context
    private transient TopologyContext context;
    

    Of these:

    private transient Map<TopicPartition, OffsetManager> offsetManagers;
    private transient Set<KafkaSpoutMessageId> emitted;
    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
    

    KafkaSpout revolves around these three collections.

    Polled records first land in waitingToEmit; once a record is emitted, its message id moves to emitted; when the tuple is acked, the offset is recorded in offsetManagers. Understand how these three relate and the whole flow falls into place; a toy sketch follows.
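
    The sketch below is an illustrative toy model, not the Storm source: a "record" is reduced to its bare offset, and a sorted set stands in for OffsetManager.

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Queue;
    import java.util.Set;
    import java.util.TreeSet;

    //Toy model of the three collections and the order a record moves through them
    public class SpoutLifecycleSketch {
        private final Queue<Long> waitingToEmit = new ArrayDeque<>(); //polled, not yet emitted
        private final Set<Long> emitted = new HashSet<>();            //emitted, not yet acked
        private final Set<Long> acked = new TreeSet<>();              //stands in for OffsetManager

        void poll(List<Long> offsets) {                //pollKafkaBroker() + setWaitingToEmit()
            waitingToEmit.addAll(offsets);
        }

        Long emit() {                                  //emit()
            Long offset = waitingToEmit.poll();
            if (offset != null) {
                emitted.add(offset);
            }
            return offset;
        }

        void ack(Long offset) {                        //ack()
            if (emitted.remove(offset)) {
                acked.add(offset);
            }
        }

        public static void main(String[] args) {
            SpoutLifecycleSketch spout = new SpoutLifecycleSketch();
            spout.poll(Arrays.asList(0L, 1L, 2L)); //records land in waitingToEmit
            Long offset = spout.emit();            //one record moves to emitted
            spout.ack(offset);                     //acked offsets await the next commit
            System.out.println(spout.acked);       //prints [0]
        }
    }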

    II. Constructors

    KafkaSpout extends BaseRichSpout and overrides all of its methods (the parent implementations are empty), while BaseRichSpout in turn extends BaseComponent implements IRichSpout. The overall shape of KafkaSpout is therefore:
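
    public class KafkaSpout<K, V> extends BaseRichSpout { ... }

    public abstract class BaseRichSpout extends BaseComponent implements IRichSpout { ... }

    public abstract class BaseComponent implements IComponent { ... }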

    KafkaSpout's constructors:

    //Build a KafkaSpout from a KafkaSpoutConfig, using the default consumer factory
    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
    }

    //Build from a KafkaSpoutConfig plus an explicit KafkaConsumerFactory (package-private)
    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
        this.kafkaConsumerFactory = kafkaConsumerFactory;
        this.kafkaSpoutConfig = kafkaSpoutConfig;
    }
    

    KafkaSpoutConfig carries every setting for the KafkaSpout itself as well as for the underlying consumer.

    KafkaConsumerFactoryDefault is the default consumer factory implementation:

    public class KafkaConsumerFactoryDefault<K, V> implements KafkaConsumerFactory<K, V> {
        public KafkaConsumerFactoryDefault() {
        }

        //builds the consumer from the props and deserializers held by KafkaSpoutConfig
        public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
            return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(), kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
        }
    }
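
    For reference, a spout is normally assembled through KafkaSpoutConfig.builder. A minimal sketch, assuming storm-kafka-client 1.x (where FirstPollOffsetStrategy is a nested enum); the broker address, topic name, and numbers are made up:

    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;

    public class SpoutSetupSketch {
        public static KafkaSpout<String, String> build() {
            KafkaSpoutConfig<String, String> config =
                KafkaSpoutConfig.builder("localhost:9092", "some-topic")
                    .setOffsetCommitPeriodMs(10000)       //period of the commitTimer seen below
                    .setMaxUncommittedOffsets(250)        //threshold used by poll()
                    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                    .build();
            return new KafkaSpout<>(config);              //the public constructor above
        }
    }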
    

    III. Key Methods

    1. open()

    Declared on ISpout; performs the spout's initialization.

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        //not initialized yet
        this.initialized = false;
        //topology context
        this.context = context;
        //output collector
        this.collector = collector;
        //counter of uncommitted offsets, starts at 0
        this.numUncommittedOffsets = 0L;
        //where the first poll starts; default UNCOMMITTED_EARLIEST, i.e. the earliest uncommitted offset
        this.firstPollOffsetStrategy = this.kafkaSpoutConfig.getFirstPollOffsetStrategy();
        //offset commit mode; default false, i.e. the spout manages commits; if true the consumer auto-commits
        this.consumerAutoCommitMode = this.kafkaSpoutConfig.isConsumerAutoCommitMode();
        //retry service
        this.retryService = this.kafkaSpoutConfig.getRetryService();
        //if not auto-commit, commit on a timer; 500 ms initial delay, period from KafkaSpoutConfig
        if (!this.consumerAutoCommitMode) {
            this.commitTimer = new Timer(500L, this.kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
        }
        //timer for refreshing the topic subscription; 500 ms initial delay, period from KafkaSpoutConfig
        this.refreshSubscriptionTimer = new Timer(500L, this.kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
        //instantiate the three collections
        this.offsetManagers = new HashMap();
        this.emitted = new HashSet();
        this.waitingToEmit = Collections.emptyListIterator();
        LOG.info("Kafka Spout opened with the following configuration: {}", this.kafkaSpoutConfig);
    }
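
    Timer here is Storm's internal helper (org.apache.storm.kafka.spout.internal.Timer). A toy stand-in for its isExpiredResetOnTrue() gate, written from its observed use here rather than from its source, would look roughly like this:

    import java.util.concurrent.TimeUnit;

    //Assumed behavior: after an initial delay, report true at most once per
    //period, restarting the countdown each time the gate fires.
    class PeriodicGateSketch {
        private final long periodNanos;
        private long deadline;

        PeriodicGateSketch(long delay, long period, TimeUnit unit) {
            this.periodNanos = unit.toNanos(period);
            this.deadline = System.nanoTime() + unit.toNanos(delay);
        }

        boolean isExpiredResetOnTrue() {
            boolean expired = System.nanoTime() >= deadline;
            if (expired) {
                deadline = System.nanoTime() + periodNanos; //arm the next period
            }
            return expired;
        }
    }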
    

    2. setAcked()

    Records that a tuple has been successfully processed.

    When a bolt finishes with a tuple it calls ack; here the TopicPartition is given an OffsetManager in offsetManagers, seeded with the fetch offset (a bolt-side sketch follows the code).

    private void setAcked(TopicPartition tp, long fetchOffset) {
        if (!this.consumerAutoCommitMode && !this.offsetManagers.containsKey(tp)) {
            this.offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
        }
    }
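
    For context, the bolt side of this contract looks roughly as follows (standard Storm bolt API; the bolt class and field names are made up). Anchoring the output tuple and acking the input is what completes the tuple tree and ultimately drives the spout's ack():

    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class PassThroughBolt extends BaseRichBolt {
        private OutputCollector collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple input) {
            collector.emit(input, new Values(input.getValue(0))); //anchor to keep the tuple tree intact
            collector.ack(input);                                 //success feeds back to KafkaSpout.ack(msgId)
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("value"));
        }
    }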
    

    3. nextTuple()

    Emits tuples.

    The rough logic:

    First check whether initialization has completed. If it has, check whether the commit timer has fired; if so, commit the offsets of acked tuples. Then decide whether polling Kafka is allowed; the condition is:

     boolean poll = !this.waitingToEmit() && (this.numUncommittedOffsets - (long)readyMessageCount < (long)maxUncommittedOffsets || this.consumerAutoCommitMode);
    

    That is: nothing is waiting to be emitted, and either the number of uncommitted offsets minus the number of retry-ready messages is below the maximum allowed uncommitted offsets, or the consumer auto-commits. If the condition holds, poll and add the records to the waiting-to-emit list. Finally, if anything is waiting to be emitted, emit it. The code below makes this clearer.

    public void nextTuple() {
        try {
            //only act once initialized
            if (this.initialized) {
                //if the commit timer has fired
                if (this.commit()) {
                    //commit offsets of acked tuples
                    this.commitOffsetsForAckedTuples();
                }
                //if polling is allowed
                if (this.poll()) {
                    try {
                        //poll Kafka and put the records into the waitingToEmit collection
                        this.setWaitingToEmit(this.pollKafkaBroker());
                    } catch (RetriableException var2) {
                        LOG.error("Failed to poll from kafka.", var2);
                    }
                }
                //if records are waiting to be emitted, emit
                if (this.waitingToEmit()) {
                    this.emit();
                }
            } else {
                LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
            }
        } catch (InterruptException var3) {
            this.throwKafkaConsumerInterruptedException();
        }

    }
    

    4. poll()

    Decides whether polling Kafka is allowed.

    private boolean poll() {
        int maxUncommittedOffsets = this.kafkaSpoutConfig.getMaxUncommittedOffsets();
        int readyMessageCount = this.retryService.readyMessageCount();
        //poll only if nothing is waiting to be emitted AND (uncommitted minus retry-ready is under the cap, OR auto-commit is on)
        boolean poll = !this.waitingToEmit() && (this.numUncommittedOffsets - (long)readyMessageCount < (long)maxUncommittedOffsets || this.consumerAutoCommitMode);
        if (!poll) {
            if (this.waitingToEmit()) {
                LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", this.numUncommittedOffsets);
            }

            if (this.numUncommittedOffsets >= (long)maxUncommittedOffsets && !this.consumerAutoCommitMode) {
                LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", this.numUncommittedOffsets, maxUncommittedOffsets);
            }
        }

        return poll;
    }
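
    Retry-ready messages are subtracted because re-emitting them creates no new uncommitted offsets; they are already counted. For example (numbers made up): with maxUncommittedOffsets = 250, numUncommittedOffsets = 252 and readyMessageCount = 5, the spout may still poll, since 252 - 5 = 247 < 250.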
    

    5. commit()

    Decides whether it is time to commit.

    Returns true if auto-commit mode is off and the commit interval has elapsed.

    private boolean commit() {
        return !this.consumerAutoCommitMode && this.commitTimer.isExpiredResetOnTrue();
    }
    

    6. waitingToEmit()

    Whether any records are waiting to be emitted.

    Returns true if waitingToEmit is non-null and still has elements.

    private boolean waitingToEmit() {
        return this.waitingToEmit != null && this.waitingToEmit.hasNext();
    }
    

    7. setWaitingToEmit()

    Stores the records to be emitted.

    Takes the polled records and exposes them as the waitingToEmit iterator.

    public void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
        List<ConsumerRecord<K, V>> waitingToEmitList = new LinkedList();
        Iterator var3 = consumerRecords.partitions().iterator();
        //iterate over every topic-partition in the poll result
        while(var3.hasNext()) {
            //the topic-partition
            TopicPartition tp = (TopicPartition)var3.next();
            //append that partition's records
            waitingToEmitList.addAll(consumerRecords.records(tp));
        }
        //hand them over as the waiting-to-emit iterator
        this.waitingToEmit = waitingToEmitList.iterator();
    }
    

    8. pollKafkaBroker()

    Polls records from Kafka.

    private ConsumerRecords<K, V> pollKafkaBroker() {
        //first seek back to the earliest retriable offsets of failed tuples
        this.doSeekRetriableTopicPartitions();
        //if the subscription-refresh timer has fired, refresh the assignment first
        if (this.refreshSubscriptionTimer.isExpiredResetOnTrue()) {
            this.kafkaSpoutConfig.getSubscription().refreshAssignment();
        }
        //poll the records
        ConsumerRecords<K, V> consumerRecords = this.kafkaConsumer.poll(this.kafkaSpoutConfig.getPollTimeoutMs());
        int numPolledRecords = consumerRecords.count();
        LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, this.numUncommittedOffsets);
        return consumerRecords;
    }
    

    9. doSeekRetriableTopicPartitions()

    Seeks the consumer back to the earliest retriable offset of every partition that has tuples scheduled for retry.

    private void doSeekRetriableTopicPartitions() {
        //get the partitions (and offsets) that need retrying
        Map<TopicPartition, Long> retriableTopicPartitions = this.retryService.earliestRetriableOffsets();
        Iterator var2 = retriableTopicPartitions.entrySet().iterator();

        while(var2.hasNext()) {
            Entry<TopicPartition, Long> retriableTopicPartitionAndOffset = (Entry)var2.next();
            //reposition the consumer's offset
            this.kafkaConsumer.seek((TopicPartition)retriableTopicPartitionAndOffset.getKey(), ((Long)retriableTopicPartitionAndOffset.getValue()).longValue());
        }

    }
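
    This relies on the standard kafka-clients seek-then-poll pattern: after seek(), the next poll() on that partition resumes from the given offset, which is how failed tuples get re-fetched. A sketch (the topic name and offset are made up):

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class SeekSketch {
        static void rewind(KafkaConsumer<String, String> consumer) {
            TopicPartition tp = new TopicPartition("some-topic", 0);
            consumer.seek(tp, 42L);                                          //next fetch for tp starts at 42
            ConsumerRecords<String, String> records = consumer.poll(200L);   //re-reads offset 42 onward
            records.records(tp).forEach(r -> System.out.println(r.offset()));
        }
    }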
    

    10. emit()

    Emits tuples. Note: this only drains the waitingToEmit iterator.

    private void emit() {
        //keep taking the next waiting record; if it was skipped (already acked or emitted) and more remain, remove it and try the next one. The loop stops as soon as one tuple is actually emitted.
        while(!this.emitTupleIfNotEmitted((ConsumerRecord)this.waitingToEmit.next()) && this.waitingToEmit.hasNext()) {
            this.waitingToEmit.remove();
        }
    }
    

    11. emitTupleIfNotEmitted()

    Emits a tuple only if it has not been handled already.

    The main logic:

    First check whether the record has already been acked, then whether it has already been emitted; either one means it should be skipped.

    Next decide whether the tuple should be emitted at all: the tuple must be non-null, or the configuration must allow emitting null tuples. If it should be emitted:

    Check whether the tuple is scheduled for retry. If it is not scheduled, or it is scheduled and ready, proceed. In auto-commit mode the tuple is emitted directly. Otherwise the message id is added to emitted and to the partition's OffsetManager; if it was a scheduled retry it is removed from the retry service, otherwise numUncommittedOffsets is incremented; finally the tuple is emitted with its message id attached.

    private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        KafkaSpoutMessageId msgId = this.retryService.getMessageId(record);
        if (this.offsetManagers.containsKey(tp) && ((OffsetManager)this.offsetManagers.get(tp)).contains(msgId)) {
            //already acked
            LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
        } else if (this.emitted.contains(msgId)) {
            //already emitted and still in flight
            LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
        } else {
            //build the tuple from the record
            List<Object> tuple = this.kafkaSpoutConfig.getTranslator().apply(record);
            if (this.isEmitTuple(tuple)) {
                boolean isScheduled = this.retryService.isScheduled(msgId);
                if (!isScheduled || this.retryService.isReady(msgId)) {
                    if (this.consumerAutoCommitMode) {
                        //auto-commit: emit without tracking the message id
                        if (tuple instanceof KafkaTuple) {
                            this.collector.emit(((KafkaTuple)tuple).getStream(), tuple);
                        } else {
                            this.collector.emit(tuple);
                        }
                    } else {
                        //track the message id for ack/fail handling
                        this.emitted.add(msgId);
                        ((OffsetManager)this.offsetManagers.get(tp)).addToEmitMsgs(msgId.offset());
                        if (isScheduled) {
                            this.retryService.remove(msgId);
                        } else {
                            ++this.numUncommittedOffsets;
                        }

                        if (tuple instanceof KafkaTuple) {
                            this.collector.emit(((KafkaTuple)tuple).getStream(), tuple, msgId);
                        } else {
                            this.collector.emit(tuple, msgId);
                        }
                    }

                    LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", new Object[]{tuple, record, msgId});
                    return true;
                }
            } else {
                //null tuple and config says not to emit nulls: ack it so the offset can still be committed
                LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
                msgId.setEmitted(false);
                this.ack(msgId);
            }
        }

        return false;
    }
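
    The getTranslator()/KafkaTuple pair above is the extension point for shaping tuples. A minimal custom translator sketch, assuming the storm-kafka-client RecordTranslator interface (the stream and field names are made up); returning a KafkaTuple is what makes the instanceof branch above route to a named stream:

    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.kafka.spout.KafkaTuple;
    import org.apache.storm.kafka.spout.RecordTranslator;
    import org.apache.storm.tuple.Fields;

    //Emits only the record value, routed to the "values" stream
    public class ValueOnlyTranslator implements RecordTranslator<String, String> {
        public List<Object> apply(ConsumerRecord<String, String> record) {
            return new KafkaTuple(record.value()).routedTo("values");
        }

        public Fields getFieldsFor(String stream) {
            return new Fields("value");
        }

        public List<String> streams() {
            return Collections.singletonList("values");
        }
    }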
    

    12. isEmitTuple()

    Whether the tuple should be emitted: true if the tuple is non-null, or if the configuration allows emitting null tuples; either condition suffices.

    private boolean isEmitTuple(List<Object> tuple) {
        return tuple != null || this.kafkaSpoutConfig.isEmitNullTuples();
    }
    

    13. commitOffsetsForAckedTuples()

    Commits the offsets of acked tuples.

    private void commitOffsetsForAckedTuples() {
        //holds the offsets about to be committed
        Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap();
        //collect each partition's next committable offset from offsetManagers
        Iterator var2 = this.offsetManagers.entrySet().iterator();

        Entry tpOffset;
        while(var2.hasNext()) {
            tpOffset = (Entry)var2.next();
            OffsetAndMetadata nextCommitOffset = ((OffsetManager)tpOffset.getValue()).findNextCommitOffset();
            if (nextCommitOffset != null) {
                nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
            }
        }

        if (!nextCommitOffsets.isEmpty()) {
            //commit synchronously
            this.kafkaConsumer.commitSync(nextCommitOffsets);
            LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
            var2 = nextCommitOffsets.entrySet().iterator();
            //update each OffsetManager and the uncommitted counter
            while(var2.hasNext()) {
                tpOffset = (Entry)var2.next();
                TopicPartition tp = (TopicPartition)tpOffset.getKey();
                OffsetManager offsetManager = (OffsetManager)this.offsetManagers.get(tp);
                long numCommittedOffsets = offsetManager.commit((OffsetAndMetadata)tpOffset.getValue());
                this.numUncommittedOffsets -= numCommittedOffsets;
                LOG.debug("[{}] uncommitted offsets across all topic partitions", this.numUncommittedOffsets);
            }
        } else {
            LOG.trace("No offsets to commit. {}", this);
        }

    }
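
    findNextCommitOffset() yields the end of the contiguous run of acked offsets after the last committed offset; a gap blocks the commit. A toy illustration of that rule (an assumption based on its role here, not the OffsetManager source):

    import java.util.Arrays;
    import java.util.TreeSet;

    //Acked offsets 3,4,5,7 with offset 2 already committed: the contiguous
    //run 3..5 is committable; 7 must wait until 6 is acked.
    public class NextCommitSketch {
        public static void main(String[] args) {
            long committed = 2;
            TreeSet<Long> acked = new TreeSet<>(Arrays.asList(3L, 4L, 5L, 7L));
            long next = committed;
            while (acked.contains(next + 1)) {
                next++; //extends through 3, 4, 5; the gap at 6 stops the walk
            }
            System.out.println(next); //5 -> commitSync is handed 6, the next offset to read
        }
    }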
    

    14. ack()

    Handles acks coming back from the topology.

    public void ack(Object messageId) {
        //cast to KafkaSpoutMessageId
        KafkaSpoutMessageId msgId = (KafkaSpoutMessageId)messageId;
        //if the emitted set no longer tracks this msgId
        if (!this.emitted.contains(msgId)) {
            //it really was emitted, so the partition must have been reassigned
            if (msgId.isEmitted()) {
                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that came from a topic-partition that this consumer group instance is no longer tracking due to rebalance/partition reassignment. No action taken.", msgId);
            } else {
                LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
            }
        } else {//the msgId is still tracked in emitted
            //if not in auto-commit mode
            if (!this.consumerAutoCommitMode) {
                //record the ack in the partition's OffsetManager
                ((OffsetManager)this.offsetManagers.get(msgId.getTopicPartition())).addToAckMsgs(msgId);
            }
            //stop tracking it as in flight
            this.emitted.remove(msgId);
        }

    }
    

    15. fail()

    Failure handling.

    Increments the failure count and asks the retry service to schedule a retry; if the message has exhausted its retries, it is acked and dropped. A sketch of configuring the retry service follows the code.

    public void fail(Object messageId) {
        KafkaSpoutMessageId msgId = (KafkaSpoutMessageId)messageId;
        if (!this.emitted.contains(msgId)) {
            LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
        } else {
            msgId.incrementNumFails();
            if (!this.retryService.schedule(msgId)) {
                LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
                this.ack(msgId);
            } else {
                this.emitted.remove(msgId);
            }
    
        }
    }
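
    Whether fail() retries or drops a message is governed by the KafkaSpoutRetryService configured on the spout. A sketch of wiring up the default exponential-backoff implementation, assuming the storm-kafka-client 1.x KafkaSpoutRetryExponentialBackoff constructor (the intervals and retry cap are made up):

    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryService;

    public class RetrySetupSketch {
        public static KafkaSpoutConfig<String, String> build() {
            //initial delay, backoff period, max retries, max delay
            KafkaSpoutRetryService retry = new KafkaSpoutRetryExponentialBackoff(
                TimeInterval.microSeconds(500), TimeInterval.milliSeconds(2),
                5, TimeInterval.seconds(10));
            return KafkaSpoutConfig.builder("localhost:9092", "some-topic")
                .setRetry(retry) //after 5 fails, schedule(msgId) returns false and the message is acked/dropped
                .build();
        }
    }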
    
Original article: https://www.cnblogs.com/cnsec/p/13286645.html