zoukankan      html  css  js  c++  java
  • 大数据处理框架之Storm:kafka storm 整合

    storm 使用kafka做数据源,还可以使用文件、redis、jdbc、hive、HDFS、hbase、netty做数据源。

    新建一个maven 工程:

    pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>storm06</groupId>
      <artifactId>storm06</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <!-- NOTE(review): <name> says storm07 but groupId/artifactId are storm06 — looks like a copy/paste slip; confirm intended module name -->
      <name>storm07</name>
      <url>http://maven.apache.org</url>
      <repositories>
            <!-- Repository where we can find the storm dependencies  -->
            <repository>
                <id>clojars.org</id>
                <url>http://clojars.org/repo</url>
            </repository>
      </repositories>
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
      <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.2-incubating</version>
        </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
         <dependency>  
            <groupId>org.apache.kafka</groupId>  
            <artifactId>kafka_2.10</artifactId>  
            <version>0.9.0.1</version>  
            <!-- jmxtools/jmxri are unresolvable Sun artifacts pulled in transitively by kafka; exclude them so the build works -->
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
            </exclusions>
        </dependency>  
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.0-beta9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-1.2-api</artifactId>
            <version>2.0-beta9</version>
        </dependency>    
        <!-- NOTE(review): log4j-over-slf4j together with slf4j-log4j12 creates a circular logging bridge; SLF4J normally rejects this pairing — verify which binding is actually wanted -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.10</version>
        </dependency>   
        <!-- storm & kafka spout -->
        <dependency>
            <groupId>net.wurstmeister.storm</groupId>
            <artifactId>storm-kafka-0.8-plus</artifactId>
            <version>0.4.0</version>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
        </dependency>                    
      </dependencies>
        <build>
        <finalName>storm06</finalName>
       <plugins>
            <!-- NOTE(review): packaging is jar, so maven-war-plugin appears unused here — presumably leftover; confirm before removing -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-war-plugin</artifactId>
                <version>2.4</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.1</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <!-- Unit tests: note skip=true disables surefire entirely, so the <includes> filter below has no effect -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                    <includes>
                        <include>**/*Test*.java</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <version>2.1.2</version>
                <executions>
                    <!-- Bind to the package phase and run maven-source-plugin goal jar-no-fork to attach a sources jar -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>jar-no-fork</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>    
        </plugins>    
      </build>
    </project>

    KafkaTopology

    package bhz.storm.kafka.example;
    
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    
    /**
     * Local-mode demo topology: KafkaSpout -> SentenceBolt -> PrinterBolt.
     * Reads words from the Kafka topic "words_topic", aggregates them into
     * sentences, prints them, then kills the topology and shuts down.
     */
    public class KafkaTopology {
        public static void main(String[] args) throws
            AlreadyAliveException, InvalidTopologyException {
            // zookeeper hosts for the Kafka cluster
            ZkHosts zkHosts = new ZkHosts("134.32.123.101:2181,134.32.123.102:2181,134.32.123.103:2181");
    
            // Create the KafkaSpout configuration.
            // Second argument is the topic name.
            // Third argument is the zookeeper root for Kafka ("" = zk root).
            // Fourth argument is the consumer group id.
            SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "words_topic", "", "id7");
    
            // Specify that the kafka messages are String
            kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    
            // We want to consume all the first messages in the topic every time
            // we run the topology, to help in debugging. In production this
            // property should be false.
            kafkaConfig.forceFromStart = true;
    
            // Now we create the topology
            TopologyBuilder builder = new TopologyBuilder();
    
            // set the kafka spout class
            builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
    
            // configure the bolts: global grouping keeps all tuples in one task
            // so SentenceBolt can accumulate a whole sentence.
            builder.setBolt("SentenceBolt", new SentenceBolt(), 1).globalGrouping("KafkaSpout");
            builder.setBolt("PrinterBolt", new PrinterBolt(), 1).globalGrouping("SentenceBolt");
    
            // Single name for submit/kill so the two calls can never diverge.
            // (The original spelling "KafkaToplogy" is kept intentionally; the
            // topology name is a runtime identifier, not display text.)
            final String topologyName = "KafkaToplogy";
    
            // create an instance of LocalCluster class for executing topology in local mode.
            LocalCluster cluster = new LocalCluster();
            Config conf = new Config();
    
            // Submit topology for execution
            cluster.submitTopology(topologyName, conf, builder.createTopology());
    
            try {
                // Wait for some time before exiting
                System.out.println("Waiting to consume from kafka");
                Thread.sleep(10000);
            } catch (InterruptedException exception) {
                // Narrow catch: only the sleep can throw here. Restore the
                // interrupt flag rather than swallowing it.
                Thread.currentThread().interrupt();
                System.out.println("Thread interrupted exception : " + exception);
            }
    
            // kill the topology
            cluster.killTopology(topologyName);
    
            // shut down the storm test cluster
            cluster.shutdown();
        }
    }
    package bhz.storm.kafka.example;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.commons.lang.StringUtils;
    
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    
    import com.google.common.collect.ImmutableList;
    
    public class SentenceBolt extends BaseBasicBolt {
    
        // list used for aggregating the words
        private List<String> words = new ArrayList<String>();
    
        public void execute(Tuple input, BasicOutputCollector collector) {
            // Get the word from the tuple
            String word = input.getString(0);
    
            if(StringUtils.isBlank(word)){
                // ignore blank lines
                return;
            }
    
            System.out.println("Received Word:" + word);
    
            // add word to current list of words
            words.add(word);
    
            if (word.endsWith(".")) {
                // word ends with '.' which means this is the end of
                // the sentence publishes a sentence tuple
                collector.emit(ImmutableList.of(
                        (Object) StringUtils.join(words, ' ')));
    
                // and reset the words list.
                words.clear();
            }
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // here we declare we will be emitting tuples with
            // a single field called "sentence"
            declarer.declare(new Fields("sentence"));
        }
    }
    package bhz.storm.kafka.example;
    
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;
    
    public class PrinterBolt extends BaseBasicBolt {
    
        public void execute(Tuple input, BasicOutputCollector collector) {
            // get the sentence from the tuple and print it
            String sentence = input.getString(0);
            System.out.println("Received Sentence:" + sentence);
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // we don't emit anything
        }
    }
  • 相关阅读:
    Java进程线程理解
    Java String练习题及答案
    代理服务器原理(转)
    FTP服务器原理(转)
    SMTP协议及POP3协议-邮件发送和接收原理(转)
    集合框架的类和接口均在java.util包中。 任何对象加入集合类后,自动转变为Object类型,所以在取出的时候,需要进行强制类型转换。
    Java集合框架顶层接口collectiion接口
    java多态--算法实现就是多态
    项目安排
    Scala从零開始:使用Intellij IDEA写hello world
  • 原文地址:https://www.cnblogs.com/cac2020/p/9870391.html
Copyright © 2011-2022 走看看