  • Spark Streaming Programming Example

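    I have recently started looking into Spark Streaming for stream processing. This post uses a streaming word count as an example to briefly show how to write a Spark Streaming program.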

    1. Required jar dependencies

    Following the article "Setting up a Scala+Spark development environment with Eclipse and IDEA", declare the spark-streaming_2.10 dependency in pom.xml:

    <dependencies>
      <!-- ${scala.version} is defined under <properties>; it must be a 2.10.x release to match the _2.10 Spark artifacts -->
      <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
      </dependency>
      <!-- Spark -->
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.1.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.1.0</version>
      </dependency>
      <!-- HDFS -->
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
      </dependency>
      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.4</version>
        <scope>test</scope>
      </dependency>
    </dependencies>

    2. WordCount code example

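    The program reads text from a socket port. Every 5 seconds it counts how often each word appeared in the text received during that interval, then prints the counts to the screen.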

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
    
    /**
     * Spark Streaming example: counts the occurrences of each word in every batch of input
     *
     */
    object StreamingWordCount {
        def main(args: Array[String]) {
            if (args.length < 2) {
                System.err.println("Usage: StreamingWordCount <hostname> <port>")
                System.exit(1)
            }
    
            // Create the context with a 5 second batch size
            val sparkConf = new SparkConf().setAppName("NetworkWordCount")
            val ssc = new StreamingContext(sparkConf, Seconds(5))
    
            // Create a socket stream on target ip:port and count the
            // words in input stream of \n delimited text
            val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
            val words = lines.flatMap(_.split(" "))
            val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    
            wordCounts.print()
            ssc.start()
            ssc.awaitTermination()
        }
    }
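    In this job, `reduceByKey` runs on each 5-second batch separately, so the printed counts are per batch rather than running totals. The following plain-Scala sketch needs no Spark. It replays the same flatMap / map / reduceByKey steps on an ordinary `Seq` of lines to show what a single batch computes. `BatchWordCountSketch` and `countBatch` are hypothetical names used only for illustration. Here `groupBy` followed by a sum stands in for `reduceByKey(_ + _)`.

```scala
/**
 * Hypothetical plain-Scala sketch of what ONE streaming batch computes.
 * No Spark involved: groupBy + sum stands in for reduceByKey(_ + _).
 */
object BatchWordCountSketch {
  def countBatch(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))   // same split as the streaming job
      .map(word => (word, 1))  // pair every word with a count of 1
      .groupBy(_._1)           // group the pairs by word ...
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // ... and add up the 1s

  def main(args: Array[String]): Unit = {
    // Two simulated batches: counting starts from zero again in each one
    val batch1 = Seq("hello spark", "hello streaming")
    val batch2 = Seq("hello again")
    println(countBatch(batch1))
    println(countBatch(batch2))
  }
}
```

    Note that `split(" ")` turns two consecutive spaces into an empty-string token. The streaming job would count that token as a "word" as well.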

    3. Submitting the job and listening on the port

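    socketTextStream acts as a client. It connects to a socket port on which some service is already listening and reads newline-delimited text from it. Start that service (here, nc) on the same host and port that you pass to the job.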

    (1) Submitting the job (the trailing arguments become <hostname> and <port>):
    $SPARK_HOME/bin/spark-submit --name StreamingDemo --class StreamingWordCount ./sparktest-1.0-SNAPSHOT.jar localhost 1234

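    (2) Listening on the socket port with netcat (-l listens, -k keeps listening after a client disconnects). Lines typed into nc are sent to the job: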

    nc -lk 1234
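    To make the client/server relationship concrete, here is a JDK-only sketch. The in-process `ServerSocket` plays the role of `nc -lk`: it must be listening first. `readLines` then connects and reads UTF-8 lines until the connection closes. That is roughly what the socket receiver behind socketTextStream does, as far as I know, but this is not Spark's code. `SocketTextSketch`, `readLines` and `demo` are hypothetical helpers for illustration. Binding to port 0 simply lets the OS pick a free port instead of 1234.

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter, PrintWriter}
import java.net.{InetAddress, ServerSocket, Socket}
import java.nio.charset.StandardCharsets

/** Hypothetical sketch: a listening server (like nc) and a line-reading client (like socketTextStream). */
object SocketTextSketch {
  /** Client side: connect, then read UTF-8 text line by line until EOF. */
  def readLines(host: String, port: Int): List[String] = {
    val socket = new Socket(host, port) // throws ConnectException if nothing is listening yet
    try {
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      Iterator.continually(in.readLine()).takeWhile(_ != null).toList
    } finally socket.close()
  }

  /** Server side, a stand-in for `nc -lk`: listen first, then send the given lines to one client. */
  def demo(lines: Seq[String]): List[String] = {
    val loopback = InetAddress.getLoopbackAddress
    val server = new ServerSocket(0, 50, loopback) // port 0: the OS chooses a free port
    val sender = new Thread(new Runnable {
      def run(): Unit = {
        val client = server.accept()
        val out = new PrintWriter(new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8), true)
        lines.foreach(line => out.println(line)) // one line per println, like typing into nc
        client.close() // closing the connection gives the reader its EOF
      }
    })
    sender.start()
    try readLines(loopback.getHostAddress, server.getLocalPort)
    finally {
      server.close() // unblocks accept() if the client never connected
      sender.join()
    }
  }

  def main(args: Array[String]): Unit =
    demo(Seq("hello spark", "hello streaming")).foreach(println)
}
```

    Unlike `nc -lk`, this stand-in serves a single connection and then stops. That is enough to show why the listener must be running before the job's receiver tries to connect.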

     

  • Original article: https://www.cnblogs.com/simplestupid/p/4712317.html