  • [spark] Custom batch input and output sources for Spark SQL

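    Background

    Spark SQL comes with many built-in input and output formats.

    In practice, though, you often need additional sources and sinks, such as HBase, sockets, WebSocket, or Excel.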

    Output

    Take WebSocket output as an example.

    The final code that sends the messages looks like this:

    df.write.format("cn.zwy.websocket")
          .option("uri", "ws://localhost:8888")
          .save()

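    Spark looks for a class named DefaultSource in the package passed to format (here cn.zwy.websocket), i.e. it loads cn.zwy.websocket.DefaultSource. So create a DefaultSource class in that package: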

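    package cn.zwy.websocket

    import cn.zwy.common.JsonUtil
    import cn.zwy.WebSocketClient
    import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, Filter}
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}

    import java.net.URI
    import scala.collection.JavaConverters._

    // Write-only data source: df.write.format("cn.zwy.websocket") resolves to this class.
    // Note: the SaveMode is ignored; every write simply sends all rows.
    class DefaultSource extends CreatableRelationProvider with DataSourceRegister {

      override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {

        // Options set with .option(...) arrive in `parameters`.
        val uri = parameters("uri")
        val fieldNames = data.schema.fieldNames.toSeq
        // Runs on the executors: one WebSocket connection per partition.
        // The explicit parameter type keeps the call unambiguous between the
        // Scala overload and the Java ForeachPartitionFunction overload.
        data.foreachPartition { (partition: Iterator[Row]) =>
          val client = new WebSocketClient(new URI(uri))
          try {
            client.connectBlocking()
            partition.foreach { row =>
              try {
                // Each row becomes one JSON message: {"column": value, ...}
                val dataMap = row.getValuesMap[Any](fieldNames).asJava
                client.send(JsonUtil.Encode(dataMap))
              } catch {
                // A row that fails to encode or send is skipped silently.
                case _: Exception =>
              }
            }
          } finally {
            // Close the connection even if the partition fails midway.
            client.close()
          }
        }

        // The write has already happened; this relation only satisfies the return type.
        new BaseRelation {
          override def sqlContext: SQLContext = unsupportedException
          override def schema: StructType = unsupportedException
          override def needConversion: Boolean = unsupportedException
          override def sizeInBytes: Long = unsupportedException
          override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException
          private def unsupportedException =
            throw new UnsupportedOperationException("BaseRelation from web socket write " +
              "operation is not usable.")
        }
      }

      override def shortName(): String = "websocket"
    }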
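    The shortName() value "websocket" is only used if Spark can discover the class through Java's ServiceLoader, which means listing cn.zwy.websocket.DefaultSource in a resource file named META-INF/services/org.apache.spark.sql.sources.DataSourceRegister. Without that file, format("cn.zwy.websocket") still works through the DefaultSource naming convention, but format("websocket") does not. The sketch below lists the short names ServiceLoader can see, roughly the same discovery Spark performs when resolving format(...). It assumes spark-sql on the classpath; the RegisteredSources object is a hypothetical helper.

```scala
import java.util.ServiceLoader

import org.apache.spark.sql.sources.DataSourceRegister

import scala.collection.JavaConverters._

// Hypothetical helper: lists the short names of every DataSourceRegister
// implementation that java.util.ServiceLoader finds on the classpath.
object RegisteredSources {
  def shortNames(loader: ClassLoader = Thread.currentThread().getContextClassLoader): List[String] =
    ServiceLoader.load(classOf[DataSourceRegister], loader)
      .asScala            // instantiates each registered provider
      .map(_.shortName())
      .toList
      .sorted

  def main(args: Array[String]): Unit = {
    val names = shortNames()
    println(names.mkString(", "))
    // true only if the services file lists cn.zwy.websocket.DefaultSource
    println(s"websocket registered: ${names.contains("websocket")}")
  }
}
```

    On a plain spark-sql classpath this should list built-in names such as csv and parquet; websocket appears only once the services file is packaged together with the DefaultSource class.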

    To write data, implement the CreatableRelationProvider interface.

    The corresponding interface for reading data is RelationProvider.
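    On the read side, RelationProvider.createRelation(sqlContext, parameters) returns a BaseRelation that also mixes in a scan trait such as TableScan, whose buildScan() supplies the rows. A minimal sketch, assuming spark-sql on the classpath; RangeSource, RangeRelation and the "rows" option are hypothetical names used only for illustration.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical read-side provider producing one column "id" with values 0 until rows.
class RangeSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
    // "rows" is a made-up option for this sketch; defaults to 10.
    val rows = parameters.getOrElse("rows", "10").toLong
    new RangeRelation(sqlContext, rows)
  }
}

// BaseRelation describes the schema; TableScan lets Spark fetch all rows via buildScan().
class RangeRelation(override val sqlContext: SQLContext, rows: Long)
    extends BaseRelation with TableScan {

  override def schema: StructType =
    StructType(Seq(StructField("id", LongType, nullable = false)))

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.range(0L, rows).map(i => Row(i))
}
```

    Because the class is not named DefaultSource, it is passed to format() by its class name, e.g. spark.read.format(classOf[RangeSource].getName).option("rows", "5").load(), which should yield five rows with ids 0 to 4.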

    As for the value returned at the end of createRelation, an anonymous BaseRelation whose members all throw UnsupportedOperationException: I didn't look into it closely and copied it from Spark's Kafka source. Some projects on GitHub instead end the method by returning the result of their RelationProvider's createRelation, which seems to read the data back after it has been written.
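    If you would rather return a relation that does not throw, BaseRelation only requires sqlContext and schema; needConversion, sizeInBytes and unhandledFilters have default implementations. As far as I can tell from the Spark 3.x source, the returned relation is only used afterwards to refresh cached plans, and exceptions from that step are caught, which is why the throwing version copied from the Kafka source still works. Spark's own JDBC source, by contrast, ends its write method by calling its read-side createRelation(sqlContext, parameters), i.e. the "read it back after writing" pattern mentioned above. A minimal sketch of the first option, assuming spark-sql on the classpath; PrintSink is a hypothetical sink that just prints each row.

```scala
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.types.StructType

// Hypothetical write-only sink: prints each row on the executor that processes it.
class PrintSink extends CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {

    data.foreachPartition { (rows: Iterator[Row]) =>
      rows.foreach(row => println(row.mkString(",")))
    }

    // Capture the parameters under new names: inside the anonymous class,
    // "sqlContext" would refer to the overriding member itself (infinite recursion).
    val ctx = sqlContext
    val writtenSchema = data.schema
    new BaseRelation {
      override def sqlContext: SQLContext = ctx
      override def schema: StructType = writtenSchema
    }
  }
}
```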

    So it is decreed!

  • Original post: https://www.cnblogs.com/zhouwenyang/p/14335605.html