  • Spark SQL in practice (1)

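    1. Requirements:

    Read each customer group code from the test table, together with the SQL statement that fetches that group's member details.
    
    Execute each of those detail SQL statements.
    
    Write every customer group code, paired with its member details, into the result table.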
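
    As a rough mental model, the three steps amount to a flatMap over (id, SQL) pairs. The sketch below is plain Scala with no Spark dependency: `CustomerGroupModel`, `custNums`, and the filter functions are hypothetical stand-ins for the stored SQL statements and the detail table, filled with the sample values used in the data preparation section.

    ```scala
    object CustomerGroupModel {
      // Hypothetical in-memory stand-in for the customer group detail table
      val custNums: List[String] = List("a", "b", "c")

      // Each entry pairs a customer group id with a function playing the role of its stored SQL
      val groups: List[(String, List[String] => List[String])] = List(
        "1" -> ((rows: List[String]) => rows),                      // select all members
        "2" -> ((rows: List[String]) => rows.filter(Set("a", "b"))) // where cust_num in ('a','b')
      )

      // Run every group's "query" and tag each returned member code with the group id
      def expand(): List[(String, String)] =
        groups.flatMap { case (id, query) => query(custNums).map(cust => (id, cust)) }

      def main(args: Array[String]): Unit =
        expand().foreach { case (id, cust) => println(s"$id\t$cust") }
    }
    ```

    With the sample data, group 1 is paired with a, b and c, and group 2 with a and b, which is the content the result table should end up with.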

    2. Data preparation

    drop table if exists sospdm.tmp_yinfei_test;
    create table sospdm.tmp_yinfei_test
    (
     id      string comment 'customer group id'
    ,exe_sql string comment 'SQL to execute'
    ) comment 'customer group code + SQL to execute'
    row format delimited fields terminated by '\t'
    ;
    drop table if exists sospdm.tmp_yinfei_result;
    create table sospdm.tmp_yinfei_result
    (
     id       string comment 'customer group id'
    ,cust_num string comment 'member code'
    ) comment 'result table: customer group code + member details'
    row format delimited fields terminated by '\t'
    ;
    drop table if exists sospdm.tmp_yinfei_cust_num;
    create table sospdm.tmp_yinfei_cust_num
    (
    cust_num string comment 'member code'
    ) comment 'customer group detail table';
    insert overwrite table sospdm.tmp_yinfei_cust_num
    select 'a' as cust_num from sospdm.dual union all 
    select 'b' as cust_num from sospdm.dual union all 
    select 'c' as cust_num from sospdm.dual ;
    
    -- The second statement contains single quotes, so it is wrapped in double quotes
    insert overwrite table sospdm.tmp_yinfei_test
    select '1' as id,'select cust_num from sospdm.tmp_yinfei_cust_num' as exe_sql from sospdm.dual union all 
    select '2' as id,"select cust_num from sospdm.tmp_yinfei_cust_num where cust_num in ('a','b')" as exe_sql from sospdm.dual 
    ;

    3. Implementation

    package spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql._
    
    object Execute {
    
      // One row of the result table: customer group id + member code
      case class Person(id: String, cust_num: String)
    
      // An explicit main() is used instead of extending scala.App, as the Spark docs recommend
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("Read")
        val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
        import session.implicits._
    
        // Fetch each customer group id and its SQL statement from the test table
        val ex: Array[Row] = session.sql("select id,exe_sql from sospdm.tmp_yinfei_test").rdd.collect()
    
        // Clear the result table first (alternatively, use a partitioned table)
        session.sql("truncate table sospdm.tmp_yinfei_result")
    
        // Run each SQL statement and pair its rows with the customer group id via a Cartesian product
        for (row <- ex) {
          val id: String = row.getString(0)
          val ex_sql: String = row.getString(1)
          val id_rdd = session.sparkContext.parallelize(List(id))
          val cust_num = session.sql(ex_sql).rdd
          val idCust: RDD[(String, Row)] = id_rdd.cartesian(cust_num)
          val result = idCust.map { case (groupId, detail) =>
            Person(groupId, detail.getString(0))
          }.toDF()
          // Register a temporary view (registerTempTable is deprecated since Spark 2.0)
          result.createOrReplaceTempView("tmp_yinfei_cu")
          // Insert the rows into the result table
          session.sql("insert into table sospdm.tmp_yinfei_result select id,cust_num from tmp_yinfei_cu")
        }
    
        session.stop()
      }
    }
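
    The Cartesian product with a single-element RDD only attaches a constant id to every returned row. One possible alternative, sketched here under the assumption that each stored SQL statement returns the member code as its first column, adds the id as a literal column and stays in the DataFrame API. `GroupDetails`, `tagWithId`, and `appendToResult` are hypothetical names, not part of the original job.

    ```scala
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.lit

    object GroupDetails {
      // Run one stored SQL statement and prepend the customer group id as a constant column
      def tagWithId(session: SparkSession, id: String, exeSql: String): DataFrame = {
        val details = session.sql(exeSql)
        // Assumption: the first column of the query result is the member code
        val custNum = details.col(details.columns.head).cast("string").as("cust_num")
        details.select(lit(id).as("id"), custNum)
      }

      // Append the tagged rows to the result table; insertInto matches columns by position
      def appendToResult(session: SparkSession, id: String, exeSql: String): Unit =
        tagWithId(session, id, exeSql).write.mode("append").insertInto("sospdm.tmp_yinfei_result")
    }
    ```

    Inside the loop, a call such as `GroupDetails.appendToResult(session, id, ex_sql)` would then stand in for the Cartesian product, the temporary table, and the `insert into` statement.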

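    The code comment about clearing the result table mentions a partitioned table as an alternative. A minimal sketch of that idea follows, assuming a hypothetical table `sospdm.tmp_yinfei_result_p` partitioned by customer group id; neither the table nor the `PartitionedInsert` helper appears in the original post. Because each stored SQL statement is itself a `select` returning the member code, it can be appended directly to an `insert overwrite ... partition` clause, so a rerun replaces only that group's partition.

    ```scala
    object PartitionedInsert {
      // Hypothetical partitioned variant of the result table (an assumption, not from the post)
      val table = "sospdm.tmp_yinfei_result_p"

      // DDL for the hypothetical table: member code as data column, group id as partition column
      val createTableSql: String =
        s"""create table if not exists $table
           |(cust_num string comment 'member code')
           |partitioned by (id string comment 'customer group id')""".stripMargin

      // Build a statement that overwrites only the partition of one customer group
      def overwriteSql(id: String, exeSql: String): String = {
        val safeId = id.replace("'", "\\'") // escape single quotes inside the partition value
        s"insert overwrite table $table partition (id='$safeId') $exeSql"
      }
    }
    ```

    In the loop, `session.sql(PartitionedInsert.overwriteSql(id, ex_sql))` would take the place of both the `truncate table` statement and the per-group insert.
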
    4. Results

  • Original post: https://www.cnblogs.com/yin-fei/p/10884683.html