  • Adding IP Geolocation to Hive with a Hive UDF and the GeoIP Library

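    Hive is a data management system built on Hadoop. It serves analysts as an ad hoc analysis tool and acts as the execution engine for ETL and similar jobs, which makes it very valuable for managing, analyzing, and processing big data today. GeoIP is an IP mapping database that is updated on a regular schedule and ships APIs for many languages, so it works well as a data source for region-based analysis.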

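    Precondition: obtaining a user's geographic location from an IP address
    In other words, the user's IP is looked up in an IP database to retrieve the associated information.

    Basic structure of each record in a typical IP database:

    An IP address range (start and end) and the data associated with it
    Information usually included: country, region (province/state), city, street, latitude/longitude, ISP, and so on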

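    IP databases change over time (though little within any short period), so someone has to maintain and update them regularly. The data can never be fully accurate, nor can it cover every address. maxmind publishes its city-level accuracy here: http://www.maxmind.com/app/city_accuracy
    There is no authoritative organization for this data and it changes frequently, so each vendor gradually ends up with its own data set.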

    At present, the best-known database in China is the 纯真 (Chunzhen) IP database; outside China, maxmind and ip2location are commonly used.

    Are IP databases paid or free? Both exist. Actively maintained data is usually paid, and its accuracy and coverage tend to be somewhat higher.

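    Quality:
    1. The key metrics are accuracy and coverage.
    2. Total number of records. The 纯真 database currently has 380,000 records (as of the 2010-07-30 update).
    3. Whether it is actively maintained.
    4. Update frequency: monthly or weekly. Databases are updated on a schedule; the maxmind open-source edition is updated once a month.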

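    Query modes:
    • Local: download the IP database and query it locally. Lookups are fast, which is why this mode is common in statistical analysis. It comes in two forms:
      • In-memory query: load all the data into memory for high-performance lookups. Alternatively, the binary data file may itself be an optimized index file that can be queried directly.
      • Database query: import the data into a database and query it there. This is slower than an in-memory lookup.
    • Remote (web service or ajax): call a third-party remote service. Lookups are naturally slower; this mode is generally used in web applications.
    How a query works: given an IP, find the IP range that contains it, usually with binary search.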
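    The range lookup described above can be sketched in a few lines of Java. This is only an illustration under stated assumptions, not how GeoIP or any particular vendor stores its data: the class name IpRangeLookup, the Range record, and the three sample ranges with their "example-*" labels are all hypothetical, and the table is assumed to be sorted by start address with no overlapping ranges.

```java
public class IpRangeLookup {
    /** One record: an inclusive [start, end] address range plus its attached info (hypothetical layout). */
    public static final class Range {
        final long start;
        final long end;
        final String info;

        public Range(long start, long end, String info) {
            this.start = start;
            this.end = end;
            this.info = info;
        }
    }

    /** Converts dotted IPv4 text such as "10.0.2.7" to its 32-bit value, held in a long. */
    public static long toLong(String ip) {
        String[] parts = ip.split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("not an IPv4 address: " + ip);
        }
        long value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("octet out of range: " + ip);
            }
            value = (value << 8) | octet;
        }
        return value;
    }

    /** Binary search over ranges sorted by start and non-overlapping; returns null when no range matches. */
    public static String find(Range[] sorted, String ip) {
        long target = toLong(ip);
        int lo = 0;
        int hi = sorted.length - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (target < sorted[mid].start) {
                hi = mid - 1;          // target lies before this range
            } else if (target > sorted[mid].end) {
                lo = mid + 1;          // target lies after this range
            } else {
                return sorted[mid].info;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Made-up sample table, for illustration only.
        Range[] table = {
            new Range(toLong("10.0.0.0"), toLong("10.0.0.255"), "example-A"),
            new Range(toLong("10.0.1.0"), toLong("10.0.3.255"), "example-B"),
            new Range(toLong("192.168.0.0"), toLong("192.168.255.255"), "example-C"),
        };
        System.out.println(find(table, "10.0.2.7"));    // prints example-B
        System.out.println(find(table, "172.16.0.1"));  // prints null (no covering range)
    }
}
```

    Returning null on a miss mirrors the coverage problem mentioned earlier: an address that falls between ranges simply has no record.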


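    Is an API provided? Some IP databases provide APIs for multiple languages (java, javascript, C#, etc.), so you do not have to parse the data format, organize the data, and write the lookup code yourself.

    Is latitude/longitude provided? The 纯真 IP database does not provide it; Maxmind does. Map applications generally need latitude and longitude.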


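    A UDF is Hive's interface for user-defined functions; implementing it extends Hive's set of built-in functions. To add an IP mapping function to Hive, we simply call GeoIP's Java API from inside a UDF.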

    The GeoIP data files can be downloaded from http://www.maxmind.com/download/geoip/database/. Since both country and city information is needed, I downloaded http://www.maxmind.com/download/geoip/database/GeoLiteCity.dat.gz

    GeoIP APIs for various languages can be downloaded from http://www.maxmind.com/download/geoip/api/

    The steps are as follows:

    Step 1: The UDF that adds IP address lookup to Hive is shown below:

    package org.hadoop.hive.additionalUDF;
    
    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.hive.ql.exec.UDF;
    
    import com.maxmind.geoip.Location;
    import com.maxmind.geoip.LookupService;
    import com.maxmind.geoip.regionName;
    import com.maxmind.geoip.timeZone;
    
    import java.util.regex.*;
    
    public class IPToCC  extends UDF {
        private static LookupService cl = null;
        // Backslashes must be doubled inside Java string literals.
        private static String ipPattern = "\\d+\\.\\d+\\.\\d+\\.\\d+";
        private static String ipNumPattern = "\\d+";
        
        static LookupService getLS(String dbfile) throws IOException{
            
            //String sep = System.getProperty("file.separator");
            //String dir = "/home/landen/UntarFile/GeoIP";
    
            //String dbfile = dir + sep + "GeoLiteCity.dat";
            //String dbfile = "GeoLiteCity.dat";
            if(new File(dbfile).exists())
            {
                if(cl == null)
                {
                    cl = new LookupService(dbfile,LookupService.GEOIP_MEMORY_CACHE);
                }    
            }
            
            return cl;
    
        }
        
        /**
         * @param str an IPv4 address like "114.43.181.143", or the same address as a decimal number
         * @param ipDBInfo path to the GeoLiteCity.dat data file
         * @return eight tab-separated fields, or null if the lookup fails
         * */
        
        public String evaluate(String str,String ipDBInfo) {
            try
            {
                Location l1 = null;
                Matcher mIP = Pattern.compile(ipPattern).matcher(str);
                Matcher mIPNum = Pattern.compile(ipNumPattern).matcher(str);
                if(mIP.matches())
                    l1 = getLS(ipDBInfo).getLocation(str);
                else if(mIPNum.matches())
                    l1 = getLS(ipDBInfo).getLocation(Long.parseLong(str));    
                
                // The input is not an IP, or the database has no record for it.
                if(l1 == null)
                    return null;
                
                /*System.out.println("countryCode: " + l1.countryCode +
                        "\n countryName: " + l1.countryName +
                        "\n region: " + l1.region +
                        "\n regionName: " + regionName.regionNameByCode(l1.countryCode, l1.region) +
                        "\n city: " + l1.city +
                        "\n latitude: " + l1.latitude +
                        "\n longitude: " + l1.longitude +
                        "\n timezone: " + timeZone.timeZoneByCountryAndRegion(l1.countryCode, l1.region));*/
                
                // Fields are joined with tab characters so they can be split again in Hive.
                return String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s",l1.countryCode,l1.countryName,l1.region,regionName.regionNameByCode(l1.countryCode, l1.region),l1.city,l1.latitude,l1.longitude,timeZone.timeZoneByCountryAndRegion(l1.countryCode, l1.region));
            }
            catch(Exception e)
            {
                e.printStackTrace();
                if(cl != null)
                    cl.close();
                return null;
            }
        }
        
        public static void main(String[] args)
        {
            String dbfile = "GeoLiteCity.dat";
            IPToCC ipTocc = new IPToCC();
            String ipAdress = "221.12.10.218";
            
            System.out.println(ipTocc.evaluate(ipAdress,dbfile));
        }
    
    }
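    The UDF accepts its input in two forms, dotted text or a plain decimal number, and picks a branch with two regular expressions. The sketch below isolates just that decision so it can be tried without Hive or the GeoIP library. The class name IpInputKind and the labels "dotted", "numeric", and "invalid" are hypothetical names chosen for illustration; the patterns are the same two used in the UDF, compiled once instead of once per row.

```java
import java.util.regex.Pattern;

public class IpInputKind {
    // Same two patterns as the UDF; note the doubled backslashes in Java source.
    private static final Pattern DOTTED = Pattern.compile("\\d+\\.\\d+\\.\\d+\\.\\d+");
    private static final Pattern NUMERIC = Pattern.compile("\\d+");

    /** Returns which lookup branch an input would take: "dotted", "numeric", or "invalid". */
    public static String classify(String input) {
        if (input == null) {
            return "invalid";
        }
        if (DOTTED.matcher(input).matches()) {
            return "dotted";
        }
        if (NUMERIC.matcher(input).matches()) {
            return "numeric";
        }
        return "invalid";
    }

    public static void main(String[] args) {
        System.out.println(classify("221.12.10.218")); // prints dotted
        System.out.println(classify("3708553946"));    // prints numeric
        System.out.println(classify("not-an-ip"));     // prints invalid
        // The dotted pattern only checks the shape, not that each octet is <= 255.
        System.out.println(classify("999.1.1.1"));     // prints dotted
    }
}
```

    As the last call shows, the pattern checks only the shape of the text, so a stricter range check on each octet may be worth adding if the input data is dirty.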
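    Step 2: Package the program above together with the GeoIP API classes into a JAR, IPTocc.jar, and place it along with the data file (GeoLiteCity.dat) somewhere on the server that runs Hive. The resources can then be added to Hive in either of two ways:
    1> Start Hive and run the following statements: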
    landen@Master:~/UntarFile/hive-0.10.0$ bin/hive
    WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
    Logging initialized using configuration in jar:file:/home/landen/UntarFile/hive-0.10.0/lib/hive-common-0.10.0.jar!/hive-log4j.properties
    Hive history file=/home/landen/UntarFile/hive-0.10.0/logs/hive_job_log_landen_201312081638_1930432077.txt
    hive (default)> use stuchoosecourse;
    OK
    Time taken: 5.251 seconds
    hive (stuchoosecourse)> add file /home/landen/UntarFile/GeoIP/GeoLiteCity.dat;
    Added resource: /home/landen/UntarFile/GeoIP/GeoLiteCity.dat
    hive (stuchoosecourse)> add jar /home/landen/UntarFile/hive-0.10.0/lib/IPTocc.jar;
    Added /home/landen/UntarFile/hive-0.10.0/lib/IPTocc.jar to class path
    Added resource: /home/landen/UntarFile/hive-0.10.0/lib/IPTocc.jar
    hive (stuchoosecourse)> create temporary function IP4Tocc as 'org.hadoop.hive.additionalUDF.IPToCC';
    OK
    Time taken: 0.107 seconds
    2> Before starting the hive shell, create a .hiverc file in the $HIVE_HOME/conf directory with the following content:
    add file /home/landen/UntarFile/GeoIP/GeoLiteCity.dat;
    add jar /home/landen/UntarFile/hive-0.10.0/lib/IPTocc.jar;
    create temporary function IP4Tocc as 'org.hadoop.hive.additionalUDF.IPToCC';
    When the hive shell starts, Hive loads the contents of the .hiverc file and applies them globally, so clients can use the function directly.

    Step 3: Test in Hive as follows:
    hive (stuchoosecourse)> select * from ipidentifier;
    OK
    ipadress
    221.12.10.218
    60.180.248.201
    125.111.251.118
    Time taken: 0.099 seconds
    hive (stuchoosecourse)> select IP4Tocc(ipadress,'./GeoLiteCity.dat') from ipidentifier;
    Total MapReduce jobs = 1
    Launching Job 1 out of 1
    Number of reduce tasks is set to 0 since there's no reduce operator
    Starting Job = job_201312042044_0020, Tracking URL = http://Master:50030/jobdetails.jsp?jobid=job_201312042044_0020
    Kill Command = /home/landen/UntarFile/hadoop-1.0.4/libexec/../bin/hadoop job  -kill job_201312042044_0020
    Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
    2013-12-08 20:54:10,276 Stage-1 map = 0%,  reduce = 0%
    2013-12-08 20:54:18,308 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.55 sec
    2013-12-08 20:54:19,313 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.55 sec
    2013-12-08 20:54:20,317 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.55 sec
    2013-12-08 20:54:21,322 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.55 sec
    2013-12-08 20:54:22,326 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.55 sec
    2013-12-08 20:54:23,331 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.55 sec
    2013-12-08 20:54:24,402 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 2.55 sec
    MapReduce Total cumulative CPU time: 2 seconds 550 msec
    Ended Job = job_201312042044_0020
    MapReduce Jobs Launched:
    Job 0: Map: 1   Cumulative CPU: 2.55 sec   HDFS Read: 306 HDFS Write: 188 SUCCESS
    Total MapReduce CPU Time Spent: 2 seconds 550 msec
    OK
    _c0
    CN    China    02    Zhejiang    Hangzhou    30.293594    120.16141    Asia/Shanghai
    CN    China    02    Zhejiang    Wenzhou    27.999405    120.66681    Asia/Shanghai
    CN    China    02    Zhejiang    Ningbo    29.878204    121.5495    Asia/Shanghai
    hive (stuchoosecourse)> select split(IP4Tocc(ipadress,'./GeoLiteCity.dat'),'\t') from ipidentifier;
    Total MapReduce jobs = 1
    Launching Job 1 out of 1
    Number of reduce tasks is set to 0 since there's no reduce operator
    Starting Job = job_201312042044_0021, Tracking URL = http://Master:50030/jobdetails.jsp?jobid=job_201312042044_0021
    Kill Command = /home/landen/UntarFile/hadoop-1.0.4/libexec/../bin/hadoop job  -kill job_201312042044_0021
    Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
    2013-12-08 21:12:46,717 Stage-1 map = 0%,  reduce = 0%
    2013-12-08 21:12:56,764 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.28 sec
    2013-12-08 21:12:57,768 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.28 sec
    2013-12-08 21:12:58,772 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.28 sec
    2013-12-08 21:12:59,775 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.28 sec
    2013-12-08 21:13:00,778 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.28 sec
    2013-12-08 21:13:01,782 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.28 sec
    2013-12-08 21:13:02,786 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 4.28 sec
    MapReduce Total cumulative CPU time: 4 seconds 280 msec
    Ended Job = job_201312042044_0021
    MapReduce Jobs Launched:
    Job 0: Map: 1   Cumulative CPU: 4.28 sec   HDFS Read: 306 HDFS Write: 188 SUCCESS
    Total MapReduce CPU Time Spent: 4 seconds 280 msec
    OK
    _c0
    ["CN","China","02","Zhejiang","Hangzhou","30.293594","120.16141","Asia/Shanghai"]
    ["CN","China","02","Zhejiang","Wenzhou","27.999405","120.66681","Asia/Shanghai"]
    ["CN","China","02","Zhejiang","Ningbo","29.878204","121.5495","Asia/Shanghai"]
    Time taken: 45.037 seconds
    hive (stuchoosecourse)> create table HiddenIPInfo(
                          > IP string,countrycode string,countryname string,region string,regionname string,city string,      
                          > latitude string,longitude string,timezone string);
    OK
    Time taken: 1.828 seconds
    hive (stuchoosecourse)> show tables;
    OK
    tab_name
    hbase_stu_course
    hiddenipinfo
    ipidentifier
    Time taken: 0.486 seconds
    hive (stuchoosecourse)> describe hiddenipinfo;
    OK
    col_name    data_type    comment
    ip    string    
    countrycode    string    
    countryname    string    
    region    string    
    regionname    string    
    city    string    
    latitude    string    
    longitude    string    
    timezone    string    
    Time taken: 0.33 seconds
    hive (stuchoosecourse)> from(select ipadress,split(IP4Tocc(ipadress,'./GeoLiteCity.dat'),'\t') as IPInfo from ipidentifier)e
                          > insert overwrite table hiddenipinfo
                          > select e.ipadress,e.IPInfo[0] as countrycode,e.IPInfo[1] as countryname,e.IPInfo[2] as region,
                          > e.IPInfo[3] as regionname,e.IPInfo[4] as city,e.IPInfo[5] as latitude,e.IPInfo[6] as longitude,
                          > e.IPInfo[7] as timezone;
    Total MapReduce jobs = 3
    Launching Job 1 out of 3
    Number of reduce tasks is set to 0 since there's no reduce operator
    Starting Job = job_201312042044_0023, Tracking URL = http://Master:50030/jobdetails.jsp?jobid=job_201312042044_0023
    Kill Command = /home/landen/UntarFile/hadoop-1.0.4/libexec/../bin/hadoop job  -kill job_201312042044_0023
    Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
    2013-12-08 21:58:12,406 Stage-1 map = 0%,  reduce = 0%
    2013-12-08 21:58:18,449 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.48 sec
    2013-12-08 21:58:19,454 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.48 sec
    2013-12-08 21:58:20,458 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.48 sec
    2013-12-08 21:58:21,462 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.48 sec
    2013-12-08 21:58:22,466 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.48 sec
    2013-12-08 21:58:23,470 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.48 sec
    2013-12-08 21:58:24,474 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 1.48 sec
    MapReduce Total cumulative CPU time: 1 seconds 480 msec
    Ended Job = job_201312042044_0023
    Ended Job = 39195028, job is filtered out (removed at runtime).
    Ended Job = 1695434910, job is filtered out (removed at runtime).
    Moving data to: hdfs://Master:9000/home/landen/UntarFile/hive-0.10.0/warehouse/hive_2013-12-08_21-57-40_106_7083774091282915969/-ext-10000
    Loading data to table stuchoosecourse.hiddenipinfo
    Deleted hdfs://Master:9000/home/landen/UntarFile/hive-0.10.0/warehouse/stuchoosecourse.db/hiddenipinfo
    Table stuchoosecourse.hiddenipinfo stats: [num_partitions: 0, num_files: 1, num_rows: 0, total_size: 233, raw_data_size: 0]
    3 Rows loaded to hiddenipinfo
    MapReduce Jobs Launched:
    Job 0: Map: 1   Cumulative CPU: 1.48 sec   HDFS Read: 306 HDFS Write: 233 SUCCESS
    Total MapReduce CPU Time Spent: 1 seconds 480 msec
    OK
    ipadress    countrycode    countryname    region    regionname    city    latitude    longitude    timezone
    Time taken: 45.692 seconds
    hive (stuchoosecourse)> show tables;
    OK
    tab_name
    hbase_stu_course
    hiddenipinfo
    ipidentifier
    Time taken: 0.053 seconds
    hive (stuchoosecourse)> select * from hiddenipinfo;
    OK
    ip               countrycode    countryname    region    regionname    city       latitude    longitude    timezone
    221.12.10.218    CN             China          02        Zhejiang      Hangzhou   30.293594   120.16141    Asia/Shanghai
    60.180.248.201   CN             China          02        Zhejiang      Wenzhou    27.999405   120.66681    Asia/Shanghai
    125.111.251.118  CN             China          02        Zhejiang      Ningbo     29.878204   121.5495     Asia/Shanghai
    Time taken: 0.083 seconds
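    Outside Hive, a client that receives the UDF's raw output string can recover the same eight columns by splitting on tabs, as the split(...) calls in the session above do. The following is a minimal sketch under assumptions: GeoFields is a hypothetical holder class, the field order is taken from the String.format call in the UDF, and latitude and longitude are parsed as doubles even though the Hive table stores them as strings.

```java
public class GeoFields {
    public final String countryCode;
    public final String countryName;
    public final String region;
    public final String regionName;
    public final String city;
    public final double latitude;
    public final double longitude;
    public final String timezone;

    private GeoFields(String[] f) {
        countryCode = f[0];
        countryName = f[1];
        region = f[2];
        regionName = f[3];
        city = f[4];
        latitude = Double.parseDouble(f[5]);
        longitude = Double.parseDouble(f[6]);
        timezone = f[7];
    }

    /** Splits one tab-separated UDF result into its eight fields. */
    public static GeoFields parse(String udfOutput) {
        // A limit of -1 keeps trailing empty fields instead of dropping them.
        String[] f = udfOutput.split("\t", -1);
        if (f.length != 8) {
            throw new IllegalArgumentException("expected 8 fields, got " + f.length);
        }
        return new GeoFields(f);
    }

    public static void main(String[] args) {
        // First result row from the Hive session above.
        GeoFields g = parse("CN\tChina\t02\tZhejiang\tHangzhou\t30.293594\t120.16141\tAsia/Shanghai");
        System.out.println(g.city + " " + g.timezone);
    }
}
```

    Because the UDF builds its result with String.format and %s, a field the database does not know comes through as the literal text "null", so a real consumer would probably want to map that back to a missing value.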






  • Original article: https://www.cnblogs.com/likai198981/p/3465365.html