zoukankan      html  css  js  c++  java
  • [spark]Spark Streaming教程

    (一)官方入门示例

    废话不说,先来个示例,有个感性认识再介绍。

    这个示例来自spark自带的example,基本步骤如下:

    (1)使用以下命令输入流消息:

    $ nc -lk 9999

    (2)在一个新的终端中运行NetworkWordCount,统计上面的词语数量并输出:

    $ bin/run-example streaming.NetworkWordCount localhost 9999

    (3)在第一步创建的输入流程中敲入一些内容,在第二步创建的终端中会看到统计结果,如:

    第一个终端输入的内容:

    hello world again

    第二个端口的输出

    -------------------------------------------
    Time: 1436758706000 ms
    -------------------------------------------
    (again,1)
    (hello,1)
    (world,1)

    简单解释一下,上面的示例通过手工敲入内容,并传给spark streaming统计单词数量,然后将结果打印出来。

    附上代码:

    package org.apache.spark.examples.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.storage.StorageLevel
    
    /**
     * Counts words in UTF8 encoded, '
    ' delimited text received from the network every second.
     *
     * Usage: NetworkWordCount
     *  and  describe the TCP server that Spark Streaming would connect to receive data.
     *
     * To run this on your local machine, you need to first run a Netcat server
     *    `$ nc -lk 9999`
     * and then run the example
     *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
     */
    object NetworkWordCount {
      def main(args: Array[String]) {
        if (args.length < 2) {
          System.err.println("Usage: NetworkWordCount  ")
          System.exit(1)
        }
    
        StreamingExamples.setStreamingLogLevels()
    
        // Create the context with a 1 second batch size
        val sparkConf = new SparkConf().setAppName("NetworkWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
    
        // Create a socket stream on target ip:port and count the
        // words in input stream of 
     delimited text (eg. generated by 'nc')
        // Note that no duplication in storage level only for running locally.
        // Replication necessary in distributed scenario for fault tolerance.
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

    (二)Spark Streaming kafka示例

    本示例使用java+maven来构建一个wordcount

    1、创建项目,在pom.xml添加如下的依赖关系

    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.0</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.0</version>
    </dependency>
    <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    </dependency>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.4.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.4.0</version>
    </dependency>
     
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
    </dependency>

    2、写代码,此部分代码使用了官方的代码:

    package com.netease.gdc.kafkaStreaming;
    
    import java.util.Map;
    import java.util.HashMap;
    import java.util.regex.Pattern;
    
    
    import scala.Tuple2;
    import com.google.common.collect.Lists;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    
    /**
     * Consumes messages from one or more topics in Kafka and does wordcount.
     *
     * Usage: JavaKafkaWordCount
     * is a list of one or more zookeeper servers that make quorum
     * is the name of kafka consumer group
     * is a list of one or more kafka topics to consume from
     *is the number of threads the kafka consumer should use
     *
     * To run this example:
     *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, 
     *    zoo03 my-consumer-group topic1,topic2 1`
     */
    
    public final class JavaKafkaWordCount {
      private static final Pattern SPACE = Pattern.compile(" ");
    
      private JavaKafkaWordCount() {
      }
    
      public static void main(String[] args) {
        if (args.length < 4) {
          System.err.println("Usage: JavaKafkaWordCount
    ");
          System.exit(1);
        }
    
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        // Create the context with a 1 second batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
    
        int numThreads = Integer.parseInt(args[3]);
        Map topicMap = new HashMap();
        String[] topics = args[2].split(",");
        for (String topic: topics) {
          topicMap.put(topic, numThreads);
        }
    
        JavaPairReceiverInputDStream messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    
        JavaDStream lines = messages.map(new Function() {
          @Override
          public String call(Tuple2 tuple2) {
            return tuple2._2();
          }
        });
    
        JavaDStream words = lines.flatMap(new FlatMapFunction() {
          @Override
          public Iterable call(String x) {
            return Lists.newArrayList(SPACE.split(x));
          }
        });
    
        JavaPairDStream wordCounts = words.mapToPair(
          new PairFunction() {
            @Override
            public Tuple2 call(String s) {
              return new Tuple2(s, 1);
            }
          }).reduceByKey(new Function2() {
            @Override
            public Integer call(Integer i1, Integer i2) {
              return i1 + i2;
            }
          });
    
        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
      }
    }

    3、上传到服务器中然后编译

    mvn clean package

    4、提交job到spark中

    /home/hadoop/spark/bin/spark-submit --jars ../mylib/metrics-core-2.2.0.jar,../mylib/zkclient-0.3.jar,../mylib/spark-streaming-kafka_2.10-1.4.0.jar,../mylib/kafka-clients-0.8.2.1.jar,../mylib/kafka_2.10-0.8.2.1.jar  --class com.netease.gdc.kafkaStreaming.JavaKafkaWordCount --master spark://192.168.16.102:7077  target/kafkaStreaming-0.0.1-SNAPSHOT.jar 192.168.172.111:2181/kafka my-consumer-group test 3

    当然,前提是kafka集群已经正常运行,且存在test这个topic

    5、验证

    打开一个console producer,输入内容,然后观察wordcount的结果。

    结果形式如下:

    (hi,1)

      

    (三)基本步骤

    本部分介绍创建一个spark streaming应用的基本步骤

    1、构建依赖关系,以maven为例,需要在pom.xml中添加以下内容

    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.4.0</version>
    </dependency>

     如果需要使用其它数据源,则还需要将相应的依赖关系放入pom.xml。

    如使用kafka作为数据源:

    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.4.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.4.0</version>
    </dependency>
     

    当然,spark的核心包也要包含:

    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.0</version>
    </dependency>
  • 相关阅读:
    虚拟机VMware配置centos7集群(亲测有效)
    linux虚拟机克隆后,虚拟机ping不通的解决方法
    VC++使用 GDI+等比例缩放图片,并且居中显示
    VS2015 编译OSG Plugins Giflib静态库
    Qt 读写文件操作
    OSG 常用快捷键(全屏、查看帧数、截屏)
    Navicat Premium v15 中文最新破解版(附:激活工具)
    redis 持久化机制及配置
    Redis 五种数据类型
    linux 安装redis
  • 原文地址:https://www.cnblogs.com/lujinhong2/p/4660695.html
Copyright © 2011-2022 走看看