zoukankan      html  css  js  c++  java
  • 第二个scala程序

    计算昨日收益,读取hdfs文件,使用临时表sqlcontext进行计算,结果保存于mysql中。

    之前考虑过将结果存储于Hbase中,由于各种原因及问题,在加上数据量真的很小很小,就改成mysql了。

    package com.zhongxin
    
    import java.text.SimpleDateFormat
    import java.util.{Calendar, Properties}
    
    import com.zhongxin.utils.Constants
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{Row, SQLContext, SaveMode}
    import org.apache.spark.{SparkConf, SparkContext}
    /**
      * Created by DingYS on 2017/12/15.
      * 昨日收益
      */
    object YesterdayInterest {
    
      def main(args:Array[String]){
        if(null == args || args.length != 1){
          System.err.print("请输入参数,格式为<输入数据文件夹路径>")
          System.exit(-1)
        }
        val conf = new SparkConf().setAppName("YesterdayInterest").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlcontext = new SQLContext(sc)
    
        val filePath = args(0)
        val userRDD = sc.textFile(filePath,5).map(line => line.split(",")).map(line => Row(line(0).trim,line(1).toInt,line(2).toDouble,line(3).toInt,line(4).toDouble))
    
        val structType = StructType(Array(StructField("userId",StringType,true),StructField("totalOnInvestedShare",IntegerType,true),StructField("bidYield",DoubleType,true),StructField("addShare",IntegerType,true),StructField("addYield",DoubleType,true)))
    
        val userInterestDF = sqlcontext.createDataFrame(userRDD,structType)
    
        userInterestDF.registerTempTable("t_yesterdayInterest")
    
        val yesterday = getYesterday()
    
        val resultDF = sqlcontext.sql("select '" + yesterday + "' as day,userId,round(sum((totalOnInvestedShare * bidYield/100 + addShare * addYield/100)/365),2) as yesterdayInterest from t_yesterdayInterest group by userId")
    
        val prop = new Properties()
        prop.put("user", Constants.MYSQL_USER)
        prop.put("password", Constants.MYSQL_PASSWORD)
        resultDF.write.mode(SaveMode.Append).jdbc(Constants.MYSQL_URL, "zx_user_yesterday_interest", prop)
        sc.stop()
      }
    
      // 昨日日期
      def getYesterday():String= {
        var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
        var cal: Calendar = Calendar.getInstance()
        cal.add(Calendar.DATE, -1)
        var yesterday = dateFormat.format(cal.getTime())
        yesterday
      }
    }
    

      

    sqlcontext
  • 相关阅读:
    无法通过给定的扩展名确定设备类型
    biuld example_osgviewerGLUT遇到的error Link2019
    OpenGL渲染流水线
    设计模式总目录
    删除thumbs.db是提示正在使用
    COM技术内幕第十章笔记EXE中的服务器
    在高低版本之间互导max文件,以fbx格式为载体尤佳。
    参数值传递的本质
    VS 为什么要检查行尾的一致性?
    vs2005中的GL文件
  • 原文地址:https://www.cnblogs.com/Smilence1024/p/8057084.html
Copyright © 2011-2022 走看看