  • Spark Streaming programming example

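    I have recently started looking into Spark Streaming for stream processing. Using a streaming word count as the example, this post briefly describes how to write a Spark Streaming program.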

    1. Required jar dependencies

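    Following the article 《分别用Eclipse和IDEA搭建Scala+Spark开发环境》 ("Setting up a Scala + Spark development environment with Eclipse and with IDEA"), declare the dependency on spark-streaming_2.10.jar in pom.xml.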

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- Spark -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.1.0</version>
    </dependency>
    <!-- HDFS -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
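
    For readers who build with sbt rather than Maven, the same dependencies might be declared roughly as follows. This is a sketch and not part of the original setup: the Scala version 2.10.4 is an assumption (the pom only says ${scala.version}, and the _2.10 artifacts require some 2.10.x release), and sbt adds scala-library on its own, so it is not listed.

```scala
// build.sbt (hypothetical sbt equivalent of the pom.xml fragment above)

// Assumed value: any 2.10.x release matching the _2.10 artifacts should work
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // %% appends the Scala binary version, giving spark-core_2.10 etc.
  "org.apache.spark" %% "spark-core" % "1.1.0",
  "org.apache.spark" %% "spark-streaming" % "1.1.0",
  // HDFS client
  "org.apache.hadoop" % "hadoop-client" % "2.6.0",
  // Test-only dependency, as in the pom
  "junit" % "junit" % "4.4" % "test"
)
```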

    2. WordCount code example

    The program reads text from a socket port, counts the words received in each 5-second batch, and prints the counts to the screen.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
    
    /**
     * Spark Streaming example: counts the occurrences of each word in the input.
     */
    object StreamingWordCount {
        def main(args: Array[String]): Unit = {
            if (args.length < 2) {
                System.err.println("Usage: StreamingWordCount <hostname> <port>")
                System.exit(1)
            }
    
            // Create the context with a 5 second batch size
            val sparkConf = new SparkConf().setAppName("NetworkWordCount")
            val ssc = new StreamingContext(sparkConf, Seconds(5))
    
            // Create a socket stream on target ip:port and count the
            // words in input stream of \n delimited text
            val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
            val words = lines.flatMap(_.split(" "))
            val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    
            wordCounts.print()
            ssc.start()
            ssc.awaitTermination()
        }
    }
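
    The transformations above run once per 5-second batch, over only the lines received in that batch, so the printed counts are per batch rather than running totals. As a rough illustration of what a single batch computes, the sketch below applies the same split, pair and sum steps to a plain Scala collection, with no Spark involved. The object name BatchWordCountSketch and the sample lines are hypothetical, and groupBy followed by a sum stands in for reduceByKey.

```scala
object BatchWordCountSketch {
  // Counts words in one batch of lines, mirroring
  // flatMap(_.split(" ")) -> map(x => (x, 1)) -> reduceByKey(_ + _).
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))          // one element per word
      .map(word => (word, 1))         // pair each word with a count of 1
      .groupBy(_._1)                  // local stand-in for grouping by key
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // Hypothetical contents of a single 5-second batch
    val batch = Seq("hello spark", "hello streaming")
    countWords(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```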

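    3. Submitting the job and listening on the port

    socketTextStream connects as a TCP client to a service that is listening on the given host and port, and reads newline-delimited text from it.

    (1) Submitting the job: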
    $SPARK_HOME/bin/spark-submit --name StreamingDemo --class StreamingWordCount ./sparktest-1.0-SNAPSHOT.jar localhost 1234

    (2) Listening on the socket port:

    nc -lk 1234
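
    If nc is not available, a small TCP server can play the same role. The following is a minimal sketch and not part of the original post: the object LineServerSketch and its serveOnce helper are hypothetical names, and unlike nc -lk it serves a single client and then exits. It forwards each line typed on standard input to the connected client as newline-terminated text, which is the form socketTextStream reads.

```scala
import java.io.PrintWriter
import java.net.ServerSocket

object LineServerSketch {
  // Waits for one client on the given server socket and sends it every line,
  // each terminated by a line separator.
  def serveOnce(server: ServerSocket, lines: Iterator[String]): Unit = {
    val client = server.accept()
    try {
      // autoFlush = true, so each println is pushed to the client immediately
      val out = new PrintWriter(client.getOutputStream, true)
      lines.foreach(line => out.println(line))
    } finally {
      client.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Port defaults to 1234, the value used in the spark-submit command above
    val port = if (args.nonEmpty) args(0).toInt else 1234
    val server = new ServerSocket(port)
    try serveOnce(server, scala.io.Source.stdin.getLines())
    finally server.close()
  }
}
```

    Start it before submitting the job, then type lines of text into its console; the word counts for each batch should appear in the driver output.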

     

  • Original article: https://www.cnblogs.com/simplestupid/p/4712317.html