zoukankan      html  css  js  c++  java
  • spark练习---ip匹配以及广播的特性

      今天,我们还是在介绍spark的小练习,这次的小练习还是基于IP相关的操作,我们可以先看一下今天的需求,我们有两个文件,

      第一个文件,是IP的字典,也就是我们上一篇介绍过的,就是表明了所有IP字段所属的位置,以及最大值以及最小值(例如)

    1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
    1.0.8.0|1.0.15.255|16779264|16781311|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
    1.0.32.0|1.0.63.255|16785408|16793599|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
    1.1.0.0|1.1.0.255|16842752|16843007|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
    

      例如第一行的数据,
      1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302

      这个里面16777472以及16778239就是当我们把IP转换成Long类型的值之后,如果那个值在这个里面,我们就可以确定这个IP实在中国 的福建

      第二个文件,是一个日志,这个日志里面的内容大致是这个用户访问的时间,以及IP,以及浏览网址以及浏览器所带的一些信息(例如)

    20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?.....
    20090121000132124542000|117.101.215.133|www.jiayuan.com|/19245971|Mozilla/4.0 (compatible; MSIE 6.0;.......
    20090121000132406516000|117.101.222.68|gg.xiaonei.com|/view.jsp?p=389|Mozilla/4.0 (compatible; MSIE 7.0; .....
    20090121000132581311000|115.120.36.118|tj.tt98.com|/tj.htm|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://www.tt98.com/|
    

      则此时我们就可以开始写程序了,

    package cn.wj.spark.day04
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by WJ on 2017/1/4.
      */
    object IPLocation {
    
      //将IP转换成为Long类型
      def ip2Long(ip: String): Long = {
        val fragments = ip.split("[.]")
        var ipNum = 0L
        for (i <- 0 until fragments.length){
          ipNum =  fragments(i).toLong | ipNum << 8L
        }
        ipNum
      }
    
      //使用二分法,对IP进行查找,让ip与start_num以及end_num做对比
      def binarySearch(lines: Array[(String, String, String)], ip: Long) : Int = {
        var low = 0
        var high = lines.length - 1
        while (low <= high) {
          val middle = (low + high) / 2
          if ((ip >= lines(middle)._1.toLong) && (ip <= lines(middle)._2.toLong))
            return middle
          if (ip < lines(middle)._1.toLong)
            high = middle - 1
          else {
            low = middle + 1
          }
        }
        -1
      }
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("IPLocation")
        val sc = new SparkContext(conf)
    
        val ipRulesRdd = sc.textFile("e://Test/ip.txt").map(lines =>{
          val fields = lines.split("\|")
          val start_num = fields(2)
          val end_num = fields(3)
          val province = fields(6)
          (start_num,end_num,province)
        })
        //全部的IP映射规则
        val ipRulesArrary = ipRulesRdd.collect()
    
        //广播规则,这个是由Driver向worker中广播规则
        val ipRulesBroadcast = sc.broadcast(ipRulesArrary)
    
        //加载要处理的数据
        val ipsRDD = sc.textFile("e://Test/access_log").map(line =>{
          val fields = line.split("\|")
          fields(1)
        })
    
        val result = ipsRDD.map(ip =>{
          val ipNum = ip2Long(ip)
          val index = binarySearch(ipRulesBroadcast.value,ipNum)
          val info = ipRulesBroadcast.value(index)
          info
        })
    
        println(result.collect().toBuffer)
        sc.stop()
      }
    }

    则以上的结果会显示为:

      

      但是对于大数据来说,我们可能更想知道的是关于IP的一个总的计算,那么这个就会很简单,

      

    val result = ipsRdd.map(ip =>{
          val ipNum = ip2Long(ip)
          val index = binarySearch(ipRulesBroadcast.value,ipNum)
          val info = ipRulesBroadcast.value(index)
          info
        }).map(t => {(t._3,1)}).reduceByKey(_+_)

      我们只需要在上面的代码中把最后的输出结果在进行一个reduceByKey即可,则效果显示为:

      

    对于这个里面,有一下几点想说

      1.特殊转义的字符串:xxx.split("\|")

      2.为什么我们有些时候在写spark的程序的时候,我们要写

        val conf = new sparkConf().setAppName("xxx").setMaster("local[2]")

        val sc = new SparkContext(conf)

       这个是由于,我们要在main里面要使用spark的算子进行计算,所以我们需要写,如果不需要使用算子,完全没必要写这个

      3.一般从RDD中变为Action,我们此时可以println(result.collect().toBuffer)
      4.如果是要返回一个元祖,我们可以加上一个括号,然后类似于(province,start_num,end_num)

      5.将IP的值转化为Long类型的数
      

     //将IP转换成为Long类型
      def ip2Long(ip: String): Long = {
        val fragments = ip.split("[.]")
        var ipNum = 0L
        for (i <- 0 until fragments.length){
          ipNum =  fragments(i).toLong | ipNum << 8L
        }
        ipNum
      }

      6.上述程序为什么会有广播的概念,因为当我Master接到一个任务的时候,他要把这个任务放到Worker的Excutor中执行,对于匹配的规则,是我们在main中得到,如果要把这个规则让每一个worker都可以得到,所以我们需要 Master把这些信息广播到Worker上

      

  • 相关阅读:
    c# base64及MD5工具类
    c# dateTime格式转换为Unix时间戳工具类
    c# 金钱大写转小写工具类
    c# bitmap的拷贝及一个图像工具类
    C# DataTable映射成Entity
    Kubernetes---Service(SVC)服务--ingress api
    Kubernetes---Service(SVC)服务
    为网站文字前面添加图标 在线调用 Font Awesome 字体icon小图标 美化网站
    [转]office 2016 4合1/3合1 专业版 增强版 精简绿色安装版
    [转]数据恢复 文件恢复工具 DiskGenius v4.9.1 绿色专业版及单文件
  • 原文地址:https://www.cnblogs.com/wnbahmbb/p/6250836.html
Copyright © 2011-2022 走看看