zoukankan      html  css  js  c++  java
  • spark streaming向RDD和DataFrame转换

    Data streaming转为DataFrame,不能直接一步转到DF,需要先转为RDD,然后再转到DF,我们用流式处理数据后,再通过spark sql实时获取我们想要的结果。

    1.首先老规矩,创建spark上下文对象,spark SQL和spark Streaming,再创建个socket在Linux端打入数据。

    1 val conf = new SparkConf().setAppName("Demo04DSToRDDToDF").setMaster("local[2]")
    2 
    3     conf.set("spark.sql.shuffle.partitions", "1")
    4     val sc = new SparkContext(conf)
    5     val sqlContext = new SQLContext(sc)
    6 
    7     val ssc = new StreamingContext(sc,Durations.seconds(5))
    8 
    9     val lines = ssc.socketTextStream("master",8888)

    2.首先用foreachRDD方法把spark streaming转为RDD

     1     /**
     2       *
     3       * DS  -> RDD -> DF
     4       *
     5       */
     6     lines.foreachRDD(rdd => {
     7      val stuRDD = rdd.map(line => {
     8         val split = line.split(",")
     9        (split(0),split(1),split(2).toInt,split(3),split(4))
    10       })

    3.导入sqlContext隐式转换,将RDD To成DF,同时传入column对应RDD返回值

     1       //导入隐式转换,将RDD转换为DF
     2       import sqlContext.implicits._
     3 
     4       val stuDF = stuRDD.toDF("id","name","age","gender","clazz")
     5       //注册成表
     6       stuDF.registerTempTable("student")
     7 
     8       val result = sqlContext.sql("select clazz,count(1) from student group by clazz")
     9 
    10       result.show()
    11       result.write.mode(SaveMode.Append).json("Spark/data/dsondf")

    总结,这里的sqlContext必须自己创建好,原来我还以为是导包的时候,类都已经封装好了的,直接import就行了,报的数组下标越界,懵了半天。

  • 相关阅读:
    PHP中功能强大却少使用的函数
    php 判断美国zip code
    php 操作 MySQL 中的Blob类型
    THINKPHP 多域名 MEMCACHE方式共享SESSION数据
    phpQuery—基于jQuery的PHP实现
    Error calling method on NPObject!
    国外共享软件热门上载站点
    你留意过自己的父母吗?
    域名LOGO产生器 (全智能)
    想要创业者须知 企业注册商标6大步
  • 原文地址:https://www.cnblogs.com/zzzzrrrr/p/13089534.html
Copyright © 2011-2022 走看看