zoukankan      html  css  js  c++  java
  • 071 SparkStreaming与SparkSQL集成

    1.说明

      虽然DStream可以转换成RDD,但是如果比较复杂,可以考虑使用SparkSQL。

    2.集成方式

      Streaming和Core整合:
        transform或者foreachRDD方法
      Core和SQL整合:
        RDD <==> DataFrame 互换

    3.程序

     1 package com.sql.it
     2 import org.apache.spark.sql.SQLContext
     3 import org.apache.spark.storage.StorageLevel
     4 import org.apache.spark.streaming.kafka.KafkaUtils
     5 import org.apache.spark.streaming.{Seconds, StreamingContext}
     6 import org.apache.spark.{SparkConf, SparkContext}
     7 object StreamingSQL {
     8   def main(args: Array[String]): Unit = {
     9     val conf = new SparkConf()
    10       .setAppName("StreamingWindowOfKafka22")
    11       .setMaster("local[*]")
    12     val sc = SparkContext.getOrCreate(conf)
    13     val ssc = new StreamingContext(sc, Seconds(5))
    14     // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir
    15     // 路径对应的文件夹不能存在
    16     ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/9421151351")
    17 
    18     val kafkaParams = Map(
    19       "group.id" -> "streaming-kafka-78912151",
    20       "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
    21       "auto.offset.reset" -> "smallest"
    22     )
    23     val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1
    24     val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
    25       ssc, // 给定SparkStreaming上下文
    26       kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接
    27       topics, // 给定读取对应topic的名称以及读取数据的线程数量
    28       StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别
    29     ).map(_._2)
    30 
    31     /**
    32       * transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可
    33       */
    34     dstream.transform(rdd => {
    35       // 使用sql统计wordcoount
    36       val sqlContext = SQLContextSingelton.getSQLContext(rdd.sparkContext)
    37       import sqlContext.implicits._
    38       val procedRDD = rdd.filter(_.nonEmpty).flatMap(_.split(" ").map((_, 1)))
    39       procedRDD.toDF("word", "c").registerTempTable("tb_word")
    40       val resultRDD = sqlContext.sql("select word, count(c) as vc from tb_word group by word").map(row => {
    41         val word = row.getAs[String]("word")
    42         val count = row.getAs[Long]("vc")
    43         (word, count)
    44       })
    45 
    46       resultRDD
    47     }).print()
    48 
    49     // 启动开始处理
    50     ssc.start()
    51     ssc.awaitTermination() // 等等结束,监控一个线程的中断操作
    52   }
    53 }
    54 
    55 object SQLContextSingelton {
    56   @transient private var instance: SQLContext = _
    57 
    58   def getSQLContext(sc: SparkContext): SQLContext = {
    59     if (instance == null) {
    60       synchronized[SQLContext] {
    61         if (instance == null) {
    62           instance = new SQLContext(sc)
    63         }
    64         instance
    65       }
    66     }
    67     instance
    68   }
    69 }

    4.效果

      

  • 相关阅读:
    LG P4213【模板】杜教筛(Sum)
    JZOJ 3447.摘取作物
    JZOJ 3448.公路维护
    JZOJ 4496. 【GDSOI 2016】第一题 互补约数
    jmeter的参数化之函数助手的使用
    window10平台运行jenkins.war的插件安装失败的解决
    jmeter的断言之响应断言的使用
    在虚拟机里安装完mysql后,开启root远程登录权限
    Word Excel PPT 2016从新手到高手
    Oracle 如何停止正在后台执行的impdp/expdp 任务
  • 原文地址:https://www.cnblogs.com/juncaoit/p/9490359.html
Copyright © 2011-2022 走看看