zoukankan      html  css  js  c++  java
  • Spark Streaming实战演练

    一、spark streaming简介

    Streaming是一种数据传输技术,它把客户机收到的数据变成一个稳定连续的流,源源不断的输出,使用户听到的声音和图像十分稳定,而用户在整个文件传输完成开始前就可以浏览文件。

    常见的流式计算框架:

    l Apache storm

    l Spark streaming

    l Apache samza

    上述三种实时计算系统都是开源分布式系统,具有低延迟,可扩展和容错性诸多优点,他们的共同特色在于:允许你在运行数据流代码时,将任务分配到一系列具有容错能力的计算机上并行运行。此外,他们都提供了简单的api来简化底层复杂的程度。

    实时计算框架的对比参考文档:http://www.csdn.net/article/2015-03-09/2824135

    Spark Streaming是对spark core api的扩展,他是一个分布式的,高吞吐量,具有容错性的实时数据处理系统。

    clip_image002

    Spark streaming处理数据时一批一批处理的,因此spark streaming仅是一个准实时处理系统,其底层本质上还是基于spark core的批处理应用。

    clip_image004

    二、一个简单的spark streaming示例

    参考:http://spark.apache.org/docs/1.3.0/streaming-programming-guide.html

    1、在shell中运行下面命令:

    $ nc -lk 9999

    2、打开另一个shell,运行下面命令:

    $ ./bin/run-example streaming.NetworkWordCount localhost 9999

    3、在第一个客户端下输入一些以空格分割的单词,在第二个shell端可以实时看到对这些输入进行的单词统计:

    clip_image006

    4、从以上例子中我们可以整理出spark streaming的编程模型

    //导入依赖包
    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
     
    //初始化StreamingContext对象
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    //以下定义了从哪里读取数据

    val lines = ssc.socketTextStream("localhost", 9999)

    //以下是真正的功能实现

    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
     
    //启动spark streaming
    ssc.start()
    ssc.awaitTermination()

    5、初始化StreamingContext的两种方式:

    1) 从sparkConf创建,通常用于在idea中编程使用。

    2) 从已有的spark contact对象创建,一般应用于spark-shell测试使用。

    clip_image008

    6、spark streaming读取hdfs数据

    6.1)代码:

    //导入依赖包

    import org.apache.spark._

    import org.apache.spark.streaming._

    import org.apache.spark.streaming.StreamingContext._

    //初始化StreamingContext对象

    val ssc = new StreamingContext(sc, Seconds(1))

    //以下定义了从哪里读取数据

    val lines = ssc.textFileStream("hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/")

    //以下是真正的功能实现

    val words = lines.flatMap(_.split(" "))

    val pairs = words.map(word => (word, 1))

    val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts.print()

    //启动spark streaming

    ssc.start()

    ssc.awaitTermination()

    6.2)在spark-shell上运行上述代码:

    创建spark streaming读取hdfs目录:

    $ bin/hdfs dfs -mkdir hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/

    准备数据:

    $ cat /opt/datas/wc.input

    hadoop

    hdfs yarn mapreduce zookeeper

    hive

    sqoop flume oozie hue

    hbase

    storm scala kafka spark

    启动spark-shell,手动运行以上代码:

    $ bin/spark-shell --master local[2]

    scala> import org.apache.spark._

    import org.apache.spark._

    scala> import org.apache.spark.streaming._

    import org.apache.spark.streaming._

    scala> import org.apache.spark.streaming.StreamingContext._

    import org.apache.spark.streaming.StreamingContext._

    scala> val ssc = new StreamingContext(sc, Seconds(1))

    ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@714e203a

    scala> val lines = ssc.textFileStream("hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/")

    17/07/12 16:56:40 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@3d18ac9

    lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@74462773

    scala> val words = lines.flatMap(_.split(" "))

    words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@55322d12

    scala> val pairs = words.map(word => (word, 1))

    pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@4d0fc96d

    scala> val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@34e46a44

    scala> wordCounts.print()

    //运行以下代码,即启动spark shell

    scala> ssc.start()

    scala> ssc.awaitTermination()

    另起一个shell终端,将测试数据上传到hdfs下hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/目录下:

    $ bin/hdfs dfs -put /opt/datas/wc.input hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/1

    这时我们可能从spark-shell终端获取spark streaming的输出,如下:

    -------------------------------------------

    Time: 1499850053000 ms

    -------------------------------------------

    (scala,1)

    (hive,1)

    (oozie,1)

    (mapreduce,1)

    (zookeeper,1)

    (hue,1)

    (yarn,1)

    (kafka,1)

    (sqoop,1)

    (spark,1)

    ...

    6.3)简化的测试方法

    我们可以发现,以上方法进行spark开发,需要一行一行加载代码,这种方式比较麻烦,那么有没有好的方法一次性加载所有代码呢?当然是存在的,下面我们测试一下通过spark-shell中加载scala文件的方式进行开发测试:

    首先创建一个文件用于存储上述代码:

    $ cat /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/HDFSSparkStreaming.scala

    //导入依赖包

    import org.apache.spark._

    import org.apache.spark.streaming._

    import org.apache.spark.streaming.StreamingContext._

    //初始化StreamingContext对象

    val ssc = new StreamingContext(sc, Seconds(1))

    //以下定义了从哪里读取数据

    val lines = ssc.textFileStream("hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/")

    //以下是真正的功能实现

    val words = lines.flatMap(_.split(" "))

    val pairs = words.map(word => (word, 1))

    val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts.print()

    //启动spark streaming

    ssc.start()

    ssc.awaitTermination()

    删除hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/目录下的所有文件:

    $ bin/hdfs dfs -rm hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/*

    启动一个spark-shell:

    $ bin/spark-shell --master local[2]

    Spark-shell以文本方式运行scala代码:

    scala> :load /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/HDFSSparkStreaming.scala

    另起客户端想目标目录传递文件:

    $ bin/hdfs dfs -put /opt/datas/wc.input hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/1

  • 相关阅读:
    微软推出Silverlight 5.2版本
    C# 安装部署,关于自定义操作,不能被执行。
    SQL_server_2000安装时挂起的解决办法,其实很简单。
    最全Microsoft JET Database Engine(0x80004005)未指定错误的解决方法
    WP博客wordpress,robots.txt写法
    2012年Web设计和开发的15个趋势
    SEO优化应该如何使用nofollow属性
    WordPress如何做SEO?WordPress博客怎样做网站优化。
    windowsXP SP3安装Photoshop CS 3字体输入没反应的问题及解决办法
    打开网页 错误 2147467259 : 错误描述: 未指定的错误 IIS 限XP系统
  • 原文地址:https://www.cnblogs.com/wcwen1990/p/7862417.html
Copyright © 2011-2022 走看看