zoukankan      html  css  js  c++  java
  • 【Spark】如何用Spark查询IP地址?


    需求

    日常生活中,当我们打开地图时,会通过地图道路颜色获取当前交通情况,也可以通过地图上经常网购的IP地址热力图得出哪些地区网购观念更发达,还有当前疫情的情况,各个地区疫情的热力图可以直观反应出疫情的严重程度。
    想要获取热力图,首先要清楚,通过点击流日志中的IP地址信息,可以推算出所在城市甚至经度和维度,通过计算同一经度纬度出现的次数,就可以绘制出热力图


    思路

    现在拥有两张信息表:
    一张是IP字典,这里面可以对我们有直接实用价值的就是有序的转换为Long类型的IP范围,还有每一个IP范围所对应的经纬度

    1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
    1.0.8.0|1.0.15.255|16779264|16781311|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
    1.0.32.0|1.0.63.255|16785408|16793599|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
    1.1.0.0|1.1.0.255|16842752|16843007|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
    1.1.2.0|1.1.7.255|16843264|16844799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
    1.1.8.0|1.1.63.255|16844800|16859135|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
    1.2.0.0|1.2.1.255|16908288|16908799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
    1.2.2.0|1.2.2.255|16908800|16909055|亚洲|中国|北京|北京|海淀|北龙中网|110108|China|CN|116.29812|39.95931
    1.2.4.0|1.2.4.255|16909312|16909567|亚洲|中国|北京|北京||中国互联网信息中心|110100|China|CN|116.405285|39.904989
    1.2.5.0|1.2.7.255|16909568|16910335|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
    1.2.8.0|1.2.8.255|16910336|16910591|亚洲|中国|北京|北京||中国互联网信息中心|110100|China|CN|116.405285|39.904989
    1.2.9.0|1.2.127.255|16910592|16941055|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
    1.3.0.0|1.3.255.255|16973824|17039359|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
    1.4.1.0|1.4.3.255|17039616|17040383|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
    ……
    ……
    

    另一张是用户IP上网记录,虽然有很多很长,但是对于目前此项目,只需要提取其中的IP地址就可以了

    20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&sex=137|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; Mozilla/4.0(Compatible Mozilla/4.0(Compatible-EmbeddedWB 14.59 http://bsalsa.com/ EmbeddedWB- 14.59  from: http://bsalsa.com/ )|http://show.51.com/main.php|
    20090121000132124542000|117.101.215.133|www.jiayuan.com|/19245971|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TencentTraveler 4.0)|http://photo.jiayuan.com/index.php?uidhash=d1c3b69e9b8355a5204474c749fb76ef|__tkist=0; myloc=50%7C5008; myage=2009; PROFILE=14469674%3A%E8%8B%A6%E6%B6%A9%E5%92%96%E5%95%A1%3Am%3Aphotos2.love21cn.com%2F45%2F1b%2F388111afac8195cc5d91ea286cdd%3A1%3A%3Ahttp%3A%2F%2Fimages.love21cn.com%2Fw4%2Fglobal%2Fi%2Fhykj_m.jpg; last_login_time=1232454068; SESSION_HASH=8176b100a84c9a095315f916d7fcbcf10021e3af; RAW_HASH=008a1bc48ff9ebafa3d5b4815edd04e9e7978050; COMMON_HASH=45388111afac8195cc5d91ea286cdd1b; pop_1232093956=1232468896968; pop_time=1232466715734; pop_1232245908=1232469069390; pop_1219903726=1232477601937; LOVESESSID=98b54794575bf547ea4b55e07efa2e9e; main_search:14469674=%7C%7C%7C00; registeruid=14469674; REG_URL_COOKIE=http%3A%2F%2Fphoto.jiayuan.com%2Fshowphoto.php%3Fuid_hash%3D0319bc5e33ba35755c30a9d88aaf46dc%26total%3D6%26p%3D5; click_count=0%2C3363619
    20090121000132406516000|117.101.222.68|gg.xiaonei.com|/view.jsp?p=389|Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; CIBA)|http://home.xiaonei.com/Home.do?id=229670724|_r01_=1; __utma=204579609.31669176.1231940225.1232462740.1232467011.145; __utmz=204579609.1231940225.1.1.utmccn=(direct)
    20090121000132581311000|115.120.36.118|tj.tt98.com|/tj.htm|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://www.tt98.com/|
    20090121000132864647000|123.197.64.247|cul.sohu.com|/20071227/n254338813_22.shtml|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://cul.sohu.com/20071227/n254338813_22.shtml|ArticleTab=visit:1; IPLOC=unknown; SUV=0901080709152121; vjuids=832dd37a1.11ebbc5d590.0.b20f858f14e918; club_chat_ircnick=JaabvxC4aaacQ; spanel=%7B%22u%22%3A%22%22%7D; vjlast=1232467312,1232467312,30
    20090121000133296729000|222.55.57.176|down.chinaz.com|/|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; iCafeMedia; TencentTraveler 4.0)||cnzz_a33219=0; vw33219=%3A18167791%3A; sin33219=http%3A//www.itxls.com/wz/wyfx/it.html; rtime=0; ltime=1232464387281; cnzz_eid=6264952-1232464379-http%3A//www.itxls.com/wz/wyfx/it.html
    20090121000133331104000|123.197.66.93|www.pkwutai.cn|/down/downLoad-id-45383.html|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 1.7)|http://www.baidu.com/s?tn=b1ank_pg&ie=gb2312&bs=%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&sr=&z=&cl=3&f=8&wd=%C6%C6%BD%E2%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&ct=0|
    

    核心思路就是将上网记录中的IP地址转换为Long类型,利用二分法和IP字典中的IP范围比较,找出对于的经度和维度,然后对经度纬度作计数,最后把拿到的数据放到mysql数据库中


    ip地址转换为Long类型的两种方法

    以下方法是从博主 @徐志/忘川风华录 的文章 IP地址转换成Long型数字算法和原理(全网最细!!)
    查到的,佩服的可以点击链接前往观摩

    ip地址转换数字地址的原理

    IP地址一般是一个32位的二进制数意思就是如果将IP地址转换成二进制表示应该有32为那么长,但是它通常被分割为4个“8位二进制数”(也就是4个字节每,每个代表的就是小于2的8 次方)。IP地址通常用“点分十进制”表示成(a.b.c.d)的形式,其中,a,b,c,d都是0~255之间的十进制整数。例:点分十进IP地址(100.4.5.6),实际上是32位二进制数(01100100.00000100.00000101.00000110)

    第一种方法

    val ipV4="125.213.100.123"
    val fragments = ip22.split("[.]")
    var Ip_Num=125*256*256*256+213*256*256+100*256+123
    println(Ip_Num)     //打印的结果2111136891
    

    第二种方法

    val ip22="125.213.100.123"
    val fragments = ip22.split("[.]")
    var Ip_Num = 0L
    for (i <- 0 until fragments.length){
      Ip_Num =  fragments(i).toLong | ipNum << 8L
    }
    println(Ip_Num)     //打印结果  2111136891
    

    步骤

    一、在mysql创建数据库表

    /*
    SQLyog Ultimate v8.32 
    MySQL - 5.6.22-log : Database - spark
    *********************************************************************
    */
    
    
    /*!40101 SET NAMES utf8 */;
    
    /*!40101 SET SQL_MODE=''*/;
    
    /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
    /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
    /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
    /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
    CREATE DATABASE /*!32312 IF NOT EXISTS*/`spark` /*!40100 DEFAULT CHARACTER SET utf8 */;
    
    USE `spark`;
    
    /*Table structure for table `iplocation` */
    
    DROP TABLE IF EXISTS `iplocation`;
    
    CREATE TABLE `iplocation` (
      `longitude` varchar(32) DEFAULT NULL,
      `latitude` varchar(32) DEFAULT NULL,
      `total_count` varchar(32) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    /*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
    /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
    /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
    /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
    

    二、开发代码

    import java.sql.{Connection, DriverManager}
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.broadcast.Broadcast
    
    
    object IpLocation {
    
      //定义把IP地址转换为Long类型的方法 125.213.100.123
      def ipToLong(ip: String): Long = {
        val split: Array[String] = ip.split("\.")
        var resultNum: Long = 0L
        for (eachNum <- split) {
          resultNum = eachNum.toLong | resultNum << 8L
        }
        resultNum
      }
    
      /**
       * 使用二分查找法,确定用户ip落在了哪个范围
       *
       * @param userLongIP
       * @param ipArray
       * @return 返回ipArray的下标索引
       */
      def binarySearch(userLongIP: Long, ipArray: Array[(String, String, String, String)]): Int = {
        var startIndex: Int = 0
        var endIndex: Int = ipArray.length
        //创建while循环
        while (startIndex <= endIndex) {
          var middle: Int = {
            startIndex + endIndex
          } / 2
          //判断用户落在哪个范围
          if (userLongIP >= ipArray(middle)._1.toLong && userLongIP <= ipArray(middle)._2.toLong) {
            //使用return可以直接返回数据并停掉while循环
            return middle
          }
          //如果落在了middle的左边
          if (userLongIP < ipArray(middle)._1.toLong) {
            endIndex = middle - 1
          }
          //如果落在了middle的右边
          if (userLongIP > ipArray(middle)._1.toLong) {
            startIndex = middle + 1
          }
        }
        //避免方法报错,随便返回一个值即可,因为肯定不会到这个值,while循环最终都会落在return middle上
        0
      }
    
      /**
       * 定义函数,将数据放到mysql中去
       */
      //datas获取一个分区里的所有数据
      val data2Mysql = (datas: Iterator[((String, String), Int)]) => {
        //连接到mysql
        val connection: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "123456")
        //使用sql语句导入数据
        val preparedStatement = connection.prepareStatement("insert into iplocation(longitude,latitude,total_count) values(?,?,?)")
        //foreach能获取到datas中的每一条数据
        datas.foreach(record => {
          preparedStatement.setString(1, record._1._1) //获取经度
          preparedStatement.setString(2, record._1._2) //获取纬度
          preparedStatement.setInt(3, record._2) //获取次数
          preparedStatement.execute()
        })
        preparedStatement.close()
        connection.close()
      }
    
      def main(args: Array[String]): Unit = {
        //获取SparkConf
        val sparkConf = new SparkConf().setAppName("IP-Location").setMaster("local[2]").set("spark.driver.host", "localhost")
        //获取SparkContext
        val sparkContext = new SparkContext(sparkConf)
        //设置日志筛选条件
        sparkContext.setLogLevel("WARN")
    
        //读取IP字典
        val IpDictionary: RDD[String] = sparkContext.textFile("/Users/zhaozhuang/Desktop/4、Spark/Spark第二天/第二天教案/资料/服务器访问日志根据ip地址查找区域/用户ip上网记录以及ip字典/ip.txt")
        //IP字典内的数据需要的是IP字段Long类型的起始和结束,还有经度和维度
        //首先需要将拿到的数据进行分割  1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
        val map: RDD[Array[String]] = IpDictionary.map(x => x.split("\|"))
        //切割后,我们需要的数据就是ip范围起始值、结束值、经度和纬度 下标分别是2,3,length-2,length-1
        val ipRange: RDD[(String, String, String, String)] = map.map(x => (x(2), x(3), x(x.length - 2), x(x.length - 1)))
        //将IP字典的数据放到广播变量中,供所有的block块使用
        val collect: Array[(String, String, String, String)] = ipRange.collect()
        val broadcast: Broadcast[Array[(String, String, String, String)]] = sparkContext.broadcast(collect)
    
    
        //读取用户上网记录表
        val userIpRecord: RDD[String] = sparkContext.textFile("/Users/zhaozhuang/Desktop/4、Spark/Spark第二天/第二天教案/资料/服务器访问日志根据ip地址查找区域/用户ip上网记录以及ip字典/20090121000132.394251.http.format")
        //用户上网记录表中只需要IP地址的字段,所以进行切割拿到IP地址字段即可
        //数据样式 20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&sex=137|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; Mozilla/4.0(Compatible Mozilla/4.0(Compatible-EmbeddedWB 14.59 http://bsalsa.com/ EmbeddedWB- 14.59  from: http://bsalsa.com/ )|http://show.51.com/main.php|
        val allUserIP: RDD[String] = userIpRecord.map(x => x.split("\|")(1))
    
        /**
         * mapPartitions需要两个参数
         * def mapPartitions[U: ClassTag](
         * f: Iterator[T] => Iterator[U],       第一个是给一个是迭代器的函数,返回一个迭代器
         * preservesPartitioning: Boolean = false)        第二个参数为默认值
         *
         * 本方法是将拿到的用户IP一次性提取一个分区的数据,iter就表示一个分区的所有数据
         */
        val numTimes: RDD[((String, String), Int)] = allUserIP.mapPartitions(iter => {
          //获取广播变量里的值   数据样式:16777472|16778239|119.306239|26.075302
          val ipArray: Array[(String, String, String, String)] = broadcast.value
          //通过map方法遍历一个分区中的所有数据
          iter.map(ip => {
            //获取每一个用户的IP,需要是Long类型
            val userLongIP: Long = ipToLong(ip)
            //将用户ip和广播变量中的数据进行比较,确定一下用户ip在哪个ip范围,返回值是ipArray索引
            val resultIndex: Int = binarySearch(userLongIP, ipArray)
            //每个经纬度出现一次就计作一次
            ((ipArray(resultIndex)._3, ipArray(resultIndex)._4), 1)
          })
        })
    
    
        //将每个经度和维度出现的次数进行累加
        val locationCount: RDD[((String, String), Int)] = numTimes.reduceByKey(_ + _)
    
        //将最终累加的结果值保存到Mysql中去
        locationCount.foreachPartition(data2Mysql)
    
      }
    }
    
    
  • 相关阅读:
    The builder launch configuration could not be found
    桌面上的图标不见了
    outlook软件后台运行
    c盘突然少了容量
    win7台式机睡眠时间修改
    系统占用的内存
    详细讲解 java 中的synchronized 转自 http://www.cnblogs.com/devinzhang/archive/2011/12/14/2287675.html
    The US ASCII Character Set 对应码 可以解决 URL中的特殊符号的传输问题
    oracle基本操作 转载
    内存中的 栈与堆
  • 原文地址:https://www.cnblogs.com/zzzsw0412/p/12772392.html
Copyright © 2011-2022 走看看