zoukankan      html  css  js  c++  java
  • spark streaming向RDD和DataFrame转换

    Data streaming转为DataFrame,不能直接一步转到DF,需要先转为RDD,然后再转到DF,我们用流式处理数据后,再通过spark sql实时获取我们想要的结果。

    1.首先老规矩,创建spark上下文对象,spark SQL和spark Streaming,再创建个socket在Linux端打入数据。

    1 val conf = new SparkConf().setAppName("Demo04DSToRDDToDF").setMaster("local[2]")
    2 
    3     conf.set("spark.sql.shuffle.partitions", "1")
    4     val sc = new SparkContext(conf)
    5     val sqlContext = new SQLContext(sc)
    6 
    7     val ssc = new StreamingContext(sc,Durations.seconds(5))
    8 
    9     val lines = ssc.socketTextStream("master",8888)

    2.首先用foreachRDD方法把spark streaming转为RDD

     1     /**
     2       *
     3       * DS  -> RDD -> DF
     4       *
     5       */
     6     lines.foreachRDD(rdd => {
     7      val stuRDD = rdd.map(line => {
     8         val split = line.split(",")
     9        (split(0),split(1),split(2).toInt,split(3),split(4))
    10       })

    3.导入sqlContext隐式转换,将RDD To成DF,同时传入column对应RDD返回值

     1       //导入隐式转换,将RDD转换为DF
     2       import sqlContext.implicits._
     3 
     4       val stuDF = stuRDD.toDF("id","name","age","gender","clazz")
     5       //注册成表
     6       stuDF.registerTempTable("student")
     7 
     8       val result = sqlContext.sql("select clazz,count(1) from student group by clazz")
     9 
    10       result.show()
    11       result.write.mode(SaveMode.Append).json("Spark/data/dsondf")

    总结,这里的sqlContext必须自己创建好,原来我还以为是导包的时候,类都已经封装好了的,直接import就行了,报的数组下标越界,懵了半天。

  • 相关阅读:
    ArrayList和CopyOnWriteArrayList
    UML类关系
    Vmware下mint os的安装
    Java VisualVM无法检测到本地java程序 的 解决办法
    谜之闭包
    虚拟DOM(Virtual DOM)
    ES6箭头函数与普通函数的区别
    HTML中 select 与datalist的异同
    备战前端面试
    javascript获取数组中的最大值/最小值
  • 原文地址:https://www.cnblogs.com/zzzzrrrr/p/13089534.html
Copyright © 2011-2022 走看看