zoukankan      html  css  js  c++  java
  • sparksql工程小记

      最近做一个oracle项目迁移工作,跟着spark架构师学着做,进行一些方法的总结。

      1、首先,创建SparkSession对象(老版本为sparkContext)

      val session = SparkSession.builder().appName("app1").getOrCreate()

      2、数据的更新时间配置表,选用mysql,就是说每次结果数据计算写入mysql后,还会将此次数据的更新时间写入数据配置表。 那么在代码里,需要创建配置表的case class,配置与构造数据库schema信息,url,用户名密码等,随后根据配置表中的不同app进行数据的过滤。

      val appId = "1"

      case class DBInformation(url:Stirng,schema:String,user:String,passwd:String)

      val mysqlDB = DBInformation("jdbc:mysql://....",schema,user,passowrd)

      val tableName = mysqlDB.schema + "." + name

      val props = new Properties()

      props.setProperty("user",mysqlDB.user)

      props.setProperty("password",mysqlDB.passwd)

      props.setProperty(JDBCOptions.JDBC_DRIVER_CLASS,"com.mysql.jdbc.Driver")

      val record = session.read.jdbc(mysqlDB.url,tableName,props).filter(row => row.getAs[Int]("app_id") == appId).take(1)

      //第一次写入,木有数据

      if(0 == record.size){

        DBInfoMation(null,null,null)

      }else{

        DBInfoMation(record(0).getTimestmap(1),recode(0).getTimestamp(2),recode(0)..getTimestamp(3))  

      3、注册UDF,由于原来是用oracle的语法,现如今转为sparksql,需要注册一些UDF,来兼容原有oracle的函数

      def registerUDF(session:SparkSession) : Unit = {

        session.udf.register("UDF",(value : String,modifieds:Array[String) => {

          val filter = modifieds.filter(_!=null)

          if(!filter.isEmpty){

            filter.max

          }else{

            null

          }

         })

       {

      4、很多计算是需要过往的历史数据的,在第一次初始化的时候,先对历史数据进行缓存。这里有个知识点,会将一直计算的同步数据进行checkPoint落地磁盘,如果发现历史时间在同步时间之后,则加载历史数据,否则就加载同步数据。

      val (updateTime,initData) = if(historyTime.after(syncTime)){

        (historyTime,initFromHistory(tableName))

      } else {

        (syncTime,initFromCheckPoint(syncTime))

      }

      //记录schema

      schema = initData.schema

      //baseData为缓存在内存的数据,并根据数据量进行repartition

      baseData = initData.repartition(numPartitions,_partitionColumns.map(new Column()):_*).rdd.persisit(storageLevel)

      //触发action动作

      baseData.foreach(_=>Unit)

      5、有一种情况,下游三个表要关联生成一张大表,这三张表的数据来源于消息中间件中的三个topic,但是数据可能不是同时到来,那么就需要将历史加载的大表拆根据ID拆分为三个小表,然后逐个append到三个小表上,随后再根据ID关联起来,再组成最终表。

      val table1 = new createUpdatingTable(session,"tableName1",topicConf,numPartitons,...)

      val table2 = new createUpdatingTable (session,"tableName2",topicConf1,numPartitions,...)

      val table3 = new createUpdatingTable(session,"tableName3","topicConf2,numPartitions,...)

      val mergeBaseTable = (session,"mergeTableName",Array(table1,table2,table3),finallyColumn,finallyPartitions...)

      mergeBaseTable.updateAndGetData(Some(genDataFilter(currentTime)))

      //三表拆分与合并

      val tmpPartitionKey = "pd_code"

      if(baseData != null) {

        val oldData = getOldData(baseData,keyDF.rdd,tmpPartitionKey)

        oldDf = session.createDataFrame(oldData,schema)

        .repartition(numPartitions,new Column(tmpPartitionKey))

        .persist(storageLevel)

      }

      val table1 = updateShardTable(oldDf,inDfs(0)...).sparksession.createDataFrame(data,schema)

      val table2 = ....

      val table3 = ....

      

      6、三表key进行合并,通过sql进行三来源表合并

      val keySet = keys.collect()

      val broadcastKeys = session.sparkContext.broadCast(keySet)

      baseData.mapPartitions({iter =>

        val set = broadcastKey.value.toSet

        iter.filter(row=>set.contains(row.getAs[Any](keyCol)))

      },true)

      val sql ="select a.column,b.column,c.column.... from table1 a left join table2 b on a.pd_code = b.pd_code......

      val finallyTable = session.sql(sql)

      7、从历史数据中筛选出此次需要更新的数据(通过ID进行过滤),随后将新数据进行append

      val new Data = baseData.zipPartitions(updateData,true){case(liter,riter)=>

        val rset = new mutable.HashSet[Any]

        for(row <- riter){

          rset.add(row.getAs[Any](keyCol))

        }

        liter.filter(row=>!rset.contains(row.getAs[Any](keyCol))))

        }.zipPartitions(updateData,true){case (liter,riter)=>

          liter++riter

        }.persisit(storageLevel)

      

  • 相关阅读:
    C++11 二叉堆
    OpenCV --- 实现两幅图像并排合并(ROI)
    OpenCV --- 修改图像的对比度、亮度 、RGB转Gray图像、修改图像的尺寸
    Opencv --- 图像像素遍历的各种方法
    Ubuntu系统的安装(虚拟机) 并配置C/C++编译器
    在Ubuntu下编译安装nginx
    【OpenCV3】threshold()函数详解
    MFC 剪切板的使用、线程介绍
    C++基础知识 基类指针、虚函数、多态性、纯虚函数、虚析构
    【OpenCV3】cvRound()、cvFloor()、cvCeil()函数详解
  • 原文地址:https://www.cnblogs.com/yangsy0915/p/6789689.html
Copyright © 2011-2022 走看看