zoukankan      html  css  js  c++  java
  • storm整合kafka storm-kafka-client

    pom.xml(注意排除冲突的 log4j jar 包)
    ---------------------
    <!-- Dependencies for the Storm + Kafka (storm-kafka-client) example. -->
    <dependencies>
        <!-- Kafka spout/bolt integration for Storm. -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.1.3</version>
        </dependency>
        <!-- Storm runtime; declared with "provided" scope. -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>
        <!-- Kafka itself, with its transitive zookeeper and log4j artifacts excluded. -->
        <!-- NOTE(review): presumably excluded to avoid clashing with the versions Storm brings in; confirm. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    // Builds a topology that reads from the Kafka topic "mycall_in" through a KafkaSpout,
    // routes the tuples through bolts, writes results to the Kafka topic "mycall_out"
    // through a KafkaBolt, and runs everything in a LocalCluster for 30 minutes.
    TopologyBuilder builder = new TopologyBuilder();

    // Translate every Kafka record into a two-field tuple: the record value is emitted
    // under the field name "values" and the record topic under the field name "msg".
    ByTopicRecordTranslator<String, String> brt = new ByTopicRecordTranslator<>(
            (r) -> new Values(r.value(), r.topic()),
            new Fields("values", "msg"));

    KafkaSpoutConfig<String, String> ksc = KafkaSpoutConfig
            // Bootstrap servers and the topic to consume (mycall_in).
            .builder("192.168.1.3:9092", "mycall_in")
            // Consumer group.id.
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "skc-test")
            // Position at which consumption starts on the first poll.
            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
            // Interval, in milliseconds, between offset commits.
            .setOffsetCommitPeriodMs(10_000)
            // Record-to-tuple translator defined above.
            .setRecordTranslator(brt)
            .build();

    builder.setSpout("kafkaspout", new KafkaSpout<>(ksc), 4);
    builder.setBolt("mybolt1", new MyBolt1(), 2).shuffleGrouping("kafkaspout");

    // Producer properties used by the KafkaBolt.
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.1.3:9092");
    props.put("acks", "1");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // The bolt is used as a raw type here, hence the suppressed warnings.
    @SuppressWarnings({ "unchecked", "rawtypes" })
    KafkaBolt bolt = new KafkaBolt()
            .withProducerProperties(props)
            .withTopicSelector(new DefaultTopicSelector("mycall_out"))
            // Key field name is "" and message field name is "call".
            .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("", "call"));

    // NOTE(review): "mybolt3" is not declared anywhere in this snippet (only "mybolt1" is);
    // presumably it is registered in the omitted part of the full code. Confirm before running.
    builder.setBolt("KafkaBolt", bolt, 4).fieldsGrouping("mybolt3", new Fields("call"));

    Config config = new Config();
    config.setNumWorkers(2);
    config.setNumAckers(0);
    config.setDebug(false);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("storm-kafka-clients", config, builder.createTopology());

    // To run on a real cluster, submit with StormSubmitter instead of the LocalCluster:
    // StormSubmitter.submitTopology("storm-kafka-clients", config, builder.createTopology());

    boolean interrupted = false;
    try {
        Thread.sleep(1000 * 60 * 30); // 30 minutes
    } catch (InterruptedException e) {
        // Remember the interrupt; the flag is restored only after shutdown so that the
        // shutdown itself does not run with the interrupt flag already set.
        interrupted = true;
    } finally {
        try {
            // Always stop the local cluster, even when the sleep was cut short.
            cluster.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }


    ---
    感谢阅读,需完整代码的请联系博主!











    <!-- Kafka spout/bolt integration for Storm. -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>1.1.3</version>
    </dependency>

  • 相关阅读:
    powerful number 小记
    CF573E Bear and Bowling
    Diary 2.0
    【LOJ2540】「PKUWC2018」随机算法
    【Luogu2496】【BZOJ3005】[SDOI2012]体育课
    CF-diary
    【CF1217F】Forced Online Queries Problem
    NOI2019 选做
    Codeforces Round #568 (Div. 2) 选做
    【LOJ2513】「BJOI2018」治疗之雨
  • 原文地址:https://www.cnblogs.com/syscn/p/9921087.html
Copyright © 2011-2022 走看看