zoukankan      html  css  js  c++  java
  • spark streaming 使用geoIP解析IP

    1、首先将 GeoIP 数据库文件(GeoLite2-City.mmdb)放到服务器上,例如 /opt/db/geo/GeoLite2-City.mmdb

    2、新建scala sbt工程,测试是否可以顺利解析

    import java.io.File
    import java.net.InetAddress
    import com.maxmind.db.CHMCache
    import com.maxmind.geoip2.DatabaseReader
    import org.json4s.DefaultFormats

    /**
     * Standalone smoke test for the GeoLite2 City database: resolves one hard-coded
     * IP address and prints its country / province / city names in Simplified Chinese.
     *
     * Usage: `test [path-to-GeoLite2-City.mmdb]`. When no argument is given, the
     * development default path below is used.
     *
     * Created by zxh on 2016/7/17.
     */
    object test {
      implicit val formats = DefaultFormats

      // Development default. Backslashes must be escaped inside a regular string literal;
      // on the server pass e.g. /opt/db/geo/GeoLite2-City.mmdb as the first argument.
      private val DefaultDbPath =
        "F:\\Code\\OpenSource\\Data\\spark-sbt\\src\\main\\resources\\GeoLite2-City.mmdb"

      private val Locale = "zh-CN"

      /** Returns the name for [[Locale]], or an empty string when the map or the entry is missing. */
      private def localized(names: java.util.Map[String, String]): String =
        Option(names).flatMap(n => Option(n.get(Locale))).getOrElse("")

      def main(args: Array[String]): Unit = {
        val geoDB = new File(args.headOption.getOrElse(DefaultDbPath))
        // Fail early with a readable message instead of ignoring the existence check.
        require(geoDB.isFile, s"GeoIP database not found: ${geoDB.getAbsolutePath}")

        val geoIPResolver = new DatabaseReader.Builder(geoDB).withCache(new CHMCache()).build()
        try {
          val ip = "222.173.17.203"
          val geoResponse = geoIPResolver.city(InetAddress.getByName(ip))

          // Not every address has subdivision data, so guard the index access.
          val subdivisions = geoResponse.getSubdivisions
          val country = localized(geoResponse.getCountry.getNames)
          val province = if (subdivisions.isEmpty) "" else localized(subdivisions.get(0).getNames)
          val city = localized(geoResponse.getCity.getNames)

          println(s"country:$country,province:$province,city:$city")
        } finally {
          // DatabaseReader holds the database file open; release it.
          geoIPResolver.close()
        }
      }
    }
    build.sbt 内容如下
    // build.sbt for the standalone test project (sbt-assembly fat jar).
    // Each setting is its own blank-line-separated paragraph so sbt can split them.
    import AssemblyKeys._

    assemblySettings

    // When two jars ship the same file, keep the first copy instead of failing on "deduplicate".
    mergeStrategy in assembly <<= (mergeStrategy in assembly) { mergeStrategy => {
      case entry => {
        val strategy = mergeStrategy(entry)
        if (strategy == MergeStrategy.deduplicate) MergeStrategy.first
        else strategy
      }
    }
    }

    // Do not bundle the Scala library into the assembly jar.
    assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

    name := "scala_sbt"

    version := "1.0"

    scalaVersion := "2.10.4"

    libraryDependencies += "com.maxmind.geoip2" % "geoip2" % "2.5.0"

      将该程序打包,放到服务器上,执行scala -cp ./scala_sbt-assembly-1.0.jar test,解析结果如下

    country:中国,province:山东省,city:济南

    3、编写streaming程序

    import java.io.File
    import java.net.InetAddress

    import com.maxmind.db.CHMCache
    import com.maxmind.geoip2.DatabaseReader
    import com.maxmind.geoip2.model.CityResponse
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
    import org.apache.spark.{SparkContext, SparkConf}

    /**
     * Spark Streaming job that reads one IP address per line from a local socket
     * (localhost:9999) in 10-second batches and prints the (country, province, city)
     * names of each address in Simplified Chinese, using the GeoLite2 City database.
     *
     * Created by zxh on 2016/7/17.
     */
    object geoip {

      import scala.util.control.NonFatal

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("geoip_test").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(10))
        val lines = ssc.socketTextStream("localhost", 9999)

        lines.foreachRDD((rdd: RDD[String], t: Time) => {
          rdd.foreachPartition(p => {
            // Skip empty partitions so the database is not opened for nothing.
            if (p.hasNext) {
              // The reader is created here, inside the partition closure, so it is built
              // once per partition on the worker and never has to be serialized.
              val url2 = "/opt/db/geo/GeoLite2-City.mmdb"
              val geoDB = new File(url2)
              val geoIPResolver = new DatabaseReader.Builder(geoDB).withCache(new CHMCache()).build()

              // Localized name, or "" when the map or the zh-CN entry is missing.
              def localized(names: java.util.Map[String, String]): String =
                Option(names).flatMap(n => Option(n.get("zh-CN"))).getOrElse("")

              // Extracts (country, province, city); province is "" when the response
              // carries no subdivision data.
              def resolve_ip(resp: CityResponse): (String, String, String) = {
                val subdivisions = resp.getSubdivisions
                val province = if (subdivisions.isEmpty) "" else localized(subdivisions.get(0).getNames)
                (localized(resp.getCountry.getNames), province, localized(resp.getCity.getNames))
              }

              try {
                p.foreach(x => {
                  val ip = if (x == null) "" else x.trim
                  if (ip.nonEmpty) {
                    try {
                      val inetAddress = InetAddress.getByName(ip)
                      val geoResponse = geoIPResolver.city(inetAddress)
                      println(resolve_ip(geoResponse))
                    } catch {
                      // A malformed or unknown address must not fail the whole batch.
                      case NonFatal(e) =>
                        System.err.println(s"geoip lookup failed for '$ip': $e")
                    }
                  }
                })
              } finally {
                // Release the database file handle once the partition is done.
                geoIPResolver.close()
              }
            }
          })
        })

        ssc.start()
        // Block the driver until the streaming job is stopped or fails.
        ssc.awaitTermination()
      }
    }
    build.sbt

    // geoip2 is pinned to 2.5.0 for compatibility with Spark: with a mismatching version the
    // job reportedly fails while building the DatabaseReader with
    // java.lang.NoSuchMethodError on com.fasterxml.jackson.databind.node.ArrayNode.<init>.
    libraryDependencies += "com.maxmind.geoip2" % "geoip2" % "2.5.0"

    注意:geoIPResolver 的创建和 resolve_ip 函数的定义(原文中标红的部分)需要放到 foreachPartition 内部,原因如下:

    1、减少加载文件次数,一个Partition只加载一次

    2、resolve_ip 函数参数为CityResponse,此参数不可序列化,所以要在Partition内部,这样就不会在节点之间序列化传输

    3、com.maxmind.geoip2 版本需要是 2.5.0,以便和spark本身兼容,否则会报错如下:

    val geoIPResolver = new DatabaseReader.Builder(geoDB).withCache(new CHMCache()).build();
    java.lang.NoSuchMethodError: com.fasterxml.jackson.databind.node.ArrayNode.<init>(Lcom/fasterxml/jackson/databind/node/JsonNodeFactory;Ljava/util/List;)V

  • 相关阅读:
    CentOS6.5 mini安装到VirtualBox虚拟机中
    docker配置redis6.0.5集群
    docker搭建数据库高可用方案PXC
    我通过调试ConcurrentLinkedQueue发现一个IDEA的小虫子(bug), vscode复现, eclipse毫无问题
    ThreadLocal底层原理学习
    第九章
    多线程-java并发编程实战笔记
    Spring-IOC源码解读3-依赖注入
    Spring-IOC源码解读2.3-BeanDefinition的注册
    Spring-IOC源码解读2.2-BeanDefinition的载入和解析过程
  • 原文地址:https://www.cnblogs.com/piaolingzxh/p/5678910.html
Copyright © 2011-2022 走看看