zoukankan      html  css  js  c++  java
  • Spark Streaming编程示例

    近期也有开始研究使用spark streaming来实现流式处理。本文以流式计算word count为例,简单描述如何进行spark streaming编程。

    1. 依赖的jar包

    参考《分别用Eclipse和IDEA搭建Scala+Spark开发环境》一文,pom.xml中指定依赖库spark-streaming_2.10.jar。

    <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
        </dependency>
        <!-- Spark -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.10</artifactId>
          <version>1.1.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.10</artifactId>
          <version>1.1.0</version>
        </dependency>
        <!-- HDFS -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.6.0</version>
        </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.4</version>
          <scope>test</scope>
        </dependency>
        </dependency>

    2. WordCount代码示例

    监听socket端口,每5秒统计一次收到的文本的单词数量,并输出到屏幕。

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
    
    /**
     * Spark Streaming示例,统计输入中所有单词出现的次数
     *
     */
    object StreamingWordCount {
        def main(args: Array[String]) {
            if (args.length < 2) {
                System.err.println("Usage: NetworkWordCount <hostname> <port>")
                System.exit(1)
            }
    
            // Create the context with a 5 second batch size
            val sparkConf = new SparkConf().setAppName("NetworkWordCount")
            val ssc = new StreamingContext(sparkConf, Seconds(5))
    
            // Create a socket stream on target ip:port and count the
            // words in input stream of 
     delimited text
            val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
            val words = lines.flatMap(_.split(" "))
            val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    
            wordCounts.print()
            ssc.start()
            ssc.awaitTermination()
        }
    }

    3. 提交任务和监听集群

    socketTextStream是从监听service的socket端口。

    (1)Job如何提交:
    $SPARK_HOME/bin/spark-submit --name StreamingDemo --class StreamingWordCount ./sparktest-1.0-SNAPSHOT.jar localhost 1234

    (2)监听socket端口:

    nc -lk 1234

     

  • 相关阅读:
    javascript——表单校验工具代码
    java工具类——字符串类型的时间格式转换为Timestamp类型
    log4j:WARN Please initialize the log4j system properly
    CentOS 网络配置
    linux-vim 快捷键
    2020年7月29日 数组操作练习
    2020年7月28日数组的声明,静态、动态初始化,遍历,内存分析
    2020年7月27日 练习题4、5、6、7、8、9、12
    2020年7月24日 100以内猜数字,统计次数
    2020年7月24日 编写FooBizBaz.java
  • 原文地址:https://www.cnblogs.com/simplestupid/p/4712317.html
Copyright © 2011-2022 走看看