  • [Spark] Custom batch input and output for Spark SQL

    Background

    Spark SQL ships with many built-in input and output formats.

    In practice, though, you often need to add others, such as HBase, socket, WebSocket, or Excel.

    Output

    This example uses WebSocket output.

    The code that finally sends the messages:

    df.write.format("cn.zwy.websocket")
          .option("uri", "ws://localhost:8888")
          .save()

    Spark looks for a class named DefaultSource in the package passed to format, here cn.zwy.websocket, so create a DefaultSource class in that package:
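
    The lookup rule above can be sketched in plain Scala. This is a simplified illustration, not Spark's actual code: the object and method names are hypothetical, and Spark additionally checks registered short names before falling back to class loading. It tries the format string as a class name first, then the same string with ".DefaultSource" appended.

```scala
import scala.util.{Success, Try}

// Hypothetical helper that mimics how a format string maps to a class.
object FormatLookupSketch {
  // Class names tried for a given format string, in order.
  def candidates(format: String): Seq[String] =
    Seq(format, s"$format.DefaultSource")

  // Returns the first candidate that can be loaded, if any.
  def resolve(format: String, loader: ClassLoader): Option[Class[_]] =
    candidates(format).iterator
      .map(name => Try[Class[_]](loader.loadClass(name)))
      .collectFirst { case Success(cls) => cls }
}
```

    With this rule, format("cn.zwy.websocket") ends up at cn.zwy.websocket.DefaultSource, provided that class is on the classpath.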

    package cn.zwy.websocket 
    
    import cn.zwy.common.JsonUtil
    import cn.zwy.WebSocketClient
    import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, Filter}
    import org.apache.spark.sql.types.StructType
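    import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
    
    import java.net.URI
    import scala.collection.JavaConverters._
    
    class DefaultSource extends CreatableRelationProvider with DataSourceRegister{
    
      override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
    
        // Throws NoSuchElementException if the "uri" option is missing
        val uri = parameters("uri")
        // One connection per partition, opened on the executor.
        // The explicit parameter type avoids an overload ambiguity that some Scala/Spark versions report.
        data.foreachPartition((partition: Iterator[Row]) => {
          val client = new WebSocketClient(new URI(uri))
          client.connectBlocking()
          try {
            partition.foreach(row => {
              try {
                // Convert the row to a field-name -> value map and send it as JSON
                val dataMap = row.getValuesMap[Any](row.schema.fieldNames).asJava
                client.send(JsonUtil.Encode(dataMap))
              } catch {
                // Best effort: rows that fail to serialize or send are silently dropped
                case _: Exception =>
              }
            })
          } finally {
            // Close the connection even if iterating the partition fails
            client.close()
          }
        })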
    
        new BaseRelation {
          override def sqlContext: SQLContext = unsupportedException
          override def schema: StructType = unsupportedException
          override def needConversion: Boolean = unsupportedException
          override def sizeInBytes: Long = unsupportedException
          override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException
          private def unsupportedException =
            throw new UnsupportedOperationException("BaseRelation from web socket write " +
              "operation is not usable.")
        }
      }
    
      override def shortName(): String = "websocket"
    }
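
    The shortName override only takes effect once the class is registered through Java's ServiceLoader mechanism, which Spark uses to discover DataSourceRegister implementations. The sketch below assumes such a registration file exists; the WebSocketSink object and its send method are hypothetical names used only for illustration.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical wrapper showing the short-name form of the write call.
object WebSocketSink {
  // Assumes a resource file named
  //   META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
  // is on the classpath and contains the single line
  //   cn.zwy.websocket.DefaultSource
  def send(df: DataFrame, uri: String): Unit =
    df.write.format("websocket").option("uri", uri).save()
}
```

    Without that file, the full package name cn.zwy.websocket still works as the format string.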

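    Writing data out requires implementing the CreatableRelationProvider interface.

    The corresponding interface for reading data is RelationProvider.

    As for the value returned at the end: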
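
    For comparison, a read-side source might look like the minimal sketch below. It is not from the original post: the package cn.zwy.lines, the LinesRelation class, and the "lines" option are all hypothetical. It implements RelationProvider and returns a BaseRelation that mixes in TableScan, producing one string column from a comma-separated option.

```scala
package cn.zwy.lines // hypothetical package, chosen for this sketch only

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
    // "lines" is a made-up option: comma-separated values, one per output row
    val lines = parameters.getOrElse("lines", "").split(",").filter(_.nonEmpty).toSeq
    new LinesRelation(lines)(sqlContext)
  }
}

class LinesRelation(lines: Seq[String])(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  // A single nullable string column named "value"
  override def schema: StructType =
    StructType(Seq(StructField("value", StringType, nullable = true)))

  // Full scan: one Row per input string
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(lines).map(Row(_))
}
```

    Under these assumptions, spark.read.format("cn.zwy.lines").option("lines", "a,b,c").load() would yield a DataFrame with three rows.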

    new BaseRelation {
          override def sqlContext: SQLContext = unsupportedException
          override def schema: StructType = unsupportedException
          override def needConversion: Boolean = unsupportedException
          override def sizeInBytes: Long = unsupportedException
          override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException
          private def unsupportedException =
            throw new UnsupportedOperationException("BaseRelation from web socket write " +
              "operation is not usable.")
        }

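    I have not studied this part closely; it is copied from Spark's Kafka source code. Some projects on GitHub instead finish by returning the result of RelationProvider's createRelation method, which appears to read the data back after writing it.

    That is all!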
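
    A middle ground between throwing on every member and reading the data back is to return a relation that only reports the schema of what was written. This is a sketch of that idea, not something the original code does; WriteOnlyRelation is a hypothetical name, and whether Spark ever inspects the returned relation depends on the code path and version.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType

// Hypothetical helper: a relation that exposes only the context and schema
// of the DataFrame that was just written, without supporting scans.
object WriteOnlyRelation {
  def apply(ctx: SQLContext, data: DataFrame): BaseRelation = new BaseRelation {
    override def sqlContext: SQLContext = ctx
    override def schema: StructType = data.schema
  }
}
```

    Inside createRelation this would be used as WriteOnlyRelation(sqlContext, data) in place of the anonymous BaseRelation.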

  • Original article: https://www.cnblogs.com/zhouwenyang/p/14335605.html