Compute yesterday's interest: read the data file from HDFS, run the calculation through a SQLContext temporary table, and save the result to MySQL.
I had originally considered storing the result in HBase, but for various reasons and problems, plus the fact that the data volume is really very small, I switched to MySQL.
package com.zhongxin

import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}

import com.zhongxin.utils.Constants
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by DingYS on 2017/12/15.
  * Yesterday's interest.
  */
object YesterdayInterest {

  def main(args: Array[String]) {
    if (null == args || args.length != 1) {
      System.err.print("Missing argument. Usage: <input data directory>")
      System.exit(-1)
    }

    val conf = new SparkConf().setAppName("YesterdayInterest").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val filePath = args(0)

    // Parse each CSV line into a Row: userId, totalOnInvestedShare, bidYield, addShare, addYield
    val userRDD = sc.textFile(filePath, 5)
      .map(line => line.split(","))
      .map(line => Row(line(0).trim, line(1).toInt, line(2).toDouble, line(3).toInt, line(4).toDouble))

    val structType = StructType(Array(
      StructField("userId", StringType, true),
      StructField("totalOnInvestedShare", IntegerType, true),
      StructField("bidYield", DoubleType, true),
      StructField("addShare", IntegerType, true),
      StructField("addYield", DoubleType, true)))

    val userInterestDF = sqlContext.createDataFrame(userRDD, structType)
    userInterestDF.registerTempTable("t_yesterdayInterest")

    val yesterday = getYesterday()
    // Per-user daily interest: (principal * annual rate% + bonus share * bonus rate%) / 365, rounded to 2 decimals
    val resultDF = sqlContext.sql("select '" + yesterday + "' as day,userId," +
      "round(sum((totalOnInvestedShare * bidYield/100 + addShare * addYield/100)/365),2) as yesterdayInterest " +
      "from t_yesterdayInterest group by userId")

    val prop = new Properties()
    prop.put("user", Constants.MYSQL_USER)
    prop.put("password", Constants.MYSQL_PASSWORD)
    resultDF.write.mode(SaveMode.Append).jdbc(Constants.MYSQL_URL, "zx_user_yesterday_interest", prop)

    sc.stop()
  }

  // Yesterday's date, formatted as "yyyy-MM-dd"
  def getYesterday(): String = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, -1)
    dateFormat.format(cal.getTime())
  }
}
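The job references a com.zhongxin.utils.Constants helper that is not shown in the post. A minimal sketch of what it might contain follows; the URL, user, and password values here are placeholders for illustration, not the project's actual configuration.

package com.zhongxin.utils

// Hypothetical sketch of the Constants object used above.
// All values are placeholders; substitute your own connection settings.
object Constants {
  val MYSQL_URL: String = "jdbc:mysql://localhost:3306/zhongxin?useUnicode=true&characterEncoding=utf8"
  val MYSQL_USER: String = "root"
  val MYSQL_PASSWORD: String = "password"
}

Note that the MySQL JDBC driver (mysql-connector-java) must be on the classpath for the jdbc write to succeed. As the parsing logic shows, each input line is expected to be a comma-separated record of userId, totalOnInvestedShare, bidYield, addShare and addYield, e.g. u001,10000,8.5,0,0.0 (illustrative values only).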