zoukankan      html  css  js  c++  java
  • DStream转为DF的两种方式(突破map时元组22的限制)

    在进行Spark Streaming的开发时,我们常常需要将DStream转为DataFrame来进行进一步的处理,
    共有两种方式,方式一:

    val spark = SparkSession.builder()
      .appName("Test")
      .getOrCreate()
    import spark.implicits._
    dStream.foreachRDD{ rdd =>
      val df = rdd.map(_.split(" "))
        .map(t => (t(1),t(2),t(3)))
        .toDF("col1","col2","col3")
      // 业务逻辑
    }
    

    利用map算子和tuple来完成,一般的场景下采用这种方式即可。

    但是有的时候我们会遇到列数大于22的情况,这个时候会受到scala的tuple数不能超过22的影响。这时可以采用方式二:

    val spark = SparkSession.builder()
      .appName("Test")
      .getOrCreate()
    dStream.foreachRDD{ rdd =>
      val res:RDD[Row] = rdd.map{ row =>
        val buffer = ArrayBuffer.empty[Any]
        val fields: Array[String] = row.split("\|~\|")
        buffer.append(fields(0))
        buffer.append(fields(1))
        buffer.append(fields(2))
        // 省略
        buffer.append(fields(25))
        Row.fromSeq(buffer)
      } 
      val schema = StructType(Seq(
        StructField("col1", StringType, false),
        StructField("col2", StringType, false),
        StructField("col3", StringType, false),
        // 省略
        StructField("col26", StringType, false)
      ))
      val df: DataFrame = spark.createDataFrame(result, schema)
      // 业务逻辑
    }
    
  • 相关阅读:
    Visual Studio2017 无法添加引用的解决方法
    第13周学习进度
    mininet之miniedit可视化操作
    构建之法阅读笔记05
    软件工程课堂练习找水王续
    第12周学习进度
    VS2015做单元测试
    学习调用第三方的WebService服务
    软件工程课堂练习找水王
    第11周学习进度
  • 原文地址:https://www.cnblogs.com/icecola/p/11176600.html
Copyright © 2011-2022 走看看