zoukankan      html  css  js  c++  java
  • sparkSQL实际应用

     提交代码包

    /usr/local/spark/bin$ spark-submit --class "kv" /data/chun/sparktes.jar

    1、查询KV

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    /**
     * Spark job that extracts (key, account) pairs from the falcon logs on HDFS and
     * writes the pairs that pass the length checks to a local text output directory.
     */
    object kv {
      /**
       * Entry point.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        try {
          val log = sc.textFile("hdfs://10.0.58.21:9000/falcon/2016/*/*/*.log")
          // Key: first space-delimited token after the literal text "message":" (the quotes
          // have to be escaped inside the Scala string). Account: the text between
          // "account: " and ", args:".
          val rowRDD = log.map { line =>
            val key = line.split("\"message\":\"").last.split(" ").head.trim
            val account = line.split("account: ").last.split(", args:").head
            (key, account)
          }
          // Keep 10-character keys with 8-character accounts. The token "TypeError:" is itself
          // 10 characters long, so it has to be excluded explicitly.
          val k = rowRDD.filter { case (key, account) =>
            key.length == 10 && !key.contains("TypeError:") && account.length == 8
          }
          // Collapse to one partition so the result is written from a single partition.
          k.repartition(1).saveAsTextFile("file:////data/kv")
        } finally {
          // Release the context even when the job fails.
          sc.stop()
        }
      }
    }

    2、关联MySQL

     #  spark-shell --driver-class-path /usr/local/spark/mysql/mysql.jar
    // Parse (date, account) pairs out of the falcon logs. The date is the first space-delimited
    // token after the literal text "message":" and the account is the text between
    // "account: " and ", args:".
    val log = sc.textFile("hdfs://10.0.58.21:9000/falcon/2016/*/*/*.log")
    val rowRDD = log.map { line =>
      val date = line.split("\"message\":\"").last.split(" ").head.trim
      val account = line.split("account: ").last.split(", args:").head
      (date, account)
    }
    // Keep 10-character dates (excluding the 10-character token "TypeError:") whose account
    // is 7 characters long.
    val k = rowRDD.filter { case (date, account) =>
      date.length == 10 && !date.contains("TypeError:") && account.length == 7
    }
    // registerTempTable returns Unit, so its result is not bound to a name.
    k.toDF("date", "No").registerTempTable("kv")

    // NOTE(review): the database credentials are embedded in this URL; move them out of the
    // script (e.g. into connection properties or the environment) before sharing it.
    // NOTE(review): SQLContext.jdbc is deprecated in newer Spark releases in favour of
    // sqlContext.read.jdbc -- confirm against the Spark version in use.
    val jdbcUrl = "jdbc:mysql://rdssw603u1t68figaia7.mysql.rds.aliyuncs.com:3306/falcon?user=wy_app&password=V0tkEIve2"
    val role = sqlContext.jdbc(jdbcUrl, "role")
    val job = sqlContext.jdbc(jdbcUrl, "job")
    val staff_and_job = sqlContext.jdbc(jdbcUrl, "staff_and_job")
    val project = sqlContext.jdbc(jdbcUrl, "project")
    role.toDF().registerTempTable("role")
    job.toDF().registerTempTable("job")
    staff_and_job.toDF().registerTempTable("staff_and_job")
    project.toDF().registerTempTable("project")

    // Staff ids attached, through job and role, to each project selected by the WHERE clause.
    val q = sqlContext.sql("SELECT project.`name`,project.`code`,staff_and_job .`staff_id` FROM project LEFT JOIN job ON project.`code`=job.`project_code` LEFT JOIN role ON job.`role_code`=role.`code` LEFT JOIN staff_and_job ON job.`id`=staff_and_job .`job_id` WHERE project.`is_spread`='1' AND role.`name`='人事专员' AND staff_and_job .`staff_id` IS NOT NULL")
    q.toDF("name", "code", "No").registerTempTable("p")

    // Distinct accounts from the logs per project name.
    val ed = sqlContext.sql("select p.name,count(distinct kv.No) from p join kv on p.No=kv.No group By p.name")

    3、项目关联活跃用户数

    // Parse (date, account) pairs out of the falcon logs. The date is the first space-delimited
    // token after the literal text "message":" and the account is the text between
    // "account: " and ", args:".
    val log = sc.textFile("hdfs://10.0.58.21:9000/falcon/2016/*/*/*.log")
    val rowRDD = log.map { line =>
      val date = line.split("\"message\":\"").last.split(" ").head.trim
      val account = line.split("account: ").last.split(", args:").head
      (date, account)
    }
    // Keep 10-character dates (excluding the 10-character token "TypeError:") whose account
    // is 8 characters long.
    val k = rowRDD.filter { case (date, account) =>
      date.length == 10 && !date.contains("TypeError:") && account.length == 8
    }
    // registerTempTable returns Unit, so its result is not bound to a name.
    k.toDF("date", "No").registerTempTable("kv")

    // NOTE(review): the database credentials are embedded in this URL; move them out of the
    // script (e.g. into connection properties or the environment) before sharing it.
    val jdbcUrl = "jdbc:mysql://rdssw603u1t68figaia7.mysql.rds.aliyuncs.com:3306/falcon?user=wy_app&password=V0tkEIve2"
    val user = sqlContext.jdbc(jdbcUrl, "user")
    val house = sqlContext.jdbc(jdbcUrl, "house")
    val project = sqlContext.jdbc(jdbcUrl, "project")
    user.toDF().registerTempTable("user")
    house.toDF().registerTempTable("house")
    project.toDF().registerTempTable("project")

    // Per-day distinct accounts for the project. The original bound this to `q1` as well and
    // immediately redefined it; it gets its own name so both queries stay available.
    val q1Daily = sqlContext.sql("select project.`name`,kv.date,count(distinct kv.No) from user join kv on kv.No=user.id left join house on user.`main_house_code`=house.`code` left join project on house.`project_code`=project.`code` where kv.date>='2016-05-01' and kv.date<='2016-05-16' and project.name='苏州金色里程' group by project.name,kv.date")

    // Distinct accounts for the project over the whole date range. The grouping is by project
    // name only: grouping by kv.date without selecting it would emit one unlabeled row per day.
    val q1 = sqlContext.sql("select project.`name`,count(distinct kv.No) from user  join kv on kv.No=user.id left join house on user.`main_house_code`=house.`code` left join project on house.`project_code`=project.`code` where kv.date>='2016-05-01'  and kv.date<='2016-05-16'  and project.name='苏州金色里程' group by project.name")

    // Collapse to one partition before writing the rows out as text.
    q1.repartition(1).rdd.saveAsTextFile("file:////data/kvv")
     

    4、活动页面浏览


    spark-shell --driver-class-path /usr/local/spark/mysql/mysql.jar
    // Alternative input (one day of the main falcon logs). The original bound `log` twice, so
    // only the second path below was ever in effect.
    // val log = sc.textFile("hdfs://10.0.58.21:9000/falcon/2016/05/16/*.log")
    val log = sc.textFile("hdfs://10.0.58.21:9000/user/yejin/*.log")

    // NOTE(review): the original snippet filtered an identifier `f` that it never defined.
    // The parse below is an assumption (first space-delimited token after "message":" paired
    // with the text between "account: " and ", args:") -- confirm it matches these log files.
    val f = log.map { line =>
      val date = line.split("\"message\":\"").last.split(" ").head.trim
      val account = line.split("account: ").last.split(", args:").head
      (date, account)
    }

    // Keep 10-character keys (excluding the 10-character token "TypeError:") whose value is
    // 8 characters long.
    val k = f.filter { case (date, account) =>
      date.length == 10 && !date.contains("TypeError:") && account.length == 8
    }

    // registerTempTable returns Unit, so its result is not bound to a name.
    k.toDF("date", "No").registerTempTable("kv")

    5、员工关联岗位

    // Parse (date, account) pairs out of the logs. The date is the first comma-delimited token
    // after the literal text "message":" and the account is the text between "account: " and
    // ", args:".
    val log = sc.textFile("hdfs://10.0.58.21:9000/user/yejin/*.log")
    val rowRDD = log.map { line =>
      val date = line.split("\"message\":\"").last.split(",").head.trim
      val account = line.split("account: ").last.split(", args:").head
      (date, account)
    }
    // Keep 10-character dates (excluding the 10-character token "TypeError:") whose account
    // is 7 characters long.
    val k = rowRDD.filter { case (date, account) =>
      date.length == 10 && !date.contains("TypeError:") && account.length == 7
    }
    // registerTempTable returns Unit, so its result is not bound to a name.
    k.toDF("date", "No").registerTempTable("kv")

    // NOTE(review): the database credentials are embedded in this URL; move them out of the
    // script (e.g. into connection properties or the environment) before sharing it.
    val jdbcUrl = "jdbc:mysql://rdssw603u1t68figaia7.mysql.rds.aliyuncs.com:3306/falcon?user=wy_app&password=V0tkEIve2"
    val staff_and_job = sqlContext.jdbc(jdbcUrl, "staff_and_job")
    val job = sqlContext.jdbc(jdbcUrl, "job")
    val role = sqlContext.jdbc(jdbcUrl, "role")
    staff_and_job.toDF().registerTempTable("staff_and_job")
    job.toDF().registerTempTable("job")
    role.toDF().registerTempTable("role")

    // Map every staff id to the name of the role reached through its job.
    val ed = sqlContext.sql("SELECT staff_and_job.`staff_id`,role.`name` FROM staff_and_job LEFT JOIN job ON staff_and_job.`job_id`=job.`id` LEFT JOIN role ON job.`role_code`=role.`code`")
    ed.toDF("No", "Name").registerTempTable("T")

    // Distinct accounts seen in the logs on 2016-05-09, per role name.
    val r = sqlContext.sql("select T.Name,count(distinct kv.No) from T join kv on kv.No=T.No where kv.date='2016-05-09' group by T.Name")

    // The original passed an empty string here, which cannot be turned into an output path.
    // Placeholder location: point this at the directory the result should be written to.
    val outputPath = "file:///data/role_active"
    r.repartition(1).rdd.saveAsTextFile(outputPath)

  • 相关阅读:
    用vuex实现购物车功能
    Vue引入Jquery和Bootstrap
    emWin使用外部SRAM的方法
    (四)u-boot2013.01.01 for TQ210:《mkconfig分析》
    (三)u-boot2013.01.01 for TQ210:《mkconfig分析》
    Vmware出现报错The VMware Authorization Service is not running.之后无法上网解决
    Uva 10596
    Uva 10305
    要检测两个C文件的代码的抄袭情况
    MFC简易画图
  • 原文地址:https://www.cnblogs.com/canyangfeixue/p/5488260.html
Copyright © 2011-2022 走看看