  • [Spark] Custom batch input and output for Spark SQL

    Background

    Spark SQL ships with many built-in input and output formats.

    Often, though, you need to add other ones, for example HBase, socket, WebSocket, or Excel.

    Output

    Take WebSocket output as an example.

    The code that ultimately sends the messages:

    df.write.format("cn.zwy.websocket")
          .option("uri", "ws://localhost:8888")
          .save()

    Spark looks for a class named DefaultSource in the package passed to format, here cn.zwy.websocket. So create a DefaultSource class in that package:

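    package cn.zwy.websocket

    import cn.zwy.common.JsonUtil
    import cn.zwy.WebSocketClient
    import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, Filter}
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}

    import java.net.URI
    import scala.collection.JavaConverters._

    class DefaultSource extends CreatableRelationProvider with DataSourceRegister {

      // Called by df.write.format(...).save(); `parameters` holds the values passed via option(...).
      // `mode` is ignored: a WebSocket has no existing data to append to or overwrite.
      override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {

        val uri = parameters("uri")
        // One connection per partition, opened on the executor.
        // The explicit Iterator[Row] type avoids the foreachPartition overload ambiguity seen on some Scala/Spark versions.
        data.foreachPartition((partition: Iterator[Row]) => {
          val client = new WebSocketClient(new URI(uri))
          client.connectBlocking()
          try {
            partition.foreach(row => {
              try {
                // Turn the row into a field name -> value map and send it as JSON.
                val dataMap = row.getValuesMap[Any](row.schema.fieldNames).asJava
                client.send(JsonUtil.Encode(dataMap))
              } catch {
                case _: Exception => // best effort: a row that fails to send is dropped silently
              }
            })
          } finally {
            client.close() // always release the connection, even if the partition fails
          }
        })

        // Nothing can be read back from a WebSocket, so return a placeholder relation whose members all throw.
        new BaseRelation {
          override def sqlContext: SQLContext = unsupportedException
          override def schema: StructType = unsupportedException
          override def needConversion: Boolean = unsupportedException
          override def sizeInBytes: Long = unsupportedException
          override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException
          private def unsupportedException =
            throw new UnsupportedOperationException("BaseRelation from web socket write " +
              "operation is not usable.")
        }
      }

      // Short alias for format(...); only usable once the class is registered as a DataSourceRegister service.
      override def shortName(): String = "websocket"
    }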
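
    A note on shortName: as far as I know, Spark only resolves a short name such as "websocket" when the class is listed in a service file named META-INF/services/org.apache.spark.sql.sources.DataSourceRegister on the classpath, containing the single line cn.zwy.websocket.DefaultSource. Without that file, only the package name (or the full class name) works in format. Below is a minimal sketch of the two ways to call the sink, assuming the service file has been added to the jar; the WebSocketSink object and its method names are made up for illustration.

    ```scala
    import org.apache.spark.sql.DataFrame

    // Hypothetical helper object, not part of the original post.
    object WebSocketSink {

      // Resolved by package name: Spark tries "cn.zwy.websocket" and then "cn.zwy.websocket.DefaultSource".
      def sendByPackage(df: DataFrame, uri: String): Unit =
        df.write.format("cn.zwy.websocket").option("uri", uri).save()

      // Resolved by short name: requires the META-INF/services registration described above.
      def sendByShortName(df: DataFrame, uri: String): Unit =
        df.write.format("websocket").option("uri", uri).save()
    }
    ```

    Both calls end up in the same createRelation method; the short name is only a convenience.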

    Writing data out requires implementing the CreatableRelationProvider interface,

    while the corresponding interface for reading data is RelationProvider.
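
    To make the read side concrete, here is a minimal sketch of a RelationProvider. It does not read from a WebSocket; it only turns a comma-separated option into a one-column table, so the shape of the API is visible. The package cn.zwy.inline, the InlineRelation class, and the "values" option are assumptions made up for this example.

    ```scala
    package cn.zwy.inline // hypothetical package, used only for this sketch

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // A relation describes the schema; TableScan adds a full scan that produces the rows.
    class InlineRelation(override val sqlContext: SQLContext, values: Seq[String])
        extends BaseRelation with TableScan {

      override def schema: StructType =
        StructType(Seq(StructField("value", StringType, nullable = true)))

      // Distribute the driver-side values and wrap each one in a single-column Row.
      override def buildScan(): RDD[Row] =
        sqlContext.sparkContext.parallelize(values).map(v => Row(v))
    }

    class DefaultSource extends RelationProvider {
      // Called by spark.read.format("cn.zwy.inline").option("values", "a,b,c").load()
      override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
        val values = parameters.getOrElse("values", "").split(",").filter(_.nonEmpty).toSeq
        new InlineRelation(sqlContext, values)
      }
    }
    ```

    A real source would replace buildScan with code that pulls rows from the external system, and could mix in PrunedScan or PrunedFilteredScan instead of TableScan to receive column and filter pushdown.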

    As for the value returned at the end of createRelation in the write path:

    new BaseRelation {
      override def sqlContext: SQLContext = unsupportedException
      override def schema: StructType = unsupportedException
      override def needConversion: Boolean = unsupportedException
      override def sizeInBytes: Long = unsupportedException
      override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException
      private def unsupportedException =
        throw new UnsupportedOperationException("BaseRelation from web socket write " +
          "operation is not usable.")
    }

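    I have not studied this closely; it is copied from Spark's Kafka code. In some projects on GitHub, the write path instead ends by returning the result of RelationProvider's createRelation method, which appears to read the data back once it has been written.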
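
    The write-then-read-back variant can be sketched as follows. This is not taken from any particular GitHub project; it is a toy under stated assumptions: the package cn.zwy.memory, the MemoryStore object, and the "name" option are all made up, and the "external system" is just a map on the driver, so it only makes sense in a single JVM. SaveMode is ignored and every write overwrites.

    ```scala
    package cn.zwy.memory // hypothetical package, used only for this sketch

    import java.util.concurrent.ConcurrentHashMap

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
    import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, TableScan}
    import org.apache.spark.sql.types.StructType

    // Hypothetical driver-side store standing in for a real external system.
    object MemoryStore {
      val tables = new ConcurrentHashMap[String, (StructType, Seq[Row])]()
    }

    // Readable relation backed by whatever was stored under `name`.
    class MemoryRelation(override val sqlContext: SQLContext, name: String)
        extends BaseRelation with TableScan {

      private def stored = Option(MemoryStore.tables.get(name))
        .getOrElse(throw new IllegalArgumentException(s"no data stored under '$name'"))

      override def schema: StructType = stored._1
      override def buildScan(): RDD[Row] = sqlContext.sparkContext.parallelize(stored._2)
    }

    class DefaultSource extends RelationProvider with CreatableRelationProvider {

      // Read path: spark.read.format("cn.zwy.memory").option("name", "t").load()
      override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation =
        new MemoryRelation(sqlContext, parameters("name"))

      // Write path: store the data, then hand back a relation that reads it again.
      override def createRelation(sqlContext: SQLContext, mode: SaveMode,
          parameters: Map[String, String], data: DataFrame): BaseRelation = {
        // collect() is acceptable only because this is a toy store; `mode` is ignored.
        MemoryStore.tables.put(parameters("name"), (data.schema, data.collect().toSeq))
        createRelation(sqlContext, parameters)
      }
    }
    ```

    For a write-only sink such as the WebSocket one there is nothing to read back, which is presumably why the Kafka code returns a relation whose members all throw.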

    So it is decreed!

  • Original post: https://www.cnblogs.com/zhouwenyang/p/14335605.html