Kafka + Storm + HDFS Integration Example

    Messages can enter Kafka in a variety of ways; for example, Flume can collect log data and publish it to Kafka, which routes and buffers the messages. A Storm topology then consumes them for real-time analysis, and the results are written to HDFS. To wire this up, a Storm Spout reads the messages from Kafka and hands them to Bolt components for processing. The sections below build a simple WordCount example: it reads subscribed message lines from Kafka, splits each line into words on spaces, computes running word counts, and finally writes the results to HDFS.

    1. Kafka producer

    package com.dxss.storm;
    
    import kafka.producer.KeyedMessage;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.ProducerConfig;
    import java.util.Properties;
    
    /**
     * Simple test producer: publishes the same message line to the "sunfei" topic in a loop.
     * Created by hadoop on 2017/7/29.
     */
    public class KafkaMessageProducer {
        public static void main(String[] args) throws InterruptedException{
            Properties properties = new Properties();
            properties.put("zookeeper.connect", "12.13.41.41:2182");              // ZooKeeper used by Kafka 0.8
            properties.put("metadata.broker.list", "12.13.41.41:9092");           // Kafka broker list
            properties.put("serializer.class", "kafka.serializer.StringEncoder"); // send message values as strings
            ProducerConfig producerConfig = new ProducerConfig(properties);
            Producer<String, String> producer = new Producer<String, String>(producerConfig);
            KeyedMessage<String,String> keyedMessage = new KeyedMessage<String, String>("sunfei","I am a chinese");
            while(true){
                producer.send(keyedMessage);
                Thread.sleep(1000); // throttle the demo so the topic is not flooded
            }
        }
    }
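
    To check that messages are actually arriving on the "sunfei" topic, you can run a small console consumer against the same ZooKeeper address. The following is a minimal sketch using the Kafka 0.8 high-level consumer API that matches the kafka_2.10 dependency in the pom; the class name and the "check-group" group id are arbitrary names chosen for this example.

    package com.dxss.storm;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    
    public class KafkaConsumerCheck {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("zookeeper.connect", "12.13.41.41:2182"); // same ZooKeeper as the producer
            properties.put("group.id", "check-group");               // arbitrary consumer group for this check
            properties.put("auto.offset.reset", "smallest");         // start from the earliest offset
            ConsumerConnector consumer =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    
            // Ask for a single stream on the demo topic
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("sunfei", 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    consumer.createMessageStreams(topicCountMap);
    
            // Print every message received on the topic
            ConsumerIterator<byte[], byte[]> it = streams.get("sunfei").get(0).iterator();
            while (it.hasNext()) {
                System.out.println(new String(it.next().message()));
            }
        }
    }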

    2. Storm bolt: split each message line into words (the Kafka spout itself is created in the topology below)

    package com.dxss.stormkafka;
    
    import java.util.Map;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    /** First bolt: splits each Kafka message line into words and emits (word, 1) pairs. */
    public class PrinterBolt extends BaseRichBolt {
    
        private OutputCollector collector;
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        @Override
        public void execute(Tuple input) {
            // Each tuple from the KafkaSpout carries one message line
            String value = input.getString(0);
            String[] strings = value.split(" ");
            for(String var : strings){
                collector.emit(new Values(var, 1));
            }
            collector.ack(input); // ack so the spout does not replay the message
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "num"));
        }
    
    }

    3. Storm bolt: word counting

    package com.dxss.stormkafka;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    /** Second bolt: keeps a running count per word and emits "word:count" strings. */
    public class CountBolt extends BaseRichBolt {
    
        private OutputCollector collector;
        private Map<String,Integer> map = new HashMap<String,Integer>();
        
        
        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            Integer num = input.getInteger(1);
            if (map.containsKey(word)){
                Integer number = map.get(word);
                map.put(word, number + num); // accumulate by the emitted count rather than a hard-coded 1
            }else{
                map.put(word, num);
            }
            String result = word + ":" + map.get(word);
            this.collector.emit(new Values(result));
            this.collector.ack(input); // ack once the updated count has been emitted
        }
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("result"));
        }
    
    }

    4. Storm topology

    package com.dxss.stormkafka;
    
    import org.apache.storm.hdfs.bolt.HdfsBolt;
    import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
    import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    import org.apache.storm.hdfs.bolt.format.RecordFormat;
    import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
    import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
    import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    
    public class StormKafka {
        
        public static StormTopology createTopology(){
            // KafkaSpout configuration: ZooKeeper address, topic, ZK root for offset storage, and a spout id
            BrokerHosts brokerHosts = new ZkHosts("12.13.41.41:2182");
            String topic = "sunfei";
            String zkRoot = "/sunfei";
            String spoutId = "sunfei_storm";
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // decode Kafka messages as plain strings
            // Field delimiter for records written to HDFS
            RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(":");
            // Sync to HDFS once every 1000 tuples
            SyncPolicy syncPolicy = new CountSyncPolicy(1000);
            // Rotate to a new file once the current one reaches 100 MB
            FileRotationPolicy policy = new FileSizeRotationPolicy(100.0f, FileSizeRotationPolicy.Units.MB);
            // Output directory, file name prefix, and extension on HDFS
            FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/test/storm")
                    .withPrefix("storm_").withExtension(".txt");
            // HDFS namenode URL to write to
            HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl("hdfs://12.13.41.43:8020")
                    .withFileNameFormat(fileNameFormat)
                    .withRecordFormat(format)
                    .withRotationPolicy(policy)
                    .withSyncPolicy(syncPolicy);
            
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("words", new KafkaSpout(spoutConfig));
            builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
            // fieldsGrouping on "word" routes all tuples for the same word to the same CountBolt
            // task, which is what makes the per-task in-memory counts correct
            builder.setBolt("totalBolt", new CountBolt()).fieldsGrouping("print", new Fields("word"));
            builder.setBolt("hdfsBolt", hdfsBolt).shuffleGrouping("totalBolt");
    
            return builder.createTopology();
        }
        public static void main(String[] args) throws InterruptedException {
            
            Config config = new Config();
            // Note: this setting only affects Trident topologies and is unused by this plain topology
            config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
            StormTopology topology = StormKafka.createTopology();
            config.setNumWorkers(1);
            config.setMaxTaskParallelism(2);
            // Cluster mode would use StormSubmitter instead; see the sketch after this class.
            // Local mode: run the topology inside the current JVM for testing
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("kafkastorm", config, topology);
            Thread.sleep(1000);
        }
    
    }
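
    For cluster deployment, the StormSubmitter path can be used instead of LocalCluster. A minimal sketch is shown below; it reuses createTopology() from the class above, and the topology name "mywordcount" comes from the original commented-out code. Build the jar-with-dependencies (section 5) and submit it with the storm jar command.

    package com.dxss.stormkafka;
    
    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.generated.StormTopology;
    
    public class StormKafkaSubmit {
        public static void main(String[] args) {
            Config config = new Config();
            config.setNumWorkers(1);
            config.setMaxTaskParallelism(2);
            StormTopology topology = StormKafka.createTopology();
            try {
                // Registers the topology with Nimbus; it runs until explicitly killed
                StormSubmitter.submitTopology("mywordcount", config, topology);
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
    }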

    5. Project pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.kafka</groupId>
      <artifactId>StormDemo</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <dependencies>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>0.9.3</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka</artifactId>
                <version>0.9.3</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.5</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>2.7.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-hdfs</artifactId>
                <version>0.10.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-client</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-hdfs</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>jdk.tools</groupId>
                <artifactId>jdk.tools</artifactId>
                <version>1.8</version>
                <scope>system</scope>
                <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
            </dependency>
            
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>2.5.0</version>
            </dependency>
            <!-- Dependency required by the storm-kafka module -->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.5.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- kafka -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.8.1.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
        </dependencies>
        <repositories>
            <repository>
                <id>central</id>
                <url>http://repo1.maven.org/maven2/</url>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
                <releases>
                    <enabled>true</enabled>
                </releases>
            </repository>
            <repository>
                <id>clojars</id>
                <url>https://clojars.org/repo/</url>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
                <releases>
                    <enabled>true</enabled>
                </releases>
            </repository>
            <repository>
                <id>scala-tools</id>
                <url>http://scala-tools.org/repo-releases</url>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
                <releases>
                    <enabled>true</enabled>
                </releases>
            </repository>
            <repository>
                <id>conjars</id>
                <url>http://conjars.org/repo/</url>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
                <releases>
                    <enabled>true</enabled>
                </releases>
            </repository>
        </repositories>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.6</source>
                        <target>1.6</target>
                        <encoding>UTF-8</encoding>
                        <showDeprecation>true</showDeprecation>
                        <showWarnings>true</showWarnings>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <mainClass>com.dxss.stormkafka.StormKafka</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
        
    </project>
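
    To build the deployable artifact, run mvn clean package; the maven-assembly-plugin configured above produces target/StormDemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar, which is the jar to pass to the storm jar command when submitting the topology to a cluster.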
Original article: https://www.cnblogs.com/sunfie/p/7264984.html