zoukankan      html  css  js  c++  java
  • Spark Structured Streaming框架(1)之基本用法

       Spark Struntured Streaming是Spark 2.1.0版本后新增加的流计算引擎,本博将通过几篇博文详细介绍这个框架。这篇是介绍Spark Structured Streaming的基本开发方法。以Spark 自带的example进行测试和介绍,其为"StructuredNetworkWordcount.scala"文件。

    1. Quick Example

      由于我们是在单机上进行测试,所以需要修单机运行模型,修改后的程序如下:

    package org.apache.spark.examples.sql.streaming

    import org.apache.spark.sql.SparkSession

    /**

    * Counts words in UTF8 encoded, ' ' delimited text received from the network.

    *

    * Usage: StructuredNetworkWordCount <hostname> <port>

    * <hostname> and <port> describe the TCP server that Structured Streaming

    * would connect to receive data.

    *

    * To run this on your local machine, you need to first run a Netcat server

    * `$ nc -lk 9999`

    * and then run the example

    * `$ bin/run-example sql.streaming.StructuredNetworkWordCount

    * localhost 9999`

    */

    object StructuredNetworkWordCount {

    def main(args: Array[String]) {

    if (args.length < 2) {

    System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")

    System.exit(1)

    }

    val host = args(0)

    val port = args(1).toInt

    val spark = SparkSession

    .builder

    .appName("StructuredNetworkWordCount")

    .master("local[*]")

    .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port

    val lines = spark.readStream

    .format("socket")

    .option("host", host)

    .option("port", port)

    .load()

    // Split the lines into words

    val words = lines.as[String].flatMap(_.split(" "))

    // Generate running word count

    val wordCounts = words.groupBy("value").count()

    // Start running the query that prints the running counts to the console

    val query = wordCounts.writeStream

    .outputMode("complete")

    .format("console")

    .start()

    query.awaitTermination()

    }

    }

    2. 剖析

      对于上述所示的程序,进行如下的解读和分析:

    2.1 数据输入

      在创建SparkSessiion对象之后,需要设置数据源的类型,及数据源的配置。然后就会数据源中源源不断的接收数据,接收到的数据以DataFrame对象存在,该类型与Spark SQL中定义类型一样,内部由Dataset数组组成。

    如下程序所示,设置输入源的类型为socket,并配置socket源的IP地址和端口号。接收到的数据流存储到lines对象中,其类型为DataFarme。

    // Create DataFrame representing the stream of input lines from connection to host:port

    val lines = spark.readStream

    .format("socket")

    .option("host", host)

    .option("port", port)

    .load()

    2.2 单词统计

      如下程序所示,首先将接受到的数据流lines转换为String类型的序列;接着每一批数据都以空格分隔为独立的单词;最后再对每个单词进行分组并统计次数。

    // Split the lines into words

    val words = lines.as[String].flatMap(_.split(" "))

    // Generate running word count

    val wordCounts = words.groupBy("value").count()

    2.3 数据输出

    通过DataFrame对象的writeStream方法获取DataStreamWrite对象,DataStreamWrite类定义了一些数据输出的方式。Quick example程序将数据输出到控制终端。注意只有在调用start()方法后,才开始执行Streaming进程,start()方法会返回一个StreamingQuery对象,用户可以使用该对象来管理Streaming进程。如上述程序调用awaitTermination()方法阻塞接收所有数据。

    3. 异常

    3.1 拒绝连接

      当直接提交编译后的架包时,即没有启动"nc –lk 9999"时,会出现图 11所示的错误。解决该异常,只需在提交(spark-submit)程序之前,先启动"nc"命令即可解决,且不能使用"nc –lk localhost 9999".

    图 11

    3.2 NoSuchMethodError

      当通过mvn打包程序后,在命令行通过spark-submit提交架包,能够正常执行,而通过IDEA执行时会出现图 12所示的错误。

    图 12

      出现这个异常,是由于项目中依赖的Scala版本与Spark编译的版本不一致,从而导致出现这种错误。图 13和图 14所示,Spark 2.10是由Scala 2.10版本编译而成的,而项目依赖的Scala版本是2.11.8,从而导致出现了错误。

    图 13

     

    图 14

      解决该问题,只需在项目的pom.xml文件中指定与spark编译的版本一致,即可解决该问题。如图 15所示的执行结果。

    图 15

    4. 参考文献

  • 相关阅读:
    一般处理程序页ashx 序列化 Json数组
    SQL server 分页
    MySQL 分页
    获取网站的BaseURL
    java学习书籍推荐
    查询并关闭指定端口进程
    ettercap使用
    MS10-046漏洞利用
    MS12-020漏洞利用
    入侵安卓手机
  • 原文地址:https://www.cnblogs.com/huliangwen/p/7470599.html
Copyright © 2011-2022 走看看