zoukankan      html  css  js  c++  java
  • pig 自定义udf中读取hdfs 文件

    最近几天,在研究怎么样把日志中的IP地址转化成具体省份城市。

    希望写一个pig udf

    IP数据库采用的纯真IP数据库文件qqwry.dat,可以从http://www.cz88.net/下载。

    这里的关键点在于怎样读取这个文件,我在这上面花费了两天时间,现在把代码记录下来,供遇到相同问题的朋友参考。

    pig script

    -- Ship the UDF jar to the cluster and bind the constructor argument
    -- (the HDFS path of the qqwry.dat IP database) to a short alias.
    register /usr/local/pig/mypigudf.jar;
    define ip2address my.pig.func.IP2Address('/user/anny/qqwry.dat');
    
    -- One IP address per line; the UDF returns a map, which is then
    -- unpacked into province/city/region columns with the # operator.
    a = load '/user/anny/hdfs/logtestdata/ipdata.log' as (ip:chararray);
    b = foreach a generate ip,ip2address(ip) as cc:map[chararray];
    c = foreach b generate ip,cc#'province' as province,cc#'city' as city,cc#'region' as region;
    dump c;

    java写的pig udf:

    package my.pig.func;
    
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import my.pig.func.IPConvertCity.IPSeeker;
    import my.pig.func.IPConvertCity.IPUtil;
    import my.pig.func.IPConvertCity.LogFactory;
    
    import org.apache.log4j.Level;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataType;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.logicalLayer.schema.Schema;
    
    /**
     * Pig EvalFunc that resolves an IPv4 address string to a location map
     * (keys such as 'province', 'city', 'region') using the qqwry.dat
     * pure-IP database, distributed to every task node via the
     * Distributed Cache (see {@link #getCacheFiles()}).
     */
    public class IP2Address extends EvalFunc<Map<String, Object>> {
        /** HDFS path of qqwry.dat, as passed in the Pig DEFINE statement. */
        private String lookupFile = "";
        private RandomAccessFile objFile = null;
        // Cached database reader: built once per task instead of re-parsing
        // the file for every input tuple (the original rebuilt it per record).
        private IPSeeker seeker = null;
    
        /**
         * @param file HDFS path of the qqwry.dat IP database,
         *             e.g. '/user/anny/qqwry.dat'.
         */
        public IP2Address(String file) {
            this.lookupFile = file;
        }
    
        /**
         * Looks up the first field of the tuple as an IP string.
         *
         * @return null for a null/empty tuple or a missing database file;
         *         an empty map for an empty string or on lookup failure,
         *         so one bad record never fails the whole job.
         */
        @Override
        public Map<String, Object> exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0 || input.get(0) == null)
                return null;
            Map<String, Object> output = new HashMap<String, Object>();
            String str = (String) input.get(0);
            try {
                if (str.length() == 0)
                    return output;
    
                if (seeker == null) {
                    try {
                        // "./qqwry.dat" is the symlink the Distributed Cache
                        // creates in the task's working directory, named by the
                        // "#qqwry.dat" fragment in getCacheFiles().
                        objFile = new RandomAccessFile("./qqwry.dat", "r");
                    } catch (FileNotFoundException e1) {
                        // Use the task logger, not stdout, so the message is
                        // visible in the task logs.
                        log.warn("IP地址信息文件没有找到" + lookupFile, e1);
                        return null;
                    }
                    seeker = new IPSeeker(objFile);
                }
                String country = seeker.getCountry(str);
                output = IPUtil.splitCountry(country);
    
                return output;
            } catch (Exception e) {
                // Best-effort lookup: log instead of silently swallowing,
                // but still emit an empty map rather than killing the task.
                log.warn("IP lookup failed for input '" + str + "'", e);
                return output;
            }
        }
    
        /** The UDF returns an untyped Pig map. */
        @Override
        public Schema outputSchema(Schema input) {
            return new Schema(new Schema.FieldSchema(null, DataType.MAP));
        }
    
        /**
         * Asks Pig to ship lookupFile through the Distributed Cache; the
         * "#qqwry.dat" suffix is the symlink name exec() opens locally.
         */
        @Override
        public List<String> getCacheFiles() {
            List<String> list = new ArrayList<String>(1);
            list.add(lookupFile + "#qqwry.dat");
            return list;
        }
    }

    Search for "Distributed Cache" in this page of the Pig docs: http://pig.apache.org/docs/r0.11.0/udf.html

    The example there shows that implementing the getCacheFiles() method makes Pig ship the file through the Distributed Cache, which ensures it is accessible on every node in the cluster.


    参考文章:http://stackoverflow.com/questions/17514022/access-hdfs-file-from-udf

    http://stackoverflow.com/questions/19149839/pig-udf-maxmind-geoip-database-data-file-loading-issue

  • 相关阅读:
    oracle 10g 免安装客户端在windows下配置
    sql2005 sa密码
    使用windows live writer 有感
    windows xp SNMP安装包提取
    汉化groove2007
    迁移SQL server 2005 Reporting Services到SQL server 2008 Reporting Services全程截图操作指南
    foxmail 6在使用中的问题
    AGPM客户端连接不上服务器解决一例
    SpringSource Tool Suite add CloudFoundry service
    Java 之 SWing
  • 原文地址:https://www.cnblogs.com/anny-1980/p/3673419.html
Copyright © 2011-2022 走看看