1.介绍
Spark是基于Hadoop的大数据处理框架,相比较MapReduce,Spark对数据的处理是在本地内存中进行,中间数据不需要落地,因此速度有很大的提升。而MapReduce在map阶段和Reduce阶段后都需要文件落地,对于连续的数据处理,就需要写多个MapReduce Job接力执行。
最近分析用户查询日志提取共现查询,流程如下:a.先获得<uid, query>对;b.合并同一个uid的queries,组成共现query对<query1, query2>, <query3, query4>等;c.计算共现query对的出现次数,<query1, query2> -> 5,<query3, query4> -> 10等,最后输出结果。
写MapReduce就需要2个Job,第一个Job获得每个uid下的共现query,第二个Job读取第一个Job的输出,合并共现query。我们来看下Spark怎么简单的实现这个流程的。
2.实例
Spark框架是用Scala编写的,Scala是一个函数式编程语言,函数可以作为输入和输出,可以用连续接力的计算单元让计算结果不断渐进变化,逐层推导。Scala尽力避免程序状态和可变对象。
我们主要来体验下spark和scala的接力处理,与业务相关的代码就删掉了,精华部分是processQueryLog函数。
object SimilarQuery { val QUERY_LOG_PATH: String = "/search_log/" val SIMILAR_QUERY_PATH: String = "/similar_query/" def main(args: Array[String]) = { val conf = new SparkConf().setAppName("MigameSimilarQuery") val sc = new SparkContext(conf) //输入/输出路径 val input = QUERY_LOG_PATH val output = SIMILAR_QUERY_PATH //获取共现查询 val lines = sc.sequenceFile[BytesWritable, BytesWritable](input) val similarQ = processQueryLog(lines) val hadoopCfg = new Configuration(sc.hadoopConfiguration) val fs: FileSystem = FileSystem.get(hadoopCfg) fs.delete(new Path(output), true) similarQ.saveAsTextFile(output) sc.stop() } //获取一天的共现query def processQueryLog(rdd: RDD[(BytesWritable, BytesWritable)]) = { rdd.map(log => { //源文件是thrift序列化后的scribe日志,里面记录了一次用户的查询(uid, query, time) val deserializer: TDeserializer = new TDeserializer(new TCompactProtocol.Factory) //找到一个用户今天的查询 val searchLog: SearchLog = new SearchLog deserializer.deserialize(searchLog, log._2.copyBytes()) (searchLog.getUid, (searchLog.getCommon.getTime, query)) }).filter(!_._1.isEmpty).groupByKey().flatMap { case (uid, iter) => { //处理共现查询 val queries = iter.toList queries.sortBy(_._1) //lambda运算,返回list,元素是一个元组 val relateQueries = for (i <- 0 to queries.length - 2; j <- i + 1 to queries.length - 1) yield { (queries(j)._2, queries(i)._2) } //list过滤和去重,执行map过程,打散输出 relateQueries.filter(_._1 != null).distinct.map(t => (t, 1)) } }.reduceByKey(_ + _).map(query => { query._1._1 + " " + query._1._2 + " " + query._2 }) } }
解析log => uid/query对 => filter => 同一个uid的query merge =>共现query => 相同的query对计数 => 格式化输出,一系列的流程瀑布式的推进。