zoukankan      html  css  js  c++  java
  • sparksql实践_2

    package spark
    
    import java.text.SimpleDateFormat
    import java.util.{Calendar, Date}
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    
    
    /**
      * Hourly Spark SQL job.
      *
      * Given an hour stamp (`yyyyMMddHH`, e.g. `2019052212`) as the first program argument, it:
      *
      *  1. reads every `handle_condition` from `sospdm.mysql_to_hive_m_guest_screen_info` whose
      *     `create_time` lies in the one-hour window starting at that stamp and whose `source`
      *     is `'SH_10001'`;
      *  1. executes each `handle_condition` as a Spark SQL query;
      *  1. writes the combined result of all those queries into the
      *     `(statis_date, statis_hour)` partition of `sospdm.tdm_wit_customer_group_detail`
      *     and `sospdm.tdm_wit_customer_group_code`.
      *
      * Each executed query must expose the columns `cust_group_cd` and `cust_num`.
      */
    object Execute {

      /** Width of the processing window: one hour, in milliseconds. */
      private val HourMillis = 3600000L

      /** Name of the temporary view the insert statements read from. */
      private val TmpView = "tmp_m_guest_screen_info"

      /**
        * Program entry point.
        *
        * @param args `args(0)` is the hour to process, formatted `yyyyMMddHH` (e.g. `2019052212`)
        * @throws IllegalArgumentException if no argument is supplied
        * @throws java.text.ParseException if `args(0)` cannot be parsed as `yyyyMMddHH`
        */
      def main(args: Array[String]): Unit = {
        require(args.nonEmpty, "usage: Execute <yyyyMMddHH>  (e.g. 2019052212)")
        val time = args(0) // input argument, e.g. 2019052212

        // Parse the argument before starting Spark so that a bad argument fails fast.
        val dateFormatTime = new SimpleDateFormat("yyyyMMddHH")
        val windowStart: Date = dateFormatTime.parse(time)
        val windowEnd: Date = new Date(windowStart.getTime + HourMillis)

        // Partition values: day as yyyyMMdd, hour as zero-padded two-digit HH.
        val partitionDate = new SimpleDateFormat("yyyyMMdd").format(windowStart)
        val partitionHour = new SimpleDateFormat("HH").format(windowStart)

        // Window bounds in the textual form compared against create_time.
        val dateFormatHour = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val statisDatePre = dateFormatHour.format(windowStart)
        val statisDateNow = dateFormatHour.format(windowEnd)

        val sparkConf = new SparkConf().setAppName("Execute")
        val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

        try {
          // NOTE(review): both bounds are inclusive, so a row stamped exactly on the hour is
          // picked up by two consecutive runs — confirm whether the upper bound should be '<'.
          val conditions: Array[String] = session
            .sql(s"select handle_condition FROM sospdm.mysql_to_hive_m_guest_screen_info WHERE create_time >= '$statisDatePre' and create_time <= '$statisDateNow' and source = 'SH_10001'")
            .collect()
            // A null condition cannot be executed; skip it instead of failing with an NPE.
            .collect { case row if !row.isNullAt(0) => row.getString(0) }

          // NOTE(review): handle_condition is executed verbatim as SQL. This is only safe if the
          // source table is writable by trusted parties exclusively.
          //
          // The per-condition results are unioned and written once. Issuing one
          // `insert overwrite` per condition against the same partition would make every
          // iteration wipe the previous one's output, leaving only the last condition's rows.
          val combined: Option[DataFrame] = conditions
            .map(conditionSql => session.sql(conditionSql).select("cust_group_cd", "cust_num"))
            .reduceOption(_ union _)

          // When no condition matched the window, nothing is written (partitions stay untouched).
          combined.foreach { idCust =>
            idCust.createOrReplaceTempView(TmpView)
            // Insert the result data into the result tables.
            session.sql(s"insert overwrite table sospdm.tdm_wit_customer_group_detail partition (statis_date= $partitionDate ,statis_hour= $partitionHour ) select cust_group_cd as cid,cust_num from $TmpView")
            session.sql(s"insert overwrite table sospdm.tdm_wit_customer_group_code partition (statis_date= $partitionDate ,statis_hour= $partitionHour ) select cust_group_cd as cid from $TmpView group by cust_group_cd")
          }
        } finally {
          // Release the Spark session even when a query fails.
          session.stop()
        }
      }
    }
  • 相关阅读:
    dns解析后ping的居然不是自己的ip
    Ubuntu修改默认使用的bash
    安装 libbpg
    libnccl安装
    安装opencv
    tcpdump使用
    jQuery类操作
    jQuery对象和DOM对象的相互转换
    jQuery入口函数
    什么是外边距重叠?重叠的结果是什么?
  • 原文地址:https://www.cnblogs.com/yin-fei/p/10910709.html
Copyright © 2011-2022 走看看