  • Spark Streaming Tutorial

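    Let's skip the preamble and start with an example to get an intuitive feel before going into details.

    This example comes from the examples bundled with Spark. The basic steps are:

    (1) Run the following command to open an input stream for messages: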

    $ nc -lk 9999

    (2) In a new terminal, run NetworkWordCount, which counts the words typed above and prints the result:

    $ bin/run-example streaming.NetworkWordCount localhost 9999

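    (3) Type some text into the input stream created in step 1; the word counts appear in the terminal opened in step 2. For example:

    Input typed in the first terminal:

    hello world again

    Output in the second terminal: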

    -------------------------------------------
    Time: 1436758706000 ms
    -------------------------------------------
    (again,1)
    (hello,1)
    (world,1)

    In short, the example above takes text typed by hand, passes it to Spark Streaming to count the words, and prints the result.
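The `Time:` header in the output is the batch time in milliseconds since the Unix epoch. With the 1 second batch interval used by this example, it falls on a whole second. A minimal sketch for turning that number into a readable UTC timestamp is shown here; the class and method names are hypothetical and not part of Spark.

```java
import java.time.Instant;

// Plain-Java sketch (no Spark needed): convert the batch time printed in the
// "Time: ... ms" header into a readable UTC timestamp.
// BatchTimeSketch and toUtc are hypothetical names used only for illustration.
final class BatchTimeSketch {

    static String toUtc(long batchTimeMs) {
        // The header value is milliseconds since 1970-01-01T00:00:00Z.
        return Instant.ofEpochMilli(batchTimeMs).toString();
    }

    public static void main(String[] args) {
        // Batch time taken from the sample output above.
        System.out.println(toUtc(1436758706000L)); // prints 2015-07-13T03:38:26Z
    }
}
```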

    Here is the code:

    package org.apache.spark.examples.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.storage.StorageLevel
    
    /**
     * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
     *
     * Usage: NetworkWordCount <hostname> <port>
     * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
     *
     * To run this on your local machine, you need to first run a Netcat server
     *    `$ nc -lk 9999`
     * and then run the example
     *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
     */
    object NetworkWordCount {
      def main(args: Array[String]) {
        if (args.length < 2) {
          System.err.println("Usage: NetworkWordCount <hostname> <port>")
          System.exit(1)
        }
    
        StreamingExamples.setStreamingLogLevels()
    
        // Create the context with a 1 second batch size
        val sparkConf = new SparkConf().setAppName("NetworkWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
    
        // Create a socket stream on target ip:port and count the
        // words in input stream of \n delimited text (eg. generated by 'nc')
        // Note: this storage level keeps a single copy (no replication), which is fine only when running locally.
        // In a distributed deployment, replication is needed for fault tolerance.
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
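To make the `flatMap` / `map` / `reduceByKey` chain more concrete, here is a rough plain-Java sketch of what it computes for the lines of a single batch. It deliberately uses no Spark classes. `BatchWordCountSketch` and `countWords` are hypothetical names, and the sorted map is only an assumption made to keep the printout stable; Spark itself makes no ordering promise.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Spark): the per-batch computation performed by
// lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).
// BatchWordCountSketch and countWords are hypothetical, not part of Spark.
final class BatchWordCountSketch {

    static Map<String, Integer> countWords(Iterable<String> batchLines) {
        // TreeMap is used only so the printed order is deterministic.
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : batchLines) {
            // flatMap step: every line is split into words on a single space.
            for (String word : line.split(" ")) {
                // map step emits (word, 1); reduceByKey sums the 1s per word.
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = countWords(Arrays.asList("hello world again"));
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            System.out.println("(" + entry.getKey() + "," + entry.getValue() + ")");
        }
    }
}
```

For the input line `hello world again`, this prints the same three pairs as the sample output. In the real job the same logic runs once per batch interval on whatever lines arrived during that interval.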
     
     
    (I) Building your own project
    This example uses Java and Maven to build a word count.
    1. Create the project and add the following dependencies to pom.xml:

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.0</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.1</version>
    </dependency>

     
    2. Write the code. This part uses the official example code:
    package com.netease.gdc.kafkaStreaming;
    
    import java.util.Map;
    import java.util.HashMap;
    import java.util.regex.Pattern;
    
    
    import scala.Tuple2;
    import com.google.common.collect.Lists;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    
    /**
     * Consumes messages from one or more topics in Kafka and does wordcount.
     *
     * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
     *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
     *   <group> is the name of kafka consumer group
     *   <topics> is a list of one or more kafka topics to consume from
     *   <numThreads> is the number of threads the kafka consumer should use
     *
     * To run this example:
     *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, 
     *    zoo03 my-consumer-group topic1,topic2 1`
     */
    
    public final class JavaKafkaWordCount {
      private static final Pattern SPACE = Pattern.compile(" ");
    
      private JavaKafkaWordCount() {
      }
    
      public static void main(String[] args) {
        if (args.length < 4) {
          System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
          System.exit(1);
        }
    
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        // Create the context with a 2 second batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
    
        int numThreads = Integer.parseInt(args[3]);
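        // Map each topic name to the number of consumer threads to use for it
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic: topics) {
          topicMap.put(topic, numThreads);
        }
    
        // Receiver-based Kafka stream of (key, message) pairs
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    
        // Keep only the message body of each pair
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
          @Override
          public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
          }
        });
    
        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
          @Override
          public Iterable<String> call(String x) {
            return Lists.newArrayList(SPACE.split(x));
          }
        });
    
        // Emit (word, 1) for every word and sum the counts per word
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
          new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
              return new Tuple2<String, Integer>(s, 1);
            }
          }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
              return i1 + i2;
            }
          });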
    
        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
      }
    }
     
    3. Upload the project to the server and build it:
    mvn clean package
    4. Submit the job to Spark:
    /home/hadoop/spark/bin/spark-submit --jars ../mylib/metrics-core-2.2.0.jar,../mylib/zkclient-0.3.jar,../mylib/spark-streaming-kafka_2.10-1.4.0.jar,../mylib/kafka-clients-0.8.2.1.jar,../mylib/kafka_2.10-0.8.2.1.jar  --class com.netease.gdc.kafkaStreaming.JavaKafkaWordCount --master spark://192.168.165.102:7077  target/kafkaStreaming-0.0.1-SNAPSHOT.jar 192.168.172.111:2181/kafka my-consumer-group test 3
    Of course, this assumes the Kafka cluster is already running and the topic test exists.
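The four values at the end of the spark-submit command are the program arguments read in `main`: `args[0]` (the ZooKeeper address) and `args[1]` (the consumer group) are passed straight to `KafkaUtils.createStream`, while `args[2]` and `args[3]` are combined into the topic map. The following plain-Java sketch repeats just that argument handling with the values from the command above, so it can be tried without Spark or Kafka. `KafkaArgsSketch` and `buildTopicMap` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch (no Spark or Kafka): how the trailing command-line
// arguments of the spark-submit call are interpreted by the example.
// KafkaArgsSketch and buildTopicMap are hypothetical names for illustration.
final class KafkaArgsSketch {

    // Mirrors the loop in main(): every topic gets the same thread count.
    static Map<String, Integer> buildTopicMap(String topicsArg, String numThreadsArg) {
        int numThreads = Integer.parseInt(numThreadsArg);
        Map<String, Integer> topicMap = new LinkedHashMap<>();
        for (String topic : topicsArg.split(",")) {
            topicMap.put(topic, numThreads);
        }
        return topicMap;
    }

    public static void main(String[] ignored) {
        // The four trailing arguments from the spark-submit command above.
        String[] args = {"192.168.172.111:2181/kafka", "my-consumer-group", "test", "3"};
        System.out.println("zookeeper = " + args[0]);
        System.out.println("group     = " + args[1]);
        System.out.println("topicMap  = " + buildTopicMap(args[2], args[3])); // prints topicMap  = {test=3}
    }
}
```

Several topics can be given as a comma-separated list, for example `topic1,topic2`, in which case each topic is mapped to the same thread count.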
     
    5. Verify
    Open a console producer, type some input, and watch the word count results.
    The results look like this:
    (hi,1)

      

  • Original article: https://www.cnblogs.com/lujinhong2/p/4642498.html