zoukankan      html  css  js  c++  java
  • Spark获取某个手机号在某个基站下停留的时间和当前手机所在的位置的案例

    1、业务需求
    在拥有手机号在每个基站处停留时间日志 和 基站信息的 算出某个手机号的(所在基站,停留时间),(当前所在经度,当前所在纬度)

    其中手机连接基站产生的日志信息类似如下:

    18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1
    18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1
    18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0
    18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0
    

    上面的含义表示的是:手机号,时间,基站ID,接入网络的类型(0:unknow,1:3G,2:2G,6:4G)

    基站信息:

    9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
    CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
    16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6
    

    上面的含义表示的是:基站ID,经度,纬度,接入网络的类型(0:unknow,1:3G,2:2G,6:4G)

    编写Scale代码:

    	package com.Hive
    	
    	import org.apache.spark.rdd.RDD
    	import org.apache.spark.{SparkConf, SparkContext}
    	
    	object FD {
    	
    	  def main(args: Array[String]): Unit = {
    	
    	    val conf = new SparkConf().setAppName("FD").setMaster("local[2]")
    	    val sc  = new SparkContext(conf)
    	
    	    //1.读取数据文件
    	
    	    val user =sc.textFile("src/main/data/log/")//用户数据
    	    val base  = sc.textFile("src/main/data/base_info.txt")//基站数据
    	
    	
    	    //2.数据清洗工作,数据维度提取
    	//    用户数据清洗
    	    val splited = user.map(line =>{
    	
    	    val fields = line.split(",")
    	    val phone = fields(0)
    	
    	    val base = fields(2)
    	    val envet = fields(3).toInt
    	
    	    val time  = {
    	      if (envet == 1){
    	       -fields(1).toLong//赋值-
    	      }else{
    	        fields(1).toLong//正值+
    	      }
    	    }
    	
    	  ((phone,base),time)
    	})
    	
    	//   splited.collect().foreach(println(_))
    	
    	//    基站数据清洗
    	    val alcsplited = base.map(line =>{
    	      val fields = line.split(",")
    	      val id = fields(0)
    	      val x = fields(1)
    	      val y = fields(2)
    	      (id,(x,y))
    	    })
    	
    	   // splited.collect().foreach(println(_))
    	
    	    //3.统计每个用户在每个基站中停留的时间
    	
    	    val reducted = splited.reduceByKey(_+_)
    	
    	   // reducted.collect().foreach(println(_))
    	
    	    //((phone,base),time)
    	    val pmt = reducted.map(x=>{
    	
    	      //(基站ID,(手机号,时间))
    	      //x._1对应的是元组((mobile,lac),time)中的(mobile,lac)
    	      //x._2对应的是元组((mobile,lac),time)中的time
    	      ((x._1._2),(x._1._1,x._2))
    	
    	    })
    	
    	
    	
    	    //连接join 之后的结果[(基站ID,((手机号,时间),(经度,纬度)))]
    	
    	    val joined:RDD[(String, ((String, Long), (String, String)))] = pmt.join(alcsplited)
    	
    	
    	    //按照手机号进行分组
    	    //_.        :代表的是基站 手机号,时间,经度,纬度
    	    //_._2      :代表的是 手机号,时间 经度,纬度
    	    //_._2_1    :代表的是 手机号,时间
    	    //_._2._1._ :代表的是 手机号
    	    val MobileGroupBykey  = joined.groupBy(_._2._1._1)
    	
    	    val result = MobileGroupBykey.mapValues(_.toList.sortBy(_._2._1._2).reverse.take(2))
    	
    	    println(result.collect().toBuffer)
    	
    	    sc.stop()
    	
    	
    	  }
    	
    	}
  • 相关阅读:
    网络配置
    mysql和mongodb的区别
    HTTP和HTTPS
    网络架构/结构
    SKU和SPU表的设计
    第三方-FastDFS分布式文件系统
    并发和并行
    多任务-线程、进程、协程的一些见解
    多任务-协程
    多任务-协程之生成器
  • 原文地址:https://www.cnblogs.com/suway/p/9921044.html
Copyright © 2011-2022 走看看