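  • Kafka + Storm: running on a single machine

    Environment:

      1. Kafka + ZooKeeper

      2. Windows

      3. Eclipse

    Setup:

    1. Install Kafka and ZooKeeper; this is covered in another post (https://www.cnblogs.com/51python/p/10870258.html).

    2. Eclipse code (create a Maven project)

      pom.xml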

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>hadoop</groupId>
      <artifactId>eclipseandmaven</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>eclipseandmaven</name>
      <url>http://maven.apache.org</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
      <dependencies>
     
        <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka-client</artifactId>
                <version>1.1.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>1.1.1</version>
                 <!-- Keep the scope below commented out for local testing; enable it when running on a cluster -->
                <!--  <scope>provided</scope>-->
            </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
      </dependencies>
    </project>

      Main class

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.topology.TopologyBuilder;
    
    public class MainTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            KafkaSpoutConfig.Builder<String, String> kafkaBuilder = KafkaSpoutConfig.builder("127.0.0.1:9092", "test0811");
            // Alternative with several brokers:
            //        .builder("127.0.0.1:9092,node-2:9092,node-3:9092", "test0811");
            // Set the Kafka consumer group this spout belongs to
            kafkaBuilder.setGroupId("testgroup");
            // Build the KafkaSpoutConfig
            KafkaSpoutConfig<String, String> build = kafkaBuilder.build();
            // Create the KafkaSpout from the config
            KafkaSpout<String, String> kafkaSpout = new KafkaSpout<String, String>(build);
            // Use 5 executors (threads) to receive data
            builder.setSpout("kafkaSpout", kafkaSpout, 5);
            // Use 2 executors (threads) to process data
            builder.setBolt("printBolt", new PrintBolt(), 2).localOrShuffleGrouping("kafkaSpout");
            Config config = new Config();
            if (args.length > 0) {
                // Cluster mode: the first argument is used as the topology name
                config.setDebug(false);
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            } else {
                // Local test mode
                config.setDebug(true);
                // Use 2 worker processes
                config.setNumWorkers(2);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("kafkaSpout", config, builder.createTopology());
            }
        }
    }
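
    The topology above asks for 5 spout executors, while the topic used in this post is created with a single partition. Within one Kafka consumer group, a partition is read by at most one consumer, so the extra spout executors would be expected to receive nothing. The following is a toy model in plain Java of that effect. It is not Kafka's or Storm's actual assignment code, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: spread partition ids over spout tasks round-robin. Not Kafka's real assignor. */
public class PartitionAssignmentSketch {

    /** Returns, for each task index, the list of partition ids that task would own. */
    public static List<List<Integer>> assign(int partitions, int tasks) {
        if (tasks <= 0) {
            throw new IllegalArgumentException("tasks must be positive");
        }
        List<List<Integer>> owned = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(p % tasks).add(p);
        }
        return owned;
    }

    /** Counts the tasks that received no partition at all. */
    public static long idleTasks(int partitions, int tasks) {
        return assign(partitions, tasks).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // 1 partition (as created for test0811 in this post) and 5 spout tasks
        System.out.println(assign(1, 5));                       // [[0], [], [], [], []]
        System.out.println("idle tasks: " + idleTasks(1, 5));   // idle tasks: 4
    }
}
```

    Under this model, either creating the topic with more partitions or lowering the spout parallelism would keep the two numbers in line.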

      Storm output bolt

    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Tuple;
    
    public class PrintBolt extends BaseBasicBolt {
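        /**
         * Storm calls execute once for every incoming tuple.
         *
         * @param tuple the tuple emitted by the Kafka spout
         * @param basicOutputCollector collector for emitting tuples downstream (unused here)
         */
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            // Print to stderr so the messages show up in red in the Eclipse console.
            // With the spout's default output fields (topic, partition, offset, key, value),
            // index 4 is the message value.
            System.err.println(tuple.getValue(4));
            System.err.println(tuple.getValues());
        }
    
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // This bolt emits nothing, so no output fields are declared.
        }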
    }
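
    The bolt reads the message with tuple.getValue(4). That index depends on the order of the fields the Kafka spout emits, assumed here to be the storm-kafka-client default of topic, partition, offset, key, value. The sketch below is a stand-alone model in plain Java of positional versus name-based access. It does not use Storm classes, and TupleFieldSketch and valueByField are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

/** Stand-alone model of positional vs. named field access; no Storm classes involved. */
public class TupleFieldSketch {

    // Field order assumed from storm-kafka-client's default record translator.
    static final List<String> FIELDS =
            Arrays.asList("topic", "partition", "offset", "key", "value");

    /** Looks a value up by field name instead of by position. */
    public static Object valueByField(List<Object> values, String field) {
        int index = FIELDS.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field: " + field);
        }
        return values.get(index);
    }

    public static void main(String[] args) {
        // Sample values for one record; the key is null because the console producer sends none by default.
        List<Object> values = Arrays.<Object>asList("test0811", 0, 42L, null, "hello");
        System.out.println(values.get(4));                 // positional access, as in PrintBolt
        System.out.println(valueByField(values, "value")); // same element, looked up by name
    }
}
```

    Both lines print the same element. In the real bolt, a name-based getter such as tuple.getValueByField("value") would avoid relying on the field order.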

    3. Run

      1) Start ZooKeeper

    zkserver

      2) Start the Kafka server (open cmd in the installation directory D:\bigdata\kafka_2.11-0.9.0.1)

    .\bin\windows\kafka-server-start.bat .\config\server.properties

      3) Create the topic (open cmd in D:\bigdata\kafka_2.11-0.9.0.1\bin\windows)

    kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test0811

      4) Start a console producer (open cmd in D:\bigdata\kafka_2.11-0.9.0.1\bin\windows)

    kafka-console-producer.bat --broker-list localhost:9092 --topic test0811

      5) Start the main class

      Run the main class (MainTopology) from Eclipse.
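
    Before starting the topology in the last step, it can help to confirm that ZooKeeper and the Kafka broker are actually listening. The following is a minimal sketch in plain Java. The ports 2181 and 9092 come from the commands in this post; the class name, method name, and one-second timeout are illustrative assumptions.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Pre-flight check: can a TCP connection be opened to ZooKeeper and the Kafka broker? */
public class PortCheckSketch {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the commands in this post: 2181 (ZooKeeper), 9092 (Kafka)
        System.out.println("zookeeper 2181: " + isReachable("127.0.0.1", 2181, 1000));
        System.out.println("kafka 9092:     " + isReachable("127.0.0.1", 9092, 1000));
    }
}
```

    An open port only shows that something is listening there; it does not prove the service is healthy or that the topic exists.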

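    Result:

      Strings typed into the cmd window from step 4 are received in Eclipse.

    This is the single-machine version; multi-machine communication is planned for a later post.

    Reference: https://blog.csdn.net/qq_41455420/article/details/79385566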

  • Original post: https://www.cnblogs.com/51python/p/10908660.html