zoukankan      html  css  js  c++  java
  • 第二个scala程序

    计算昨日收益,读取hdfs文件,使用临时表sqlcontext进行计算,结果保存于mysql中。

    之前考虑过将结果存储于Hbase中,由于各种原因及问题,在加上数据量真的很小很小,就改成mysql了。

    package com.zhongxin
    
    import java.text.SimpleDateFormat
    import java.util.{Calendar, Properties}
    
    import com.zhongxin.utils.Constants
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{Row, SQLContext, SaveMode}
    import org.apache.spark.{SparkConf, SparkContext}
    /**
      * Created by DingYS on 2017/12/15.
      * 昨日收益
      */
    object YesterdayInterest {
    
      def main(args:Array[String]){
        if(null == args || args.length != 1){
          System.err.print("请输入参数,格式为<输入数据文件夹路径>")
          System.exit(-1)
        }
        val conf = new SparkConf().setAppName("YesterdayInterest").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlcontext = new SQLContext(sc)
    
        val filePath = args(0)
        val userRDD = sc.textFile(filePath,5).map(line => line.split(",")).map(line => Row(line(0).trim,line(1).toInt,line(2).toDouble,line(3).toInt,line(4).toDouble))
    
        val structType = StructType(Array(StructField("userId",StringType,true),StructField("totalOnInvestedShare",IntegerType,true),StructField("bidYield",DoubleType,true),StructField("addShare",IntegerType,true),StructField("addYield",DoubleType,true)))
    
        val userInterestDF = sqlcontext.createDataFrame(userRDD,structType)
    
        userInterestDF.registerTempTable("t_yesterdayInterest")
    
        val yesterday = getYesterday()
    
        val resultDF = sqlcontext.sql("select '" + yesterday + "' as day,userId,round(sum((totalOnInvestedShare * bidYield/100 + addShare * addYield/100)/365),2) as yesterdayInterest from t_yesterdayInterest group by userId")
    
        val prop = new Properties()
        prop.put("user", Constants.MYSQL_USER)
        prop.put("password", Constants.MYSQL_PASSWORD)
        resultDF.write.mode(SaveMode.Append).jdbc(Constants.MYSQL_URL, "zx_user_yesterday_interest", prop)
        sc.stop()
      }
    
      // 昨日日期
      def getYesterday():String= {
        var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
        var cal: Calendar = Calendar.getInstance()
        cal.add(Calendar.DATE, -1)
        var yesterday = dateFormat.format(cal.getTime())
        yesterday
      }
    }
    

      

    sqlcontext
  • 相关阅读:
    centos7修改网卡名称
    xtrabackup备份恢复过程
    centos 设置删除提示
    cetos6 安装samba共享文件夹
    centos 安装cacti监控
    centos6 搭建hdwiki
    centos6.8安装superctl 后台管理工具
    centos6.5 安装ansible,管理多台服务器
    CreateParams作用及重写
    c# 获取当前应用程序的路径
  • 原文地址:https://www.cnblogs.com/Smilence1024/p/8057084.html
Copyright © 2011-2022 走看看