zoukankan      html  css  js  c++  java
  • geoip ip2region2 with spark

    上一篇文章中 我使用 maxmind的免费库开发了一个waterdrop的 插件,测试数据发现,国内的有些市级还是不准确,而且香港并不是显示中国,这就不友好了。

    找了一下,发下 ip2region 这个很不错。https://github.com/lionsoul2014/ip2region

    我使用这个库,然后使用之前的代码,稍加修改,测试了一下,效果还是不错的。基本没有再出现空的值。

    关于查询效率上,如作者所说,memsearch最快,我测试了确实如此,但是会出现一开始的一些spark流的批次效率稍差一些,慢慢地会提上去

    package com.student
    
    import io.github.interestinglab.waterdrop.apis.BaseFilter
    import com.typesafe.config.{Config, ConfigFactory}
    import org.apache.spark.SparkFiles
    import org.apache.spark.sql.{Dataset, Row, SparkSession}
    import org.apache.spark.sql.functions.{col, udf}
    
    import scala.collection.JavaConversions._
    import org.lionsoul.ip2region.DbConfig
    import org.lionsoul.ip2region.DbSearcher
    import scala.collection.JavaConversions._
    import org.lionsoul.ip2region.DbConfig
    import org.lionsoul.ip2region.DbSearcher
    
    
    object SearcherWrapper extends Serializable {
      @transient lazy val searcher = {
        val config = new DbConfig
        val dbfile = SparkFiles.get("ip2region.db")
        val searcher = new DbSearcher(config, dbfile)
        searcher
      }
    }
    
    
    
    class IP2Region2 extends BaseFilter {
    
      var config: Config = ConfigFactory.empty()
    
      /**
        * Set Config.
        **/
      override def setConfig(config: Config): Unit = {
        this.config = config
      }
    
      /**
        * Get Config.
        **/
      override def getConfig(): Config = {
        this.config
      }
    
      override def checkConfig(): (Boolean, String) = {
    
        val requiredOptions = List("source_field")
        val nonExistsOptions: List[(String, Boolean)] = requiredOptions.map { optionName =>
          (optionName, config.hasPath(optionName))
        }.filter { p =>
          !p._2
        }
    
        if (nonExistsOptions.length == 0) {
          (true, "")
        } else {
          (false, "please specify setting as non-empty string")
        }
    
      }
    
      override def prepare(spark: SparkSession): Unit = {
    
        val defaultConfig = ConfigFactory.parseMap(
          Map(
            "source_field" -> "raw_message",
            "target_field" -> "__ROOT__"
          )
        )
    
        config = config.withFallback(defaultConfig)
    
    
      }
    
      override def process(spark: SparkSession, df: Dataset[Row]): Dataset[Row] = {
    
        val srcField = config.getString("source_field")
    
        val ip2region=udf{ip:String => ip2Location2(ip)}
    
         import  org.apache.spark.sql.functions.split
    
         df.withColumn("__region__", ip2region(col(srcField)))
          .withColumn("__country__",split(col("__region__"),"\|")(0))
          .withColumn("__province__",split(col("__region__"),"\|")(2))
          .withColumn("__city__",split(col("__region__"),"\|")(3))
           .withColumn("__isp__",split(col("__region__"),"\|")(4))
    
      }
    
    
    
      def ip2Location2(ip: String) = {
        try {
          val searcher = SearcherWrapper.searcher
          val response = searcher.memorySearch(ip)
    
          response.getRegion
        }
        catch {
          case ex: Exception =>
           // ex.printStackTrace()
            ""
        }
      }
    
    }

  • 相关阅读:
    Codeforces Round #277 (Div. 2)
    Topcoder SRM 637 (Div.2)
    【转】大素数判断和素因子分解【miller-rabin和Pollard_rho算法】
    【网络流#5】UVA 11082 最大流
    【网络流#4】UVA 753 最大流
    Codeforces Round #274 (Div. 2)
    【网络流#3】hdu 1532
    【网络流#2】hdu 1533
    【网络流#1】hdu 3549
    Codeforces Round #273 (Div. 2)
  • 原文地址:https://www.cnblogs.com/huaxiaoyao/p/12104587.html
Copyright © 2011-2022 走看看