  • Storm KafkaSpout source code analysis

    I. Fields

    //org.apache.storm.spout.SpoutOutputCollector
    protected SpoutOutputCollector collector;
    
    //org.apache.storm.kafka.spout.KafkaSpoutConfig
    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
    
    //consumer factory
    private KafkaConsumerFactory kafkaConsumerFactory;
    
    //consumer
    private transient KafkaConsumer<K, V> kafkaConsumer;
    
    //whether the consumer auto-commits offsets
    private transient boolean consumerAutoCommitMode;
    
    //strategy for the offset of the first poll
    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
    
    //retry service
    private transient KafkaSpoutRetryService retryService;
    
    //timer for committing offsets
    private transient Timer commitTimer;
    
    //whether initialization has completed
    private transient boolean initialized;
    
    //offset managers, one per topic partition
    private transient Map<TopicPartition, OffsetManager> offsetManagers;
    
    //message ids of tuples that have been emitted but not yet acked
    private transient Set<KafkaSpoutMessageId> emitted;
    
    //records waiting to be emitted
    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
    
    //counter of offsets that have been emitted but not yet committed
    //(compared against maxUncommittedOffsets to decide whether to keep polling)
    private transient long numUncommittedOffsets;
    
    //timer for refreshing the topic subscription
    private transient Timer refreshSubscriptionTimer;
    
    //topology context
    private transient TopologyContext context;
    
    

    Among these, three collections stand out:

    private transient Map<TopicPartition, OffsetManager> offsetManagers;
    private transient Set<KafkaSpoutMessageId> emitted;
    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
    

    KafkaSpout's whole workflow revolves around these three collections.

    After records are polled from Kafka they are first placed in waitingToEmit; once a record is emitted, its message id is tracked in emitted; finally, when the tuple is acked, its offset is recorded in offsetManagers. Once the relationship between these three is clear, the whole flow is easy to follow.
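
    As a toy model of this hand-off, the sketch below uses plain Java collections standing in for the real ConsumerRecord/KafkaSpoutMessageId/OffsetManager types; it is purely illustrative, not the actual spout code:

    import java.util.*;
    
    public class SpoutCollectionsSketch {
        public static void main(String[] args) {
            //offsets 0..2 have just been "polled" and wait to be emitted
            Iterator<Long> waitingToEmit = Arrays.asList(0L, 1L, 2L).iterator();
            Set<Long> emitted = new HashSet<>();          //emitted but not yet acked (in flight)
            TreeSet<Long> ackedOffsets = new TreeSet<>(); //stands in for one OffsetManager
    
            //nextTuple(): move records from waitingToEmit to emitted
            while (waitingToEmit.hasNext()) {
                emitted.add(waitingToEmit.next());
            }
            //ack(): move acked message ids from emitted into the offset manager
            for (Long offset : new ArrayList<>(emitted)) {
                ackedOffsets.add(offset);
                emitted.remove(offset);
            }
            //commit timer: commit up to the end of the contiguous acked prefix
            long nextCommit = 0;
            while (ackedOffsets.contains(nextCommit)) {
                nextCommit++;
            }
            System.out.println("would commit offset " + nextCommit); //prints 3
        }
    }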

    II. Constructors

    KafkaSpout extends BaseRichSpout and overrides all of its methods (the parent class provides only empty implementations), and BaseRichSpout in turn extends BaseComponent and implements IRichSpout; that chain gives KafkaSpout its overall structure.

    KafkaSpout's constructors:

    //construct a KafkaSpout from a KafkaSpoutConfig, using the default consumer factory
    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
    }
    
    //construct from a KafkaSpoutConfig and a KafkaConsumerFactory
    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
        this.kafkaConsumerFactory = kafkaConsumerFactory;
        this.kafkaSpoutConfig = kafkaSpoutConfig;
    }
    

    KafkaSpoutConfig holds all of the KafkaSpout's configuration as well as the configuration for the underlying Kafka consumer.
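
    For reference, wiring a spout up typically goes through KafkaSpoutConfig.builder(...). The sketch below uses the 1.x builder API; the broker address, topic name and tuning values are made-up examples, not values taken from the analyzed source:

    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
    
    public class KafkaSpoutConfigExample {
        //sketch: build a KafkaSpoutConfig and hand it to the public constructor shown above
        public static KafkaSpout<String, String> buildSpout() {
            KafkaSpoutConfig<String, String> spoutConfig =
                KafkaSpoutConfig.builder("kafka1:9092", "my-topic")    //bootstrap servers, topic(s)
                    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                    .setOffsetCommitPeriodMs(10000)                    //period of the commit timer
                    .setMaxUncommittedOffsets(250)                     //cap used by poll(), see below
                    .build();
            return new KafkaSpout<>(spoutConfig);
        }
    }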

    KafkaConsumerFactoryDefault is the default consumer factory implementation:

    public class KafkaConsumerFactoryDefault<K, V> implements KafkaConsumerFactory<K, V> {
        public KafkaConsumerFactoryDefault() {
        }
    
        public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
            return new KafkaConsumer(kafkaSpoutConfig.getKafkaProps(), kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
        }
    }
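
    The package-private constructor above exists mainly so that a different factory can be swapped in, for example in tests. A minimal custom implementation might look like the following sketch; the class name and the client.id tweak are made up, and the interface is assumed to live in org.apache.storm.kafka.spout.internal as in the version analyzed here:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
    
    //sketch of a custom factory that overrides one consumer property before building the consumer
    public class CustomClientIdConsumerFactory<K, V> implements KafkaConsumerFactory<K, V> {
        public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
            Map<String, Object> props = new HashMap<>(kafkaSpoutConfig.getKafkaProps());
            props.put("client.id", "kafka-spout-consumer"); //illustrative override
            return new KafkaConsumer<>(props,
                    kafkaSpoutConfig.getKeyDeserializer(),
                    kafkaSpoutConfig.getValueDeserializer());
        }
    }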
    

    III. Main method analysis

    1.open()

    Implemented from ISpout; it mainly performs initialization.

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        //not yet initialized
        this.initialized = false;
        //topology context
        this.context = context;
        //output collector
        this.collector = collector;
        //counter of uncommitted offsets, starts at 0
        this.numUncommittedOffsets = 0L;
        //strategy for the offset of the first poll, default UNCOMMITTED_EARLIEST (earliest uncommitted offset)
        this.firstPollOffsetStrategy = this.kafkaSpoutConfig.getFirstPollOffsetStrategy();
        //consumer offset commit mode, default false, i.e. the KafkaSpout manages offset commits; if true the consumer auto-commits offsets
        this.consumerAutoCommitMode = this.kafkaSpoutConfig.isConsumerAutoCommitMode();
        //retry service
        this.retryService = this.kafkaSpoutConfig.getRetryService();
        //if not in auto-commit mode, commit on a timer; the period comes from KafkaSpoutConfig
        if (!this.consumerAutoCommitMode) {
            this.commitTimer = new Timer(500L, this.kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
        }
        //timer for refreshing the topic subscription, 500 ms initial delay, period from KafkaSpoutConfig
        this.refreshSubscriptionTimer = new Timer(500L, this.kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
        //instantiate the three collections
        this.offsetManagers = new HashMap();
        this.emitted = new HashSet();
        this.waitingToEmit = Collections.emptyListIterator();
        LOG.info("Kafka Spout opened with the following configuration: {}", this.kafkaSpoutConfig);
    }
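
    Note that Timer here is Storm's own helper from the kafka-spout internals, not java.util.Timer. Conceptually it behaves roughly like the sketch below: after an initial delay, isExpiredResetOnTrue() returns true once per period and restarts the countdown each time it fires. This is an illustrative re-implementation, not the real class:

    import java.util.concurrent.TimeUnit;
    
    //illustrative stand-in for org.apache.storm.kafka.spout.internal.Timer
    public class TimerSketch {
        private final long periodNanos;
        private long start;
    
        public TimerSketch(long delay, long period, TimeUnit unit) {
            this.periodNanos = unit.toNanos(period);
            this.start = System.nanoTime() + unit.toNanos(delay); //first period starts after the delay
        }
    
        //true at most once per period; resets the countdown when it returns true
        public boolean isExpiredResetOnTrue() {
            boolean expired = System.nanoTime() - this.start >= this.periodNanos;
            if (expired) {
                this.start = System.nanoTime();
            }
            return expired;
        }
    }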
    

    2.setAcked()

    Marks data as successfully processed.

    When a bolt finishes processing the data it calls ack, indicating the data was handled correctly; here the TopicPartition and its offset are registered in offsetManagers (an OffsetManager is created for the partition if one does not already exist).

    private void setAcked(TopicPartition tp, long fetchOffset) {
        if (!this.consumerAutoCommitMode && !this.offsetManagers.containsKey(tp)) {
            this.offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
        }
    }
    

    3.nextTuple()

    Emits tuples.

    The overall logic is as follows:

    First it checks whether initialization has completed. If so, it checks whether it is time to commit; if it is, the offsets of acked tuples are committed. It then checks whether polling from Kafka is allowed; the condition for polling is:

     boolean poll = !this.waitingToEmit() && (this.numUncommittedOffsets - (long)readyMessageCount < (long)maxUncommittedOffsets || this.consumerAutoCommitMode);
    

    That is, nothing is waiting to be emitted, and either the number of uncommitted offsets minus the number of retry-ready messages is below the maximum allowed uncommitted offsets, or auto-commit mode is on. If this holds, records are polled and added to the waiting-to-emit list. Finally, if there are records waiting to be emitted, they are emitted. The code below makes this clearer.

    public void nextTuple() {
        try {
            //if initialized
            if (this.initialized) {
                //if it is time to commit
                if (this.commit()) {
                    //commit the offsets of acked tuples
                    this.commitOffsetsForAckedTuples();
                }
                //if polling is allowed
                if (this.poll()) {
                    try {
                        //poll records from Kafka and put them into the waitingToEmit collection
                        this.setWaitingToEmit(this.pollKafkaBroker());
                    } catch (RetriableException var2) {
                        LOG.error("Failed to poll from kafka.", var2);
                    }
                }
                //if there are records waiting to be emitted, emit them
                if (this.waitingToEmit()) {
                    this.emit();
                }
            } else {
                LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
            }
        } catch (InterruptException var3) {
            this.throwKafkaConsumerInterruptedException();
        }
    
    }
    

    4.poll()

    private boolean poll() {
        int maxUncommittedOffsets = this.kafkaSpoutConfig.getMaxUncommittedOffsets();
        int readyMessageCount = this.retryService.readyMessageCount();
        boolean poll = !this.waitingToEmit() && (this.numUncommittedOffsets - (long)readyMessageCount < (long)maxUncommittedOffsets || this.consumerAutoCommitMode);
        if (!poll) {
            if (this.waitingToEmit()) {
                LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", this.numUncommittedOffsets);
            }
    
            if (this.numUncommittedOffsets >= (long)maxUncommittedOffsets && !this.consumerAutoCommitMode) {
                LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", this.numUncommittedOffsets, maxUncommittedOffsets);
            }
        }
    
        return poll;
    }
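
    For example, suppose maxUncommittedOffsets is 250, numUncommittedOffsets is currently 260, and 20 failed messages are ready for retry: 260 - 20 = 240 < 250, so polling is still allowed. Subtracting readyMessageCount gives retriable tuples room to be re-fetched even when the uncommitted-offset cap has otherwise been reached (these values are made up for illustration).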
    

    5.commit()

    Determines whether a commit should happen.

    Returns true if auto-commit mode is off and the commit interval has elapsed.

    private boolean commit() {
        return !this.consumerAutoCommitMode && this.commitTimer.isExpiredResetOnTrue();
    }
    

    6.waitingToEmit()

    Whether there are records waiting to be emitted.

    Returns true if waitingToEmit is non-empty, indicating there is data waiting to be emitted.

    private boolean waitingToEmit() {
        return this.waitingToEmit != null && this.waitingToEmit.hasNext();
    }
    

    7.setWaitingToEmit()

    Sets the records that are ready to be emitted.

    Its job is to add the consumed records to the waitingToEmit list.

    public void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
        List<ConsumerRecord<K, V>> waitingToEmitList = new LinkedList();
        Iterator var3 = consumerRecords.partitions().iterator();
        //iterate over all topic partitions
        while(var3.hasNext()) {
            //get the topic partition
            TopicPartition tp = (TopicPartition)var3.next();
            //add the records of that partition
            waitingToEmitList.addAll(consumerRecords.records(tp));
        }
        //store them as the waiting-to-emit iterator
        this.waitingToEmit = waitingToEmitList.iterator();
    }
    

    8.pollKafkaBroker()

    Polls records from Kafka.

    private ConsumerRecords<K, V> pollKafkaBroker() {
        //first seek back to the offsets of failed tuples that are due for retry
        this.doSeekRetriableTopicPartitions();
        //if the subscription refresh timer has expired, refresh the assignment first
        if (this.refreshSubscriptionTimer.isExpiredResetOnTrue()) {
            this.kafkaSpoutConfig.getSubscription().refreshAssignment();
        }
        //poll records
        ConsumerRecords<K, V> consumerRecords = this.kafkaConsumer.poll(this.kafkaSpoutConfig.getPollTimeoutMs());
        int numPolledRecords = consumerRecords.count();
        LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, this.numUncommittedOffsets);
        return consumerRecords;
    }
    

    9.doSeekRetriableTopicPartitions()

    private void doSeekRetriableTopicPartitions() {
        //get the earliest retriable offset for each partition that needs a retry
        Map<TopicPartition, Long> retriableTopicPartitions = this.retryService.earliestRetriableOffsets();
        Iterator var2 = retriableTopicPartitions.entrySet().iterator();
    
        while(var2.hasNext()) {
            Entry<TopicPartition, Long> retriableTopicPartitionAndOffset = (Entry)var2.next();
            //seek the consumer back to that offset so the record is fetched again
            this.kafkaConsumer.seek((TopicPartition)retriableTopicPartitionAndOffset.getKey(), ((Long)retriableTopicPartitionAndOffset.getValue()).longValue());
        }
    
    }
    

    10.emit()

    Emits tuples. Note: this only operates on the waitingToEmit list.

    private void emit() {
        //take the next record from waitingToEmit and try to emit it; if it is skipped
        //(already acked, already emitted, or a null tuple), remove it and move on to the next,
        //stopping as soon as one record is actually emitted or the iterator is exhausted
        while(!this.emitTupleIfNotEmitted((ConsumerRecord)this.waitingToEmit.next()) && this.waitingToEmit.hasNext()) {
            this.waitingToEmit.remove();
        }
    }
    

    11.emitTupleIfNotEmitted()

    Emits a tuple if it has not already been emitted.

    The main logic is as follows:

    First it checks whether the tuple has already been acked, then whether it has already been emitted; if either is true, the record does not need to be emitted.

    Otherwise it decides whether the tuple should be emitted at all: mainly whether the tuple is null and whether the configuration allows emitting null tuples. If it should be emitted:

    It first checks whether the tuple is scheduled for retry. If it is not scheduled, or it is scheduled and ready to be retried, then: in auto-commit mode the tuple is emitted directly (unanchored); otherwise the message id is added to emitted and its offset is recorded in the corresponding OffsetManager, then, if the message was scheduled for retry, it is removed from the retry service, otherwise the uncommitted-offset counter is incremented, and finally the tuple is emitted anchored with the message id.

    private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        KafkaSpoutMessageId msgId = this.retryService.getMessageId(record);
        if (this.offsetManagers.containsKey(tp) && ((OffsetManager)this.offsetManagers.get(tp)).contains(msgId)) {
            LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
        } else if (this.emitted.contains(msgId)) {
            LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
        } else {
            List<Object> tuple = this.kafkaSpoutConfig.getTranslator().apply(record);
            if (this.isEmitTuple(tuple)) {
                boolean isScheduled = this.retryService.isScheduled(msgId);
                if (!isScheduled || this.retryService.isReady(msgId)) {
                    if (this.consumerAutoCommitMode) {
                        if (tuple instanceof KafkaTuple) {
                            this.collector.emit(((KafkaTuple)tuple).getStream(), tuple);
                        } else {
                            this.collector.emit(tuple);
                        }
                    } else {
                        this.emitted.add(msgId);
                        ((OffsetManager)this.offsetManagers.get(tp)).addToEmitMsgs(msgId.offset());
                        if (isScheduled) {
                            this.retryService.remove(msgId);
                        } else {
                            ++this.numUncommittedOffsets;
                        }
    
                        if (tuple instanceof KafkaTuple) {
                            this.collector.emit(((KafkaTuple)tuple).getStream(), tuple, msgId);
                        } else {
                            this.collector.emit(tuple, msgId);
                        }
                    }
    
                    LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", new Object[]{tuple, record, msgId});
                    return true;
                }
            } else {
                LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
                msgId.setEmitted(false);
                this.ack(msgId);
            }
        }
    
        return false;
    }
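
    The tuple itself comes from kafkaSpoutConfig.getTranslator().apply(record); the stock DefaultRecordTranslator emits a (topic, partition, offset, key, value) tuple on the default stream. The translator is pluggable; as a rough sketch, assuming the RecordTranslator interface with the apply/getFieldsFor/streams methods used by this version, a value-only translator could look like this:

    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.kafka.spout.RecordTranslator;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    //sketch: emit only the record value on the default stream
    public class ValueOnlyTranslator<K, V> implements RecordTranslator<K, V> {
        public List<Object> apply(ConsumerRecord<K, V> record) {
            return new Values(record.value()); //returning null here leads to the isEmitTuple() branch below
        }
    
        public Fields getFieldsFor(String stream) {
            return new Fields("value");
        }
    
        public List<String> streams() {
            return Collections.singletonList("default");
        }
    }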
    

    12.isEmitTuple()

    Whether the tuple should be emitted: it is emitted if it is non-null, or if the config allows emitting null tuples; either condition is sufficient.

    private boolean isEmitTuple(List<Object> tuple) {
        return tuple != null || this.kafkaSpoutConfig.isEmitNullTuples();
    }
    

    13.commitOffsetsForAckedTuples()

    Commits the offsets of acked tuples.

    private void commitOffsetsForAckedTuples() {
        //holds the offsets that are about to be committed
        Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap();
        //collect the next commit offset from each OffsetManager
        Iterator var2 = this.offsetManagers.entrySet().iterator();
    
        Entry tpOffset;
        while(var2.hasNext()) {
            tpOffset = (Entry)var2.next();
            OffsetAndMetadata nextCommitOffset = ((OffsetManager)tpOffset.getValue()).findNextCommitOffset();
            if (nextCommitOffset != null) {
                nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
            }
        }
    
        if (!nextCommitOffsets.isEmpty()) {
            //commit synchronously to Kafka
            this.kafkaConsumer.commitSync(nextCommitOffsets);
            LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
            var2 = nextCommitOffsets.entrySet().iterator();
            //update each OffsetManager and the uncommitted-offset counter, and log what remains uncommitted
            while(var2.hasNext()) {
                tpOffset = (Entry)var2.next();
                TopicPartition tp = (TopicPartition)tpOffset.getKey();
                OffsetManager offsetManager = (OffsetManager)this.offsetManagers.get(tp);
                long numCommittedOffsets = offsetManager.commit((OffsetAndMetadata)tpOffset.getValue());
                this.numUncommittedOffsets -= numCommittedOffsets;
                LOG.debug("[{}] uncommitted offsets across all topic partitions", this.numUncommittedOffsets);
            }
        } else {
            LOG.trace("No offsets to commit. {}", this);
        }
    
    }
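
    OffsetManager itself is not shown in this post, but the key idea behind findNextCommitOffset() is that only a contiguous prefix of acked offsets may be committed; an acked offset beyond a gap has to wait. A toy illustration of that rule (not the real OffsetManager code):

    import java.util.TreeSet;
    
    //toy illustration of the "commit only up to the first gap" rule
    public class NextCommitOffsetSketch {
        public static void main(String[] args) {
            TreeSet<Long> acked = new TreeSet<>();
            acked.add(5L); acked.add(6L); acked.add(7L); acked.add(9L); //offset 8 is not acked yet
    
            long committed = 4L;          //highest offset already committed to Kafka
            long lastContiguous = committed;
            while (acked.contains(lastContiguous + 1)) {
                lastContiguous++;         //extend over the contiguous acked prefix: 5, 6, 7
            }
            //Kafka commit positions mean "next offset to read", so commit lastContiguous + 1 = 8;
            //offset 9 stays pending in the OffsetManager until 8 is acked
            System.out.println("commit position: " + (lastContiguous + 1));
        }
    }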
    

    14.ack()

    public void ack(Object messageId) {
        //cast to KafkaSpoutMessageId
        KafkaSpoutMessageId msgId = (KafkaSpoutMessageId)messageId;
        //if the emitted set no longer contains this msgId
        if (!this.emitted.contains(msgId)) {
            //it was emitted, but its partition is no longer tracked (e.g. after a rebalance)
            if (msgId.isEmitted()) {
                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that came from a topic-partition that this consumer group instance is no longer tracking due to rebalance/partition reassignment. No action taken.", msgId);
            } else {
                LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
            }
        } else {//the emitted set contains the msgId
            //if not in auto-commit mode
            if (!this.consumerAutoCommitMode) {
                //add to the acked messages of the corresponding OffsetManager
                ((OffsetManager)this.offsetManagers.get(msgId.getTopicPartition())).addToAckMsgs(msgId);
            }
            //remove from the in-flight (emitted) set
            this.emitted.remove(msgId);
        }
    
    }
    

    15.fail()

    Failure handling.

    The main action is to increment the failure count and schedule the message for retry via the retry service; if it can no longer be scheduled (the maximum number of retries has been reached), the message is acked instead.

    public void fail(Object messageId) {
        KafkaSpoutMessageId msgId = (KafkaSpoutMessageId)messageId;
        if (!this.emitted.contains(msgId)) {
            LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
        } else {
            msgId.incrementNumFails();
            if (!this.retryService.schedule(msgId)) {
                LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
                this.ack(msgId);
            } else {
                this.emitted.remove(msgId);
            }
    
        }
    }
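
    The retry policy lives behind KafkaSpoutRetryService; the stock implementation is KafkaSpoutRetryExponentialBackoff. A typical configuration looks roughly like the sketch below (the interval values and retry cap are illustrative), and it would be plugged in through the KafkaSpoutConfig builder's setRetry(...):

    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
    
    public class RetryServiceExample {
        //sketch: exponential back-off retry policy (values are illustrative)
        public static KafkaSpoutRetryService exponentialBackoffRetry() {
            return new KafkaSpoutRetryExponentialBackoff(
                    TimeInterval.microSeconds(500), //initial delay before the first retry
                    TimeInterval.milliSeconds(2),   //base period that grows exponentially per failure
                    Integer.MAX_VALUE,              //max retries; beyond this, fail() acks the message
                    TimeInterval.seconds(10));      //upper bound on the retry delay
        }
    }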
    