zoukankan      html  css  js  c++  java
  • spark 实现动态日期读取

    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Daily unique-visitor (UV) job.
      *
      * Reads the logstash files for today, yesterday and the day before from HDFS,
      * extracts a (date, account) pair from every line, keeps the pairs whose date
      * is yesterday, counts the distinct accounts per date and inserts the result
      * into the `day_uv` table over JDBC.
      *
      * Created by songcl on 2016/7/15.
      */
    object day_uv {

      /** Milliseconds in one day; a Long so that multiplying by a day offset cannot overflow Int. */
      private val DayMillis: Long = 24L * 60 * 60 * 1000

      /**
        * Formats the instant `daysAgo` days before `nowMillis` using `pattern`
        * (a `java.text.SimpleDateFormat` pattern, default time zone).
        */
      private def dayString(pattern: String, nowMillis: Long, daysAgo: Int): String =
        new java.text.SimpleDateFormat(pattern)
          .format(new java.util.Date(nowMillis - daysAgo * DayMillis))

      /**
        * Job entry point.
        *
        * @param args command-line arguments (not used)
        */
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Simple Application").setMaster("spark://10.0.58.21:7077")
        val sc = new SparkContext(conf)
        try {
          // NOTE(review): plaintext database credentials are embedded in this URL;
          // they should be moved to external configuration / a secret store.
          val url2 = "jdbc:mysql://rds3dabp9v2v7v596tai.mysql.rds.aliyuncs.com/r2d2?user=r2d2_admin&password=Vj0kHdve3"

          // Sample the clock once so that every derived date refers to the same instant,
          // even when the job happens to start right around midnight.
          val now = System.currentTimeMillis()

          // Log files are named by compact date: day before yesterday, yesterday, today.
          val fileDays = Seq(2, 1, 0).map(offset => dayString("yyyyMMdd", now, offset))

          // Dashed form of yesterday's date, compared against the date token parsed from each line.
          val targetDate = dayString("yyyy-MM-dd", now, 1)

          val logs = fileDays
            .map(day => sc.textFile("hdfs://10.0.58.21:9000/data/logstash." + day + ".log"))
            .reduce(_ union _)

          val rowRDD = logs
            .map { line =>
              // Date token: text after the last `"message":"` up to the first space.
              // lastOption/headOption keep degenerate lines from throwing; they yield ""
              // and are discarded by the filter below.
              val date = line.split("\"message\":\"").lastOption
                .flatMap(_.split(" ").headOption)
                .map(_.trim)
                .getOrElse("")
              // Account token: text after the last "account: " up to the first ", args:".
              val account = line.split("account: ").lastOption
                .flatMap(_.split(", args:").headOption)
                .getOrElse("")
              (date, account)
            }
            // Keep only yesterday's entries whose account token is exactly 8 characters long.
            .filter { case (date, account) => date == targetDate && account.length == 8 }
            .distinct()

          val sqlContext = new org.apache.spark.sql.SQLContext(sc)
          import sqlContext.implicits._

          rowRDD.toDF("date", "No").registerTempTable("kv")

          val uvPerDay = sqlContext
            .sql("select kv.date,count(distinct kv.No) from kv  group  by kv.date")
            .toDF("stat_time", "uv")

          // Third argument is passed as `false`, exactly as before (Spark 1.x overwrite flag).
          uvPerDay.insertIntoJDBC(url2, "day_uv", false)
        } finally {
          // Always release the cluster resources held by this application.
          sc.stop()
        }
      }
    }
  • 相关阅读:
    J2EE规范标准
    怎样用Google APIs和Google的应用系统进行集成(4)----获得Access Token以通过一些Google APIs的OAuth2认证
    [BestCoder Round #3] hdu 4908 BestCoder Sequence (计数)
    大数据存储之分布式文件系统(一)
    List与array的相互转换
    TRIZ系列-创新原理-31-多孔材料原理
    关于app.FragmentManager和v4包的FragmentPagerAdapter冲突
    POJ 1163 The Triangle
    ChinaVis2015 第一天会议
    JSON初入门
  • 原文地址:https://www.cnblogs.com/canyangfeixue/p/5678498.html
Copyright © 2011-2022 走看看