Over the past few days I have been looking into how to convert the IP addresses in our logs into concrete provinces and cities. My goal was to write a Pig UDF for this.
The IP database used is the CZ88 ("纯真") IP database file qqwry.dat, which can be downloaded from http://www.cz88.net/. The tricky part is how to read this file from inside a UDF; I wasted two days on it, so I am recording the code here for anyone who runs into the same problem.
Pig script:

register /usr/local/pig/mypigudf.jar;
define ip2address my.pig.func.IP2Address('/user/anny/qqwry.dat');
a = load '/user/anny/hdfs/logtestdata/ipdata.log' as (ip:chararray);
b = foreach a generate ip, ip2address(ip) as cc:map[chararray];
c = foreach b generate ip, cc#'province' as province, cc#'city' as city, cc#'region' as region;
dump c;
The Pig UDF, written in Java:

package my.pig.func;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import my.pig.func.IPConvertCity.IPSeeker;
import my.pig.func.IPConvertCity.IPUtil;
import my.pig.func.IPConvertCity.LogFactory;

import org.apache.log4j.Level;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class IP2Address extends EvalFunc<Map<String, Object>> {

    private String lookupFile = "";
    private RandomAccessFile objFile = null;

    public IP2Address(String file) {
        // HDFS path of qqwry.dat, passed in from the define statement.
        this.lookupFile = file;
    }

    @Override
    public Map<String, Object> exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0 || input.get(0) == null)
            return null;
        Map<String, Object> output = new HashMap<String, Object>();
        String str = (String) input.get(0);
        try {
            if (str.length() == 0)
                return output;
            if (objFile == null) {
                try {
                    // "./qqwry.dat" is the symlink that the distributed cache
                    // creates in the task's working directory; see getCacheFiles().
                    objFile = new RandomAccessFile("./qqwry.dat", "r");
                } catch (FileNotFoundException e1) {
                    System.out.println("IP database file not found: " + lookupFile);
                    return null;
                }
            }
            IPSeeker seeker = new IPSeeker(objFile);
            String country = seeker.getCountry(str);
            output = IPUtil.splitCountry(country);
            return output;
        } catch (Exception e) {
            return output;
        }
    }

    @Override
    public Schema outputSchema(Schema input) {
        return new Schema(new Schema.FieldSchema(null, DataType.MAP));
    }

    @Override
    public List<String> getCacheFiles() {
        List<String> list = new ArrayList<String>(1);
        // Ship lookupFile from HDFS to every node, symlinked as "qqwry.dat".
        list.add(lookupFile + "#qqwry.dat");
        return list;
    }
}
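The IPUtil class referenced above is not shown in this post. As a rough illustration of what its splitCountry() method must do, here is a hypothetical re-implementation (the class name IPUtilSketch and the 省/市-marker parsing are my assumptions, not the original code): it splits a qqwry.dat location string such as "河南省郑州市金水区" into the province/city/region keys that the Pig script dereferences.

```java
import java.util.HashMap;
import java.util.Map;

public class IPUtilSketch {
    // Hypothetical sketch of IPUtil.splitCountry: split a location string
    // on the 省 (province) and 市 (city) suffix characters.
    public static Map<String, Object> splitCountry(String country) {
        Map<String, Object> m = new HashMap<String, Object>();
        int p = country.indexOf("省");
        int c = country.indexOf("市", p + 1);
        if (p >= 0)
            m.put("province", country.substring(0, p + 1));
        if (c >= 0)
            m.put("city", country.substring(p + 1, c + 1));
        if (c >= 0 && c + 1 < country.length())
            m.put("region", country.substring(c + 1));
        return m;
    }

    public static void main(String[] args) {
        Map<String, Object> m = splitCountry("河南省郑州市金水区");
        // → province=河南省, city=郑州市, region=金水区
        System.out.println(m.get("province") + "|" + m.get("city") + "|" + m.get("region"));
    }
}
```

The real IPUtil in the jar may well use a different strategy (e.g. a lookup table for municipalities like 北京市 that have no 省 suffix); this sketch only shows the shape of the map the UDF returns.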
Search for "Distributed Cache" in this page of the Pig docs: http://pig.apache.org/docs/r0.11.0/udf.html
The example there shows that implementing the getCacheFiles() method ensures the file is accessible on all nodes in the cluster.
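Each entry returned by getCacheFiles() has the form "hdfs_path#symlink_name": Pig copies the HDFS file to every node via the distributed cache and symlinks it under the given name in the task's working directory, which is why exec() can open "./qqwry.dat" with a relative path. A minimal sketch of building such an entry (the class name CacheEntryDemo is made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class CacheEntryDemo {
    // Build the "hdfs_path#local_symlink" string that getCacheFiles()
    // must return for each file to be shipped to the task nodes.
    static List<String> cacheEntry(String hdfsPath, String localName) {
        List<String> list = new ArrayList<String>(1);
        list.add(hdfsPath + "#" + localName);
        return list;
    }

    public static void main(String[] args) {
        // → /user/anny/qqwry.dat#qqwry.dat
        System.out.println(cacheEntry("/user/anny/qqwry.dat", "qqwry.dat").get(0));
    }
}
```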
Reference articles:
http://stackoverflow.com/questions/17514022/access-hdfs-file-from-udf
http://stackoverflow.com/questions/19149839/pig-udf-maxmind-geoip-database-data-file-loading-issue