zoukankan      html  css  js  c++  java
  • Spark使用实例

    1.介绍

    Spark是基于Hadoop的大数据处理框架,相比较MapReduce,Spark对数据的处理是在本地内存中进行,中间数据不需要落地,因此速度有很大的提升。而MapReduce在map阶段和Reduce阶段后都需要文件落地,对于连续的数据处理,就需要写多个MapReduce Job接力执行。
    最近分析用户查询日志提取共现查询,流程如下:a.先获得<uid, query>对;b.合并同一个uid的queries,组成共现query对<query1, query2>, <query3, query4>等;c.计算共现query对的出现次数,<query1, query2> -> 5,<query3, query4> -> 10等,最后输出结果。
    写MapReduce就需要2个Job,第一个Job获得每个uid下的共现query,第二个Job读取第一个Job的输出,合并共现query。我们来看下Spark怎么简单的实现这个流程的。

    2.实例

    Spark框架是用Scala编写的,Scala是一个函数式编程语言,函数可以作为输入和输出,可以用连续接力的计算单元让计算结果不断渐进变化,逐层推导。Scala尽力避免程序状态和可变对象。
    我们主要来体验下spark和scala的接力处理,与业务相关的代码就删掉了,精华部分是processQueryLog函数。
    object SimilarQuery {
      val QUERY_LOG_PATH: String = "/search_log/"
      val SIMILAR_QUERY_PATH: String = "/similar_query/"
    
      def main(args: Array[String]) = {
        val conf = new SparkConf().setAppName("MigameSimilarQuery")
        val sc = new SparkContext(conf)
    
        //输入/输出路径
        val input = QUERY_LOG_PATH
        val output = SIMILAR_QUERY_PATH
    
        //获取共现查询
        val lines = sc.sequenceFile[BytesWritable, BytesWritable](input)
        val similarQ = processQueryLog(lines)
    
        val hadoopCfg = new Configuration(sc.hadoopConfiguration)
        val fs: FileSystem = FileSystem.get(hadoopCfg)
        fs.delete(new Path(output), true)
    
        similarQ.saveAsTextFile(output)
        sc.stop()
      }
    
      //获取一天的共现query
      def processQueryLog(rdd: RDD[(BytesWritable, BytesWritable)]) = {
        rdd.map(log => {
          //源文件是thrift序列化后的scribe日志,里面记录了一次用户的查询(uid, query, time)
          val deserializer: TDeserializer = new TDeserializer(new TCompactProtocol.Factory)
          //找到一个用户今天的查询
          val searchLog: SearchLog = new SearchLog
          deserializer.deserialize(searchLog, log._2.copyBytes())
          (searchLog.getUid, (searchLog.getCommon.getTime, query))
        }).filter(!_._1.isEmpty).groupByKey().flatMap {
          case (uid, iter) => {
            //处理共现查询
            val queries = iter.toList
            queries.sortBy(_._1)
            //lambda运算,返回list,元素是一个元组
            val relateQueries = for (i <- 0 to queries.length - 2;
                                     j <- i + 1 to queries.length - 1) yield {
                (queries(j)._2, queries(i)._2)
            }
            //list过滤和去重,执行map过程,打散输出
            relateQueries.filter(_._1 != null).distinct.map(t => (t, 1))
          }
        }.reduceByKey(_ + _).map(query => {
          query._1._1 + "	" + query._1._2 + "	" + query._2
        })
      }
    }

    解析log => uid/query对 => filter  => 同一个uid的query merge =>共现query => 相同的query对计数 => 格式化输出,一系列的流程瀑布式的推进。

  • 相关阅读:
    Java Web 网络留言板2 JDBC数据源 (连接池技术)
    Java Web 网络留言板3 CommonsDbUtils
    Java Web ConnectionPool (连接池技术)
    Java Web 网络留言板
    Java Web JDBC数据源
    Java Web CommonsUtils (数据库连接方法)
    Servlet 起源
    Hibernate EntityManager
    Hibernate Annotation (Hibernate 注解)
    wpf控件设计时支持(1)
  • 原文地址:https://www.cnblogs.com/whuqin/p/4981955.html
Copyright © 2011-2022 走看看