zoukankan      html  css  js  c++  java
  • Spark 实现按动态日期读取 HDFS 日志并统计日 UV

    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Daily UV (unique visitor) Spark job.
      *
      * Reads the logstash log files for yesterday, today and the day before yesterday
      * from HDFS and extracts a (date, account) pair from every line. It keeps only
      * records dated yesterday whose account is exactly 8 characters long, counts the
      * distinct accounts per date, and appends the resulting (stat_time, uv) row(s) to
      * the MySQL table `day_uv`.
      *
      * NOTE(review): three days of files are read, presumably because a given date's
      * records can land in an adjacent day's file — TODO confirm against log rotation.
      *
      * Created by songcl on 2016/7/15.
      */
    object day_uv {

      /** Milliseconds in 24 hours, as a Long so day-offset arithmetic stays in Long. */
      private val DayMillis: Long = 24L * 60 * 60 * 1000

      /**
        * Fallback JDBC URL, used when the DAY_UV_JDBC_URL environment variable is unset.
        * NOTE(review): this embeds a plaintext password; set DAY_UV_JDBC_URL instead.
        */
      private val DefaultJdbcUrl =
        "jdbc:mysql://rds3dabp9v2v7v596tai.mysql.rds.aliyuncs.com/r2d2?user=r2d2_admin&password=Vj0kHdve3"

      /** Marker that precedes the message body in a log line: `"message":"`. */
      private val MessageMarker = "\"message\":\""

      /** Formats the instant `days` x 24h before `nowMillis` with the given SimpleDateFormat pattern. */
      private def formatDaysAgo(pattern: String, nowMillis: Long, days: Int): String =
        new java.text.SimpleDateFormat(pattern).format(new java.util.Date(nowMillis - days * DayMillis))

      /**
        * Extracts a (date, account) pair from one raw log line.
        *
        * The date is the first space-delimited token after the last `"message":"` marker.
        * The account is the text after the last `account: ` and before the next `, args:`.
        * When a marker is absent, String.split returns the whole string, so the field falls
        * back to (part of) the line itself. When split yields an empty array (e.g. a token of
        * only spaces), the field becomes "" instead of throwing.
        */
      private def parseRecord(line: String): (String, String) = {
        val date = line.split(MessageMarker).lastOption.getOrElse("")
          .split(" ").headOption.getOrElse("")
          .trim()
        val account = line.split("account: ").lastOption.getOrElse("")
          .split(", args:").headOption.getOrElse("")
        (date, account)
      }

      /**
        * Entry point. Command-line arguments are ignored.
        * The target JDBC URL comes from the DAY_UV_JDBC_URL environment variable,
        * falling back to DefaultJdbcUrl.
        */
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Simple Application").setMaster("spark://10.0.58.21:7077")
        val sc = new SparkContext(conf)
        try {
          val jdbcUrl = sys.env.getOrElse("DAY_UV_JDBC_URL", DefaultJdbcUrl)

          // Sample the clock once so all derived dates agree even if the job starts near midnight.
          val now = System.currentTimeMillis()
          val targetDate = formatDaysAgo("yyyy-MM-dd", now, 1)
          def logPath(daysAgo: Int): String =
            "hdfs://10.0.58.21:9000/data/logstash." + formatDaysAgo("yyyyMMdd", now, daysAgo) + ".log"

          val allLines = sc.textFile(logPath(1))
            .union(sc.textFile(logPath(0)))
            .union(sc.textFile(logPath(2)))

          // Cheap substring pre-filter: any line whose parsed date equals targetDate contains it.
          val candidateLines = allLines.filter(_.contains(targetDate))
          val rowRDD = candidateLines
            .map(parseRecord)
            .filter { case (date, account) => date == targetDate && account.length == 8 }
            .distinct()

          val sqlContext = new org.apache.spark.sql.SQLContext(sc)
          import sqlContext.implicits._
          rowRDD.toDF("date", "No").registerTempTable("kv")

          val uvByDate = sqlContext
            .sql("select kv.date,count(distinct kv.No) from kv  group  by kv.date")
            .toDF("stat_time", "uv")

          // overwrite = false: rows are inserted into the existing table.
          // NOTE(review): insertIntoJDBC is deprecated since Spark 1.4 in favour of df.write.jdbc.
          uvByDate.insertIntoJDBC(jdbcUrl, "day_uv", false)
        } finally {
          sc.stop()
        }
      }
    }
  • 相关阅读:
    [原]Django调试工具--django-debug-toolbar
    [原]Redis详细配置介绍
    [转]国内外三个不同领域巨头分享的Redis实战经验及使用场景
    [原]Redis使用场景及使用经验
    [原]打造Python开发环境之Python环境
    [原]打造Python开发环境之初篇
    Mac关闭ciscovpn客户端的开机启动
    HttpResponse render render_to_response 三者的区别
    PyCharm粘贴到OneNote 2013保留代码格式的解决方案
    谁是python上最快的xlsx writer
  • 原文地址:https://www.cnblogs.com/canyangfeixue/p/5678498.html
Copyright © 2011-2022 走看看