  • Storm: the spout

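    1. What is a spout?

    Spout: a nozzle or spigot, that is, the place where data comes out.

    A spout is the data source of a Storm topology. The spout in turn gets its data from somewhere else, such as a database or message middleware.

    Taking Kafka as an example, the spout first pulls data from Kafka, wraps each record in a tuple, and sends it to the downstream bolts for processing. To Kafka, the spout is a consumer; to the bolts, the spout is a producer.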
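The two roles described above can be sketched in plain Java. This is only an illustration under stated assumptions: `SpoutRoleSketch`, `SourceRecord`, and the in-memory queue are hypothetical stand-ins and use neither the Storm nor the Kafka API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Plain-Java stand-in for a spout; none of these types come from Storm or Kafka.
class SpoutRoleSketch {
    // Hypothetical record as it might arrive from a source such as Kafka.
    static final class SourceRecord {
        final String topic;
        final long offset;
        final String value;

        SourceRecord(String topic, long offset, String value) {
            this.topic = topic;
            this.offset = offset;
            this.value = value;
        }
    }

    private final Queue<SourceRecord> source;         // upstream: the spout consumes from here
    private final Consumer<List<Object>> downstream;  // downstream: the spout produces to here

    SpoutRoleSketch(Queue<SourceRecord> source, Consumer<List<Object>> downstream) {
        this.source = source;
        this.downstream = downstream;
    }

    // Pull at most one record, wrap it as a tuple (a list of values), and emit it.
    // Returns false when the source has nothing to offer.
    boolean nextTuple() {
        SourceRecord r = source.poll();
        if (r == null) {
            return false;
        }
        List<Object> tuple = Arrays.<Object>asList(r.topic, r.offset, r.value);
        downstream.accept(tuple);
        return true;
    }

    public static void main(String[] args) {
        Queue<SourceRecord> topic = new ArrayDeque<>();
        topic.add(new SourceRecord("topic_test", 0L, "hello"));
        topic.add(new SourceRecord("topic_test", 1L, "world"));

        List<List<Object>> receivedByBolt = new ArrayList<>();
        SpoutRoleSketch spout = new SpoutRoleSketch(topic, receivedByBolt::add);
        while (spout.nextTuple()) {
            // keep pulling until the source is drained
        }
        System.out.println(receivedByBolt);
        // prints [[topic_test, 0, hello], [topic_test, 1, world]]
    }
}
```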

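    Why have a spout pull messages instead of letting the bolts receive pushed data directly? What does the pull model buy us?

    If data were pushed straight to the bolts, a sudden spike in volume could overwhelm a bolt and in turn disrupt the whole topology; and when there is no data, the whole topology sits idle and wastes resources. With the spout pulling messages at its own pace, this problem does not arise.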
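A minimal plain-Java sketch of that argument follows. It assumes a hypothetical cap, `maxPending`, on the number of messages the topology is working on at once; `PullModelSketch` and its methods are illustrative names, not Storm classes. A burst of messages only lengthens the backlog at the source, while the number of in-flight messages stays bounded.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative model of the pull approach; not part of Storm.
class PullModelSketch {
    private final Queue<String> backlog = new ArrayDeque<>(); // messages waiting at the source
    private final int maxPending; // hypothetical cap on messages in flight downstream
    private int pending = 0;
    private int peakPending = 0;

    PullModelSketch(int maxPending) {
        this.maxPending = maxPending;
    }

    // A producer publishing to the source: a burst only grows the backlog.
    void publish(String msg) {
        backlog.add(msg);
    }

    // Called by the spout. Returns null when the backlog is empty
    // or when downstream is already at capacity.
    String pull() {
        if (pending >= maxPending || backlog.isEmpty()) {
            return null;
        }
        pending++;
        peakPending = Math.max(peakPending, pending);
        return backlog.poll();
    }

    // Called when downstream has finished processing one message.
    void ack() {
        if (pending > 0) {
            pending--;
        }
    }

    int backlogSize() {
        return backlog.size();
    }

    int peakPending() {
        return peakPending;
    }

    public static void main(String[] args) {
        PullModelSketch source = new PullModelSketch(10);
        for (int i = 0; i < 1000; i++) {
            source.publish("m" + i); // sudden burst of 1000 messages
        }
        int processed = 0;
        while (source.backlogSize() > 0) {
            int batch = 0;
            while (source.pull() != null) {
                batch++; // pull as much as downstream can take
            }
            for (int i = 0; i < batch; i++) {
                source.ack(); // downstream finishes the batch
            }
            processed += batch;
        }
        System.out.println("processed=" + processed + " peakPending=" + source.peakPending());
        // prints processed=1000 peakPending=10
    }
}
```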

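    2. KafkaSpout

    KafkaSpout pulls data from Kafka to serve as a data source for Storm, and it reimplements the ack mechanism. In most cases a little configuration is all that is needed to use it.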
    
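        // Imports needed by this snippet (storm-kafka-client and kafka-clients)
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.storm.kafka.spout.KafkaSpoutConfig;
        import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
        import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
        import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
        import org.apache.storm.tuple.Fields;
        import org.apache.storm.tuple.Values;

        // KafkaSpout configuration
        private KafkaSpoutConfig<String, String> kafkaSpoutConfig() {
            final Fields outputFields = new Fields("topic", "partition", "offset", "timestamp", "key", "msg_from_kafka");
            KafkaSpoutConfig<String, String> config;
            // Kafka consumer settings
            Properties props = new Properties();
            // With false (the default), offsets are committed only after KafkaSpout has acked them.
            // With auto-commit enabled, KafkaSpout's ack handling is bypassed and data may be lost or duplicated.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

            // Retry policy for failed tuples: initialDelay, delayPeriod, maxRetries, maxDelay
            KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff(
                    TimeInterval.microSeconds(500),
                    TimeInterval.milliSeconds(2),
                    1,
                    TimeInterval.seconds(10));

            config = KafkaSpoutConfig
                    .builder("ip:9092", "topic_test")
                    // Offset from which to start consuming on the first poll
                    .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                    // Maps each ConsumerRecord to a tuple; the last argument names the output fields
                    .setRecordTranslator((r) -> new Values(r.topic(), r.partition(), r.offset(), r.timestamp(), r.key(), r.value()), outputFields)
                    // Interval at which the spout commits offsets; has no effect if enable.auto.commit=true
                    .setOffsetCommitPeriodMs(1_000) // 1 second
                    // Maximum number of uncommitted offsets allowed before the spout pauses polling
                    .setMaxUncommittedOffsets(1_000_000) // 1 million
                    // Consumer group
                    .setGroupId("test-w")
                    // Kafka consumer settings
                    .setProp(props)
                    .setRetry(kafkaSpoutRetryService)
                    .build();
            return config;
        }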
    
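        // Imports needed by this snippet (storm-core)
        import org.apache.storm.generated.StormTopology;
        import org.apache.storm.topology.TopologyBuilder;

        // Topology structure
        private StormTopology stormTopology() {
            TopologyBuilder builder = new TopologyBuilder();
            // ProducerSpout and BoltTest are not defined in this post; with the stock spout
            // this line would read: new KafkaSpout<>(kafkaSpoutConfig())
            builder.setSpout("spout", new ProducerSpout(kafkaSpoutConfig()), 1);
            builder.setBolt("bolt1", new BoltTest(), 1).shuffleGrouping("spout");
            return builder.createTopology();
        }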
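To illustrate why the configuration above couples offset commits to acks, here is a simplified plain-Java model. It is a sketch under assumptions, not KafkaSpout's actual code: `OffsetCommitSketch` is a hypothetical class, and it follows the Kafka convention that the committed offset is the next offset to read. Tuples may be acked out of order, so only the contiguous run of acked offsets can be committed safely; an unacked offset in the middle holds the commit position back.

```java
import java.util.TreeSet;

// Simplified model of ack-driven offset commits for a single partition.
// Hypothetical class; it does not mirror KafkaSpout internals.
class OffsetCommitSketch {
    private final TreeSet<Long> acked = new TreeSet<>(); // acked but not yet committed
    private long committed; // next offset to read after a restart

    OffsetCommitSketch(long firstOffset) {
        this.committed = firstOffset;
    }

    // Record that the tuple for this offset was fully processed downstream.
    void ack(long offset) {
        if (offset >= committed) {
            acked.add(offset);
        }
    }

    // Advance the commit position over the contiguous run of acked offsets
    // and return the new position. A gap (an unacked offset) stops the advance.
    long commit() {
        while (acked.remove(committed)) {
            committed++;
        }
        return committed;
    }

    public static void main(String[] args) {
        OffsetCommitSketch partition = new OffsetCommitSketch(0L);
        partition.ack(0L);
        partition.ack(1L);
        partition.ack(3L); // offset 2 is still in flight
        System.out.println(partition.commit()); // prints 2
        partition.ack(2L); // the gap is filled
        System.out.println(partition.commit()); // prints 4
    }
}
```

If offsets were instead committed on a timer regardless of acks, the commit position could pass offset 2 before it was processed, which is the data-loss case the comment on `enable.auto.commit` warns about.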
    

    All of the KafkaSpout configuration fields, as declared in KafkaSpoutConfig:

    public static final long DEFAULT_POLL_TIMEOUT_MS = 200L;
    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30000L;
    public static final int DEFAULT_MAX_RETRIES = 2147483647;
    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10000000;
    public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2000L;
    public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0L), TimeInterval.milliSeconds(2L), 2147483647, TimeInterval.seconds(10L));
    public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0L), TimeInterval.milliSeconds(0L), 2147483647, TimeInterval.milliSeconds(0L));
    private final Map<String, Object> kafkaProps;
    private final Subscription subscription;
    private final SerializableDeserializer<K> keyDes;
    private final Class<? extends Deserializer<K>> keyDesClazz;
    private final SerializableDeserializer<V> valueDes;
    private final Class<? extends Deserializer<V>> valueDesClazz;
    private final long pollTimeoutMs;
    private final RecordTranslator<K, V> translator;
    private final long offsetCommitPeriodMs;
    private final int maxUncommittedOffsets;
    private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
    private final KafkaSpoutRetryService retryService;
    private final long partitionRefreshPeriodMs;
    private final boolean emitNullTuples;
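As a rough illustration of what the four arguments of `DEFAULT_RETRY_SERVICE` mean, the sketch below computes retry delays for an exponential backoff. The formula is an assumption based on my reading of how `KafkaSpoutRetryExponentialBackoff` documents its schedule (first retry after `initialDelay`, later retries after `delayPeriod * 2^(failCount - 1)`, capped at `maxDelay`); it may differ between storm-kafka-client versions, so check the Javadoc of the version in use. `BackoffSketch` and `delayMicros` are hypothetical names.

```java
// Hypothetical helper; the schedule below is an assumed formula, not copied from Storm.
class BackoffSketch {
    // Delay before the next retry, in microseconds.
    // failCount is the number of times the tuple has failed so far (1, 2, 3, ...).
    static long delayMicros(int failCount, long initialMicros, long periodMicros, long maxMicros) {
        if (failCount < 1) {
            throw new IllegalArgumentException("failCount starts at 1");
        }
        if (failCount == 1) {
            return Math.min(initialMicros, maxMicros);
        }
        if (periodMicros == 0) {
            return 0;
        }
        int shift = failCount - 1;
        // Guard against long overflow when doubling many times.
        if (shift >= 62 || periodMicros > (Long.MAX_VALUE >> shift)) {
            return maxMicros;
        }
        return Math.min(periodMicros << shift, maxMicros);
    }

    public static void main(String[] args) {
        // Values taken from DEFAULT_RETRY_SERVICE: 0 s, 2 ms, 10 s
        long initial = 0L;
        long period = 2_000L;
        long max = 10_000_000L;
        for (int failCount = 1; failCount <= 14; failCount++) {
            System.out.println("failCount=" + failCount
                    + " delayMicros=" + delayMicros(failCount, initial, period, max));
        }
    }
}
```

Under this assumed formula and the default values, the delay doubles from 4 ms on the second failure and reaches the 10 s cap at the fourteenth failure, since 2 ms * 2^13 is about 16.4 s.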
    

    What each of these means will be summarized later.


    References:

    《storm技术内幕与大数据实战》

  • Original article: https://www.cnblogs.com/cnsec/p/13286646.html