zoukankan      html  css  js  c++  java
  • pig 自定义udf中读取hdfs 文件

    最近几天,在研究怎么样把日志中的IP地址转化成具体省份城市。

    希望写一个pig udf

    IP数据库采用的纯真IP数据库文件qqwry.dat,可以从http://www.cz88.net/下载。

    这里的关键点在于怎么样读取这个文件,为此浪费了两天时间,现在把代码记录下来,供和我遇到相同问题的朋友参考。

    pig script

    -- make the jar containing the UDF available to the job
    register /usr/local/pig/mypigudf.jar;
    -- bind the alias ip2address; the constructor argument is the HDFS path of the
    -- qqwry.dat IP database, which the UDF ships to nodes via the distributed cache
    define ip2address my.pig.func.IP2Address('/user/anny/qqwry.dat');
    
    -- one IP address per input line
    a = load '/user/anny/hdfs/logtestdata/ipdata.log' as (ip:chararray);
    -- resolve each IP to a map of location fields
    b = foreach a generate ip,ip2address(ip) as cc:map[chararray];
    -- project the individual map entries into named columns
    c = foreach b generate ip,cc#'province' as province,cc#'city' as city,cc#'region' as region;
    dump c;

    java写的pig udf:

    package my.pig.func;
    
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import my.pig.func.IPConvertCity.IPSeeker;
    import my.pig.func.IPConvertCity.IPUtil;
    import my.pig.func.IPConvertCity.LogFactory;
    
    import org.apache.log4j.Level;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataType;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.logicalLayer.schema.Schema;
    
    /**
     * Pig EvalFunc that resolves an IP address string to location fields
     * (province / city / region) using the qqwry.dat database. The HDFS path
     * of the database is passed to the constructor and distributed to every
     * task node through the distributed cache (see {@link #getCacheFiles()}).
     */
    public class IP2Address extends EvalFunc<Map<String, Object>> {
        // HDFS path of the qqwry.dat database, as given in the DEFINE statement.
        private String lookupFile = "";
        private RandomAccessFile objFile = null;
        // Cached seeker: building an IPSeeker per input record would re-read
        // the dat file's index for every row processed by this task.
        private IPSeeker seeker = null;
    
        public IP2Address(String file) {
            this.lookupFile = file;
        }
    
        /**
         * Looks up the IP held in the tuple's first field and returns the map
         * produced by IPUtil.splitCountry (keys such as 'province', 'city',
         * 'region' per the accompanying Pig script).
         *
         * @param input single-field tuple holding the IP as a chararray
         * @return null for a null/empty tuple or when the database file is
         *         missing; an empty map for an empty string or on lookup failure
         */
        @Override
        public Map<String, Object> exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0 || input.get(0) == null)
                return null;
            Map<String, Object> output = new HashMap<String, Object>();
            String str = (String) input.get(0);
            try {
                if (str.length() == 0)
                    return output;
    
                if (seeker == null) {
                    try {
                        // "./qqwry.dat" is the symlink the distributed cache
                        // creates from the "#qqwry.dat" fragment registered in
                        // getCacheFiles(); tasks read it from their working dir.
                        objFile = new RandomAccessFile("./qqwry.dat", "r");
                    } catch (FileNotFoundException e1) {
                        System.out.println("IP地址信息文件没有找到" + lookupFile);
                        return null;
                    }
                    seeker = new IPSeeker(objFile);
                }
                String country = seeker.getCountry(str);
                output = IPUtil.splitCountry(country);
    
                return output;
            } catch (Exception e) {
                // Best-effort UDF: log the failure but return an empty map so a
                // single malformed record does not kill the whole job.
                log.warn("IP lookup failed for input '" + str + "'", e);
                return output;
            }
        }
    
        /** Output schema: a single untyped map field. */
        @Override
        public Schema outputSchema(Schema input) {
            return new Schema(new Schema.FieldSchema(null, DataType.MAP));
        }
    
        /**
         * Ships the HDFS file named in the constructor to every node via the
         * distributed cache; the "#qqwry.dat" fragment makes it appear locally
         * as ./qqwry.dat, which exec() opens.
         */
        @Override
        public List<String> getCacheFiles() {
            List<String> list = new ArrayList<String>(1);
            list.add(lookupFile + "#qqwry.dat");
            return list;
        }
    }

    Search for "Distributed Cache" in this page of the Pig docs: http://pig.apache.org/docs/r0.11.0/udf.html

    The example shown there uses the getCacheFiles() method to ensure that the file is accessible to all the nodes in the cluster.


    参考文章:http://stackoverflow.com/questions/17514022/access-hdfs-file-from-udf

    http://stackoverflow.com/questions/19149839/pig-udf-maxmind-geoip-database-data-file-loading-issue

  • 相关阅读:
    关于iframe页面里的重定向问题
    iframe跨域解决方案
    sql 查询优化小计
    年轻不能遇见太惊艳的人
    图片上传预览
    脚本
    前端常见跨域解决方案
    react
    react高阶组件
    React + MobX 状态管理入门及实例
  • 原文地址:https://www.cnblogs.com/anny-1980/p/3673419.html
Copyright © 2011-2022 走看看