zoukankan      html  css  js  c++  java
  • Storm Kafka Integration (0.10.x+)

    以下内容涉及版本:Storm 1.1.0,Kafka 2.10_0.10.2

    参照官方文档:http://storm.apache.org/releases/1.1.0/storm-kafka-client.html

    Storm Apache Kafka 集成使用了kafka-client jar

    kafka-client jar 包含了新版Kafka Consumer API

    兼容性

    Apache Kafka 0.10+

    在Topology中向Kafka写入数据

    你可以创建org.apache.storm.kafka.bolt.KafkaBolt的实例,并且把它作为一个Component添加到你的Topology中,

    或者如果你正在使用Trident,可以使用org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory和org.apache.storm.kafka.trident.TridentKafkaUpdater。

    你需要提供以下两种接口的实现。

    TupleToKafkaMapper 和 TridentTupleToKafkaMapper

    这两个接口定义有两个方法:

        K getKeyFromTuple(Tuple/TridentTuple tuple);
        V getMessageFromTuple(Tuple/TridentTuple tuple);

    就像命名建议那样,这些方法用来把一个tuple映射成Kafka key和message。

    如果想一个field作为Key,一个field作为Value,那可以使用它们的实现类FieldNameBasedTupleToKafkaMapper(bolt和trident的包中都有这个名字的类)。

    在KafkaBolt中,如果使用默认构造器构造FieldNameBasedTupleToKafkaMapper,则可以把一个带有field名为key和message的tuple映射为输出到Kafka Broker的key和message。

    或者使用非默认构造器指定不同的filed为输出到Kafka的key和message。

    在TridentKafkaState中,你必须指定哪个Field名是Key和Message,因为它没有默认构造器。这些必须在构造FieldNameBasedTupleToKafkaMapper时指定。

    KafkaTopicSelector 和 trident KafkaTopicSelector

    这个接口只提供了一个方法。

    public interface KafkaTopicSelector {
     String getTopics(Tuple/TridentTuple tuple);
    }

    这个接口的实现要返回tuple映射成的key/message要发布到的Topic,返回null的话,当前message就会被忽略。

    如果有一个固定的topic名,则可以使用DefaultTopicSelector,在构造器里把topic的名字设置上。

    FieldNameTopicSelector和FieldIndexTopicSelector用来选择tuple要输出到的Topic,需要在tuple中指定field名和field索引。

    当Topic名没有找到,则Field*TopicSelector会把message写到默认的Topic,这种情况下要确定默认的Topic已经创建了。

    指定Kafka Producer的属性

    在Storm topology中通过调用KafkaBolt.withProducerProperties()提供全部的Kafka Producer属性,可以参照

    http://kafka.apache.org/documentation.html#newproducerconfigs Section "Important configuration properties for the producer"

    获取更多细节。这些都定义在org.apache.kafka.clients.producer.ProducerConfig中

    使用通配符匹配Kafka Topic

    通过添加以下配置可以使用通配符匹配Topic

    Config config = new Config();

    config.put("kafka.topic.wildcard.match",true);

    代码示例:

    /**After this you can specify a wildcard topic for matching
      *e.g.  clickstream.*.log.  
      *This will match all streams matching clickstream.my.log, clickstream.cart.log etc
      */
    
    //Putting it all together
    
    //For the bolt :
    //java
            TopologyBuilder builder = new TopologyBuilder();
    
            Fields fields = new Fields("key", "message");
            FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                        new Values("storm", "1"),
                        new Values("trident", "1"),
                        new Values("needs", "1"),
                        new Values("javadoc", "1")
            );
            spout.setCycle(true);
            builder.setSpout("spout", spout, 5);
            //set producer properties.
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "1");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            KafkaBolt bolt = new KafkaBolt()
                    .withProducerProperties(props)
                    .withTopicSelector(new DefaultTopicSelector("test"))
                    .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
            builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
    
            Config conf = new Config();
    
            StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
    //For Trident:
    
            Fields fields = new Fields("word", "count");
            FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                    new Values("storm", "1"),
                    new Values("trident", "1"),
                    new Values("needs", "1"),
                    new Values("javadoc", "1")
            );
            spout.setCycle(true);
    
            TridentTopology topology = new TridentTopology();
            Stream stream = topology.newStream("spout1", spout);
    
            //set producer properties.
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "1");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
                    .withProducerProperties(props)
                    .withKafkaTopicSelector(new DefaultTopicSelector("test"))
                    .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
            stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
    
            Config conf = new Config();
            StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());

    从Kafka读取Message(Spouts)

    配置

    Spout的实现使用org.apache.storm.kafka.spout.KafkaSpoutConfig进行配置。

    这个类采用Builder模式,可以通过调用其中一个Buolder的构造器或调用KafkaSpoutConfig的静态builder方法创建Builder对象

    构造器和静态builder方法都需要一些关键的配置属性值(最小化配置)才能创建Builder对象

    bootstrapServers:Kafka Consumer 属性"bootstrap.servers"

    topics:Spout要消费的Topic,可以使一个包含一个或多个Topic名的Collection,或者是一个正则表达式的Pattern,匹配的topic都将被消费

    使用Builder构造器的话,还需要提供 key和value的反序列化器(deserializer),这样可以通过使用Java泛型保障类型安全。

    默认的反序列化器是StringDeserializer,这个设置可以通过KafkaSpoutConfig.Builder对象的setKey和setValue方法重新设置。

    如果反序列化设置为null,topology会fall back到Kafka设置的相关属性,但最好在topology中显示声明以保障类型安全。

    还有一些需要关注的配置:

    setFirstPollOffsetStrategy 设置从哪里开始消费数据,这个配置可以用在故障恢复和首次启动这个spout时。合法的值有:

    • EARLIEST Kafka Spout从当前Partition的第一个Offset开始消费records,无视之前的提交
    • LATEST    Kafka Spout从当前Partition中最新的Offset(大于分区中最后一个Offset的位置)开始消费records,无视之前的提交
    • UNCOMMITTED_EARLIEST (DEFAULT) Kafka Spout 从上一次 committed offset的位置消费records,如果之前没有Offset被提交,则像EARLIEST一样消费数据
    • UNCOMMITTED_LATEST Kafka Spout 从上一次 committed offset的位置消费records,如果之前没有Offset被提交,则像LATEST一样消费数据

    setRecordTranslator用来指定Spout如何将一个Kafka Consumer Record转换为Tuple,并且指定tuple会输出到哪个stream。

    默认情况下,"topic", "partition", "offset", "key", and "value" 会被发送到默认的Stream中。

    如果要将tuple基于Topic输出到不同的Stream中去,使用org.apache.storm.kafka.spout.ByTopicRecordTranslator<K,V>

    setProp 可以用来设置没有提供设置method的其他属性

    setGroupId 用来设置Kafka Consumer Group的 group.id

    setSSLKeystore和setSSLTruststore 配置SSL身份验证

    KafkaSpout如何实现ack机制(这一段落内容是我自己添加的)

    KafkaSpout实现了ack和fail方法,只要在自定义的bolt中,锚定旧tuple,调用OutputCollector的ack(tuple)方法就可以实现tuple ack了。

    kafkaspot在消费kafka的数据时,通过offset读取到消息并发送给bolt后,KafkaSpout只是保存着Partition当前的offset值。
    当失败或成功根据msgId所在的Partition查询offset值,然后再去kafka消费该数据来确保消息的重新发送。
    那么虽然offset数据小,但是当offset的数据量上去了还是会内存溢出的?
    其实并没有,kafkaspout发现缓存的数据超过限制了,会把某端的数据清理掉的。

    Usage Examples

    创建一个简单的没有安全认证的Spout

    //The following will consume all events published to "topic" and
    // send them to MyBolt with the fields "topic", "partition", "offset", "key", "value".
    final TopologyBuilder tp = new TopologyBuilder(); tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic").build()), 1); tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout"); ... /*Wildcard Topics Wildcard topics will consume from all topics that exist in the specified brokers list and match the pattern.
    So in the following example "topic", "topic_foo" and "topic_bar" will all match the pattern "topic.*", but "not_my_topic" would not match.
    */ final TopologyBuilder tp = new TopologyBuilder(); tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, Pattern.compile("topic.*")).build()), 1); tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout"); ...

    Multiple Streams

    final TopologyBuilder tp = new TopologyBuilder();
    //By default all topics not covered by another rule, but consumed by the spout will be emitted to "STREAM_1" as "topic", "key", and "value"
    ByTopicRecordTranslator byTopic = new ByTopicRecordTranslator<>( 
    (r) -> new Values(r.topic(), r.key(), r.value()),
    new Fields("topic", "key", "value"),
    "STREAM_1"); //For topic_2 all events will be emitted to "STREAM_2" as just "key" and "value" byTopic.forTopic("topic_2", (r) -> new Values(r.key(), r.value()), new Fields("key", "value"), "STREAM_2"); tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder(
    "127.0.0.1:" + port, "topic_1", "topic_2", "topic_3").build())
    , 1); tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout", "STREAM_1"); tp.setBolt("another", new myOtherBolt()).shuffleGrouping("kafka_spout", "STREAM_2"); ...
    // Trident
    
    //java
    final TridentTopology tridentTopology = new TridentTopology();
    final Stream spoutStream = tridentTopology.newStream("kafkaSpout",
        new KafkaTridentSpoutOpaque<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, Pattern.compile("topic.*")).build()))
          .parallelismHint(1)
    ...

    Trident不支持多路流,它会忽略要输出的任意的流集合。但是如果对于每一个输出topic,Field是不相同的话,它会抛出异常并终止运行。

    Custom RecordTranslators(ADVANCED)

    大部分情况下,SimpleRecordTranslator和ByTopicRecordTranslator可以cover你的使用场景了。如果你需要自定义一个RecordTranslator,参照本节。

    自定义RecordTranslator的关键是 取得一个 ConsumerRecord,把它转化成一个可以发送出去的 List<Object>。

    不明显的是如何告诉Spout发送这个List到指定的Stream,要实现这个需要返回org.apache.storm.kafka.spout.KafkaTuple的一个实例,其中的routedTo方法会明确tuple要到哪个Stream。

    例如:java return new KafkaTuple(1, 2, 3, 4).routedTo("bar"); //就是把这个KafkaTuple发送到 "bar"这个Stream中。

    当编写自定义的record Translator时,它是要自我一致的。stream方法 要返回一个完整的将由translator尝试发送出的streams集合。

    getFieldsFor方法要为每一个stream返回一个合法的 Fields对象。

    如果对Trident这么用的话,一个值必须在 属于那个Stream的每一个在Field对象 所组成而返回的List里,否则Trident会抛出异常。

    Manual Partition Control(ADVANCED)

    默认情况下,Kafka会自动为Spouts集合分配分区。它处理了很多事情。手动安排分区的话,当Spout宕掉和恢复时会引发分配的问题,如果处理不当会造成很多问题。

    这要通过继承org.apache.storm.kafka.spout.Subscription去解决,Storm提供了ManualPartitionNamedSubscription和ManualPartitionPatternSubscription的例子可以参照。

    使用它们或自己实现手动分区控制,一定要小心。

    使用Maven Shade Plugin来创建Uber Jar

    添加以下内容到 REPO_HOME/storm/external/storm-kafka-client/pom.xml 中

    <plugin>
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-shade-plugin</artifactId> 
      <version>2.4.1</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> 
                <mainClass>org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain</mainClass>
    </transformer>
    </transformers>
    </configuration>
    </execution>
    </executions>
    </plugin>

    运行下面的命令创建 uber jar

    mvn package -f REPO_HOME/storm/external/storm-kafka-client/pom.xml

    符合以下命名方式和位置的uber jar就被创建了

    REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar

    运行Storm Topology

    复制REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-*.jarSTORM_HOME/extlib

    使用Kafka命令行工具创建三个Topic [test, test1, test2],使用Kafka console producer向这三个Topic中写入一些数据。

    执行命令

    STORM_HOME/bin/storm jar REPO_HOME/storm/external/storm/target/storm-kafka-client-*.jar org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain

    调整日志级别为debug,就可能看到每个topic的message被定向到适合的Bolt,这些Bolt是在流中定义并由shuffle grouping选择的。

    使用带有不同版本Kafka的 storm-kafka-client

    maven的pom.xml中使用storm-kafka-client来添加storm与0.10+Kafka集成的依赖包,对于其他版本的Kafka,要使用storm-kafka。

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>${storm.version}</version>
    </dependency>

    Kafka Spout性能的调整

    Kafka Spout提供两个内部参数控制性能。这几个参数可以使用KafkaSpoutConfig的setOffsetCommitPeriodMs和setMaxUncommittedOffsets调整。

    • offset.commit.period.ms 控制Spout向Kafka提交的周期
    • max.uncommitted.offsets 控制在另一个poll开始前有多少Offset的提交可以被挂起

    Kafka Consumer的配置参数也会影响Spout的性能,以下几个Kafka参数最为影响Spout性能:

    • fetch.min.bytes
    • fetch.max.bytes
    • Kafka Consumer实例的poll timeout,由每个Kafka Spout使用KafkaSpoutConfig的setPollTimeoutMs设置

    这些参数要依照Kafka集群的结构、数据的分布、可获得数据的可用性,进行合理的设置。要参照 Kafka文档。

    默认值:

    当前Kafka spout拥有以下默认值,这些默认值在这篇blog里描述的测试环境中显示了很好的性能。

    • poll.timeout.ms = 200
    • offset.commit.period.ms = 30000 (30s)
    • max.uncommitted.offsets = 10000000 

    Kafka自动提交模式

    如果可靠性对你不重要的话(不关心失败时丢失tuple),去除tuple跟踪造成的开销,可以使KafkaSpout运行在AutoCommitMode模式下。

    使其生效:

    1. set Config.TOPOLOGY_ACKERS 为0

    2. 使AutoCommitMode在Kafka Consumer的配置里生效

    在KafkaSpout中设置AutoCommitMode的例子:

    KafkaSpoutConfig<String, String> kafkaConf =
    KafkaSpoutConfig.builder(String bootstrapServers, String ... topics)
    .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
    .build();

    注意,Storm中没有正确的 最多一次 的语义,当Offset被Kafka Consumer周期性地提交后,当KafkaSpout挂掉时一些Tuple可能会被重发。

  • 相关阅读:
    简单的几句接口调用,完美完成缩短网站
    nodejs语言实现验证码生成功能
    人工智能:文本相似度分析
    SolrCloud 5.2.1 installation and configuration
    借鉴DP思想: HouseRobberIII
    有意思的数学题:Trapping Rain Water
    API认证方法一览
    FE: Responsive Web Design Overview
    FE: Sass and Bootstrap 3 with Sass
    假期小记:14年寒假二三四事(儿)
  • 原文地址:https://www.cnblogs.com/sunspeedzy/p/7447013.html
Copyright © 2011-2022 走看看