zoukankan      html  css  js  c++  java
  • maxmind geoip2使用笔记

    客户需求如下,nginx的访问日志中ip,匹配出对应的国家,省份和城市,然后给我了一个maxmind的连接参考。

    查找资料,有做成hive udf的使用方式, 我们项目中一直使用 waterdrop 来做数据处理,所以决定开发一个 waterdrop的插件。

    关于这个功能,waterdrop本身提供有两个商用组件,geopip2(也是使用maxmind) 另一个是国内的 ipipnet。

    如果有人不懂 waterdrop,可以参考 https://interestinglab.github.io/waterdrop/#/zh-cn/quick-start

    开发使用 scala语言,开发完毕后,使用 mvn clean package 打包即可,生成的包是不含有 依赖的,请注意把依赖放到spark classpath中去使用。

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.student</groupId>
        <artifactId>GeoIP2</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <scala.version>2.11.8</scala.version>
            <scala.binary.version>2.11</scala.binary.version>
            <spark.version>2.4.0</spark.version>
            <waterdrop.version>1.4.0</waterdrop.version>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
    
    
        <dependencies>
            <dependency>
                <groupId>io.github.interestinglab.waterdrop</groupId>
                <artifactId>waterdrop-apis_2.11</artifactId>
                <version>${waterdrop.version}</version>
            </dependency>
            <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.1</version>
        </dependency>
    
    
            <dependency>
                <groupId>com.maxmind.db</groupId>
                <artifactId>maxmind-db</artifactId>
                <version>1.1.0</version>
            </dependency>
            <dependency>
                <groupId>com.maxmind.geoip2</groupId>
                <artifactId>geoip2</artifactId>
                <version>2.6.0</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.0.2</version>
                    <configuration>
                        <source>${maven.compiler.source}</source>
                        <target>${maven.compiler.target}</target>
                    </configuration>
                </plugin>
    
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>scala-compile-first</id>
                            <phase>process-resources</phase>
                            <goals>
                                <goal>add-source</goal>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                        <execution>
                            <id>scala-test-compile</id>
                            <phase>process-test-resources</phase>
                            <goals>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
            </plugins>
        </build>
    </project>

    主要的程序文件只有一个Geoip2:

    package com.student
    
    
    import io.github.interestinglab.waterdrop.apis.BaseFilter
    import com.typesafe.config.{Config, ConfigFactory}
    import org.apache.spark.sql.{Dataset, Row, SparkSession}
    import org.apache.spark.sql.functions.{col, udf}
    
    import scala.collection.JavaConversions._
    import com.maxmind.geoip2.DatabaseReader
    import java.io.{File, InputStream}
    import java.net.InetAddress
    
    import com.maxmind.db.CHMCache
    import org.apache.spark.SparkFiles
    
    
    
    object ReaderWrapper extends Serializable {
      @transient lazy val reader = {
        val geoIPFile = "GeoLite2-City.mmdb";
        val database = new File(SparkFiles.get(geoIPFile));
        val reader: DatabaseReader = new DatabaseReader.Builder(database)
          //.fileMode(com.maxmind.db.Reader.FileMode.MEMORY)
          .fileMode(com.maxmind.db.Reader.FileMode.MEMORY_MAPPED)
          .withCache(new CHMCache()).build();
        reader
      }
    }
    
    class GeoIP2 extends BaseFilter {
    
      var config: Config = ConfigFactory.empty()
    
      /**
        * Set Config.
        **/
      override def setConfig(config: Config): Unit = {
        this.config = config
      }
    
      /**
        * Get Config.
        **/
      override def getConfig(): Config = {
        this.config
      }
    
      override def checkConfig(): (Boolean, String) = {
    
        val requiredOptions = List("source_field")
        val nonExistsOptions: List[(String, Boolean)] = requiredOptions.map { optionName =>
          (optionName, config.hasPath(optionName))
        }.filter { p =>
          !p._2
        }
    
        if (nonExistsOptions.length == 0) {
          (true, "")
        } else {
          (false, "please specify setting as non-empty string")
        }
    
      }
    
      override def prepare(spark: SparkSession): Unit = {
    
        val defaultConfig = ConfigFactory.parseMap(
          Map(
            "source_field" -> "raw_message",
            "target_field" -> "__ROOT__"
          )
        )
    
        config = config.withFallback(defaultConfig)
    
    
      }
    
      override def process(spark: SparkSession, df: Dataset[Row]): Dataset[Row] = {
    
        val srcField = config.getString("source_field")
        val func = udf { ip: String => ip2Locatation(ip) }
    
        val ip2Country=udf{ip:String => ip2Location2(ip,1)}
        val ip2Province=udf{ip:String => ip2Location2(ip,2)}
        val ip2City=udf{ip:String => ip2Location2(ip,3)}
    
        //df.withColumn(config.getString("target_field"), func(col(srcField)))
        df.withColumn("__country__", ip2Country(col(srcField)))
        .withColumn("__province__", ip2Province(col(srcField)))
        .withColumn("__city__", ip2City(col(srcField)))
    
    
      }
    
      def ip2Locatation(ip: String) = {
        try {
          val reader = ReaderWrapper.reader
          val ipAddress = InetAddress.getByName(ip)
          val response = reader.city(ipAddress)
          val country = response.getCountry()
          val subdivision = response.getMostSpecificSubdivision()
          val city = response.getCity()
          (country.getNames().get("zh-CN"), subdivision.getNames.get("zh-CN"), city.getNames().get("zh-CN"))
        }
        catch {
          case ex: Exception =>
            ex.printStackTrace()
            ("", "", "")
        }
      }
    
      def ip2Location2(ip: String,index: Int) = {
        try {
          val reader = ReaderWrapper.reader
          val ipAddress = InetAddress.getByName(ip)
          val response = reader.city(ipAddress)
    
          index match {
            case 1 => response.getCountry().getNames().get("zh-CN")
            case 2 => response.getMostSpecificSubdivision().getNames.get("zh-CN")
            case 3 => response.getCity().getNames().get("zh-CN")
            case _ => ""
          }
        }
        catch {
          case ex: Exception =>
            ex.printStackTrace()
            ""
        }
      }
    
    }

    测试类的代码如下:

    package com.student
    
    import com.typesafe.config._
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    
    object TestIt {
    
      def main(args: Array[String]): Unit = {
    
    
        val spark: SparkSession = SparkSession.builder().appName("demo").master("local[1]")
          .config("spark.files","/Users/student.yao/code/sparkshell/GeoLite2-City.mmdb")
          //.enableHiveSupport()
          .getOrCreate()
    
        spark.sparkContext.setLogLevel("WARN")
    
    
    
        //获取到第一个conf,复制给插件的实例
        val firstConf:Config = ConfigFactory.empty()
    
        //实例化插件对象
        val pluginInstance: GeoIP2 = new GeoIP2
        pluginInstance.setConfig(firstConf)
        pluginInstance.prepare(spark)
        //虚拟一些数据
    
        import spark.implicits._
        val sourceFile: DataFrame = Seq((1, "221.131.74.138"),
          (2, "112.25.215.84"),
          (3,"103.231.164.15"),
          (4,"36.99.136.137"),
          (5,"223.107.54.102"),
          (6,"117.136.118.125")
        ).toDF("id", "raw_message")
    
        sourceFile.show(false)
        val df2 = pluginInstance.process(spark,sourceFile)
        df2.show(false)
      }
    }
    View Code

    遇到的问题 

    1.一开始的时候,把 GeoLite2-city.mmdb放到了 Resources文件夹,想把它打到jar包里,然后在项目中 使用 getClass().getResourceAsStream("/GeoLite2-city")

    运行中发现这种方式特别慢,说这种慢,是和 使用 sparkFieles.get方式比较起来,遂改成sparkFiles获取的方式,先把文件放到 hdfs,然后在spark作业配置荐中配置:

    spark.files="hdfs://nameservicesxxx/path/to/Geolite2-city.mmdb",这样 SparkFiles.get("GeoLite2-city.mmdb") 就可以获取文件使用。

    2。性能优化的过程中,想把 Reader提出来放在 Prepare方法里面是很自然的一个想法,在本机测试的时侯没问题,因为是单机的,没发现,在生产上时发现报 不可序列化的异常。

    其实在 ideaj中可以使用 spark.sparkContext.broadcast(reader)的方式,就可以发现这个异常。如何解决这个异常,通常的解决方式是 在 dataframe|rdd的 foreachpartition|mappartitions中

    生成对象,这样就不会报错了。进一步可以使用 单例 这样效果更好些 。但这个是 插件,没法这样做, 就想到了可以使用一个外壳包起来,让reader不序列化即可。

    所以有了 ReaderWrapper extends Serializable @transient lazy val 这一段。

    3。初始化 reader的时候进行缓存

    经过这些处理,性能得到了提升,经测试,4G,4core,每5秒一个批次,2万条数据处理3秒钟。(kafka->waterdrop-json->es)全过程。

  • 相关阅读:
    spring源码阅读之ioc
    java基础面试题
    【0708】(OOP)编写并输出学员类和教员类
    【0706】综合作业:吃货联盟订餐系统
    【0703作业】输入一批整数,输出其中的最大值和最小值,输入0结束循环
    【0703作业】获取最低价手机价格
    【0703作业】一组成绩降序排列,插入数值
    【0703作业】猜数游戏
    【0702作业】根据数字输出行数(1-9)
    【0702作业】输出1-7对应星期
  • 原文地址:https://www.cnblogs.com/huaxiaoyao/p/12088810.html
Copyright © 2011-2022 走看看