zoukankan      html  css  js  c++  java
  • 5、Storm集成Kafka

    1、pom文件依赖

    <!--storm相关jar  -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
        <!--排除相关依赖  -->
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-1.2-api</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-web</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <artifactId>ring-cors</artifactId>
                <groupId>ring-cors</groupId>
            </exclusion>
        </exclusions>
        <!--<scope>provided</scope>--><!--注意本地调试和集群部署-->
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>1.2.2</version>
        <!--<scope>provided</scope>--><!--注意本地调试和集群部署-->
    </dependency>
    
    <!--注:老版本使用的storm-kafka依赖已经被废弃,建议在以后使用storm-kafka-client依赖进行开发,老版本的storm-kafka依赖为:-->
    <!--    <dependency> -->
    <!--        <groupId>org.apache.storm</groupId> -->
    <!--        <artifactId>storm-kafka</artifactId> -->
    <!--        <version>1.2.2</version> -->
    <!--    </dependency> -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>
    

    2、Topology

    @Component
    public class KafkaStormSpoutWordCountTopology {
    
        public static void main(String[] args) {
    
            KafkaSpoutConfig.Builder<String,String> builder =
                    KafkaSpoutConfig.builder(
                            "192.168.8.101:9092,192.168.8.102:9092,192.168.8.103:9092",
                            "yun01");
    
            builder.setGroupId("test_storm_wc");
    
            KafkaSpoutConfig<String, String> kafkaSpoutConfig= builder.build();
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            
            topologyBuilder.setSpout("WordCountKafkaSpout",
                                    new KafkaSpout<String,String>(kafkaSpoutConfig),
                                    1);
    
            topologyBuilder.setBolt("ReadKafkaSpoutBolt",
                                    new ReadKafkaSpoutBolt()).shuffleGrouping("WordCountKafkaSpout");
    
            Config config = new Config();
    
    
            System.out.println("准备启动kafkaStromTopo");
            LocalCluster cluster= new LocalCluster();
            cluster.submitTopology("kafkaStromTopo", config, topologyBuilder.createTopology());
    
    
    
    //        //启动topology的配置信息
    //        Config conf = new Config();
    //        //TOPOLOGY_DEBUG(setDebug),当他被设置成true的话,storm会记录下每个组件所发射的每条消息
    //        //这在本地环境调试topology很有用。但是在线上这么做的话,会影响性能
    //        conf.setDebug(false);
    //
    //        //storm的运行模式有两种:本地模式和分布式模式
    //        if(args != null || args.length>0){
    //            conf.setNumWorkers(3);
    //            //向集群提交topology
    //            try {
    //                StormSubmitter.submitTopologyWithProgressBar(args[0],conf,topologyBuilder.createTopology());
    //            } catch (AlreadyAliveException e) {
    //                e.printStackTrace();
    //            } catch (InvalidTopologyException e) {
    //                e.printStackTrace();
    //            } catch (AuthorizationException e) {
    //                e.printStackTrace();
    //            }
    //        }
    //        else{
    //
    //
    //            conf.setMaxTaskParallelism(3);
    //
    //            LocalCluster cluster = new LocalCluster();
    //            cluster.submitTopology("word-count",conf,builder.createTopology());
    //        }
        }
    
    

    3、Bolt, 设计拓扑请跟根据自己的业务

    public class ReadKafkaSpoutBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
    
            System.out.println(input.getValues().get(4)+"消息接受bolt");
            /*
            input 获取到的值
    
            0索引代表kafka的topic
            1索引代表kafka的分区
            2索引代表kafka的偏移量
            3索引代表kafka的key值
            4索引代表kafka的value值
            */
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    
        }
    }
    
  • 相关阅读:
    洛谷-P5743 【深基7.习8】猴子吃桃
    洛谷-P5741 【深基7.例10】旗鼓相当的对手
    洛谷-P5740 【深基7.例9】最厉害的学生
    洛谷-P5739 【深基7.例7】计算阶乘
    jvm中常见的指令笔记
    join()方法的源码分析
    Java中线程状态的各种转换关系
    java构造器遇到父类没有无参构造的分析
    jvm栈和堆
    spring新注解
  • 原文地址:https://www.cnblogs.com/xidianzxm/p/10774655.html
Copyright © 2011-2022 走看看