  • [Spark] Custom batch input and output sources for Spark SQL

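    Background

    Spark SQL ships with many built-in input and output formats.

    In practice, though, you often need to add others, such as HBase, socket, WebSocket or Excel.

    Output

    We'll use WebSocket output as the example.

    The call that finally sends the messages: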

    df.write.format("cn.zwy.websocket")
          .option("uri", "ws://localhost:8888")
          .save()

    Spark looks for a class named DefaultSource in the package passed to format(), here cn.zwy.websocket. So we create a DefaultSource class in that package:

    package cn.zwy.websocket 
    
    import cn.zwy.common.JsonUtil
    import cn.zwy.WebSocketClient
    import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, Filter}
    import org.apache.spark.sql.types.StructType
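    import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
    
    import java.net.URI
    import scala.collection.JavaConverters._
    
    class DefaultSource extends CreatableRelationProvider with DataSourceRegister {
    
      // Called on the driver by df.write.format(...).save(); this sink ignores `mode`.
      override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
    
        val uri = parameters("uri")
        // Runs on the executors and opens one WebSocket connection per partition.
        // The explicit Iterator[Row] type avoids the foreachPartition overload
        // ambiguity that appears under Scala 2.12.
        data.foreachPartition((partition: Iterator[Row]) => {
          val client = new WebSocketClient(new URI(uri))
          try {
            client.connectBlocking()
            partition.foreach(row => {
              try {
                // Turn the row into a fieldName -> value map and send it as JSON.
                val dataMap = row.getValuesMap[Any](row.schema.fieldNames.toSeq).asJava
                client.send(JsonUtil.Encode(dataMap))
              } catch {
                // Rows that fail to encode or send are deliberately skipped.
                case _: Exception =>
              }
            })
          } finally {
            // Release the connection even if connecting or sending throws.
            client.close()
          }
        })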
    
        new BaseRelation {
          override def sqlContext: SQLContext = unsupportedException
          override def schema: StructType = unsupportedException
          override def needConversion: Boolean = unsupportedException
          override def sizeInBytes: Long = unsupportedException
          override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException
          private def unsupportedException =
            throw new UnsupportedOperationException("BaseRelation from web socket write " +
              "operation is not usable.")
        }
      }
    
      override def shortName(): String = "websocket"
    }
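    The class above also overrides shortName() to return "websocket". As far as I know, Spark discovers such aliases through Java's ServiceLoader. That means the alias only takes effect if the jar also contains a provider-configuration file that names the class. Below is a sketch of that file, assuming the usual Maven/sbt resource layout (src/main/resources). Lines starting with # are comments in this file format.

```
# src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
cn.zwy.websocket.DefaultSource
```

    With that file packaged in the jar, df.write.format("websocket").option("uri", "ws://localhost:8888").save() should behave the same as the fully qualified format("cn.zwy.websocket").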

    To write data out, a source implements the CreatableRelationProvider interface.

    The corresponding interface for reading data is RelationProvider.
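    A minimal sketch of the read direction follows. The package cn.zwy.demo, the option "count" and the class CountingRelation are hypothetical and exist only for illustration. The source produces the numbers 0 to count-1 in a single column named value. RelationProvider hands Spark a BaseRelation. Mixing in TableScan, the simplest scan trait, lets Spark fetch every row through buildScan(). PrunedScan and PrunedFilteredScan would additionally allow column pruning and filter pushdown.

```scala
package cn.zwy.demo

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}

// Read side: Spark calls this for spark.read.format("cn.zwy.demo").load().
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
    // Hypothetical option "count": how many rows to produce (defaults to 10).
    val count = parameters.getOrElse("count", "10").toLong
    new CountingRelation(sqlContext, count)
  }
}

// A relation with a fixed one-column schema; TableScan means Spark asks for all rows at once.
class CountingRelation(override val sqlContext: SQLContext, count: Long)
    extends BaseRelation with TableScan {

  override def schema: StructType =
    StructType(Seq(StructField("value", LongType, nullable = false)))

  // Each Row must match `schema` field by field: here a single Long.
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(0L until count).map(i => Row(i))
}
```

    Usage would then be spark.read.format("cn.zwy.demo").option("count", "5").load().show(). Spark resolves the package name to cn.zwy.demo.DefaultSource in the same way as on the write side.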

    As for the value returned at the end:

    new BaseRelation {
      override def sqlContext: SQLContext = unsupportedException
      override def schema: StructType = unsupportedException
      override def needConversion: Boolean = unsupportedException
      override def sizeInBytes: Long = unsupportedException
      override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException
      private def unsupportedException =
        throw new UnsupportedOperationException("BaseRelation from web socket write " +
          "operation is not usable.")
    }

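    I didn't study this closely; it is copied from Spark's Kafka source. As far as I can tell, a plain save() never uses the returned relation, so a placeholder whose members all throw is enough. Some projects on GitHub instead end by returning the result of RelationProvider's createRelation method. That appears to read the data back after writing it.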
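    Below is a sketch of the "write, then read back" approach. As far as I know, Spark's built-in JDBC source works this way: one class implements both CreatableRelationProvider and RelationProvider, and the write-side createRelation ends by calling the read-side one. The package cn.zwy.memory, the MemoryStore object, the MemoryRelation class and the "name" option are all hypothetical. The "store" is a driver-side map filled via collect(), so this is a toy suitable only for small data.

```scala
package cn.zwy.memory

import scala.collection.concurrent.TrieMap

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}

// Hypothetical driver-side "storage": table name -> (schema, rows).
// Toy only: every write collects the whole DataFrame to the driver.
object MemoryStore {
  val tables: TrieMap[String, (StructType, Seq[Row])] = TrieMap.empty
}

// Read-side relation serving the stored rows.
class MemoryRelation(override val sqlContext: SQLContext, override val schema: StructType, rows: Seq[Row])
    extends BaseRelation with TableScan {
  override def buildScan(): RDD[Row] = sqlContext.sparkContext.parallelize(rows)
}

class DefaultSource extends CreatableRelationProvider with RelationProvider {

  // Read path: spark.read.format("cn.zwy.memory").option("name", ...).load()
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
    val name = parameters("name")
    val (schema, rows) = MemoryStore.tables.getOrElse(
      name, throw new IllegalArgumentException(s"unknown table: $name"))
    new MemoryRelation(sqlContext, schema, rows)
  }

  // Write path: store the data according to `mode`, then read it back.
  override def createRelation(sqlContext: SQLContext, mode: SaveMode,
      parameters: Map[String, String], data: DataFrame): BaseRelation = {
    val name = parameters("name")
    (mode, MemoryStore.tables.get(name)) match {
      case (SaveMode.ErrorIfExists, Some(_)) =>
        throw new IllegalStateException(s"table already exists: $name")
      case (SaveMode.Ignore, Some(_)) => // keep the old data untouched
      case (SaveMode.Append, Some((schema, rows))) =>
        MemoryStore.tables.put(name, (schema, rows ++ data.collect()))
      case _ => // Overwrite, or the table does not exist yet
        MemoryStore.tables.put(name, (data.schema, data.collect().toSeq))
    }
    // Same pattern as the JDBC source: return the read-side relation.
    createRelation(sqlContext, parameters)
  }
}
```

    Because the write path returns a real relation, none of its members has to throw. The cost is an extra read-side lookup after every save. For a pure sink such as WebSocket, which cannot be read back, the throwing placeholder is the simpler choice.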

    So it is decreed!

  • Original article: https://www.cnblogs.com/zhouwenyang/p/14335605.html