zoukankan      html  css  js  c++  java
  • storm整合kafka storm-kafka-client

    pom.xml(注意:需要排除 kafka 依赖中自带的 log4j jar)
    ---------------------
    <dependencies>
        <!-- Storm spout/bolt integration for the Kafka client API -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.1.3</version>
        </dependency>
        <!-- Storm runtime; "provided" scope keeps it out of the packaged artifact. -->
        <!-- NOTE(review): the sample code runs a LocalCluster; confirm this scope works for local/IDE runs. -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>
        <!-- Kafka, with its transitive zookeeper and log4j jars excluded -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    // Wires a Storm topology: a Kafka spout reading topic "mycall_in", a processing bolt,
    // and a KafkaBolt writing to topic "mycall_out"; then runs it on an in-process LocalCluster.
    TopologyBuilder builder = new TopologyBuilder();

    // Turns each consumed Kafka record into a two-field tuple: (record value, record topic).
    // NOTE(review): the second field is named "msg" but carries the topic name — confirm
    // that downstream bolts expect this naming.
    ByTopicRecordTranslator<String, String> brt = new ByTopicRecordTranslator<>(
            (r) -> new Values(r.value(), r.topic()),
            new Fields("values", "msg"));

    KafkaSpoutConfig<String, String> ksc = KafkaSpoutConfig
            // bootstrap servers and the topic to consume (mycall_in)
            .builder("192.168.1.3:9092", "mycall_in")
            // consumer group.id
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "skc-test")
            // position to start consuming from on the first poll
            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
            // interval between offset commits, in milliseconds
            .setOffsetCommitPeriodMs(10_000)
            // record translator defined above
            .setRecordTranslator(brt)
            .build();

    builder.setSpout("kafkaspout", new KafkaSpout<>(ksc), 4);
    builder.setBolt("mybolt1", new MyBolt1(), 2).shuffleGrouping("kafkaspout");

    // Set producer properties.
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.1.3:9092");
    props.put("acks", "1");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // Raw KafkaBolt is kept as in the original snippet, hence the suppressed warnings.
    @SuppressWarnings({ "unchecked", "rawtypes" })
    KafkaBolt bolt = new KafkaBolt()
            .withProducerProperties(props)
            .withTopicSelector(new DefaultTopicSelector("mycall_out"))
            // key field name is "" and the message is taken from tuple field "call"
            .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("", "call"));

    // NOTE(review): "mybolt3" is not declared anywhere in this excerpt (only "mybolt1" is);
    // the intermediate bolts emitting the "call" field must be registered on the builder
    // before the topology is created — confirm against the full source.
    builder.setBolt("KafkaBolt", bolt, 4).fieldsGrouping("mybolt3", new Fields("call"));

    Config config = new Config();
    config.setNumWorkers(2);
    config.setNumAckers(0);
    config.setDebug(false);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("storm-kafka-clients", config, builder.createTopology());
    try {
        // To run on a real cluster, submit with StormSubmitter instead of LocalCluster:
        // StormSubmitter.submitTopology("storm-kafka-clients", config, builder.createTopology());
        Thread.sleep(1000 * 60 * 30); // 30m
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Shut the local cluster down even if the sleep above is interrupted.
        cluster.shutdown();
    }


    ---
    感谢阅读,需完整代码的请联系博主!











    <dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka-client</artifactId><version>1.1.3</version></dependency>

  • 相关阅读:
    监听本机tcp和udp的端口
    sysstat-----获取服务器负载历史记录
    inode索引详解
    tcpdump详解
    Windws Server 2008 R2 WEB环境配置之IIS7/IIS7.5+FastCGI+PHP 5.6.4+MYSQL+phpMyAdmin
    echo 命令
    带宽、流量、下载速度之间的换算
    windows 下解决 Time_Wait 和 CLOSE_WAIT 方法
    LNMP环境部署
    关于旅行
  • 原文地址:https://www.cnblogs.com/syscn/p/9921087.html
Copyright © 2011-2022 走看看