zoukankan      html  css  js  c++  java
  • Hadoop Bloom filter应用示例

     

    Hadoop0.20.2 Bloom filter应用示例

    1. 简介

        参见《Hadoop in Action》P102 以及 《Hadoop实战(第2版)》(陆嘉恒)P69

        

               

    2. 案例

        网上大部分的说明只是照搬《Hadoop in Action》中自行实现 BloomFilter 的示例代码;本文使用的是 Hadoop 0.20.2,该版本已经自带 BloomFilter 实现(org.apache.hadoop.util.bloom.BloomFilter),可以直接使用。

        案例文件如下:

        customers.txt

        1,Stephanie Leung,555-555-5555
        2,Edward Kim,123-456-7890
        3,Jose Madriz,281-330-8004
        4,David Stork,408-555-0000

        -----------------------------------------------------------------

        orders.txt

        3,A,12.95,02-Jun-2008
        1,B,88.25,20-May-2008
        2,C,32.00,30-Nov-2007
        3,D,25.02,22-Jan-2009
        5,E,34.59,05-Jan-2010
        6,F,28.67,16-Jan-2008
        7,G,49.82,24-Jan-2009

        两个文件通过customer ID关联。

    3. 代码

    Java 代码如下:
     
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.bloom.BloomFilter;
    import org.apache.hadoop.util.bloom.Key;
    import org.apache.hadoop.util.hash.Hash;

    24. public class BloomMRMain {  
    25.     public static class BloomMapper extends Mapper<Object, Text, Text, Text> {  
    26.         BloomFilter bloomFilter = new BloomFilter(10000, 6, Hash.MURMUR_HASH);  
    27.           
    28.         protected void setup(Context context) throws IOException ,InterruptedException {  
    29.             Configuration conf = context.getConfiguration();  
    30.               
    31.             String path = "hdfs://localhost:9000/user/hezhixue/input/customers.txt";  
    32.             Path file = new Path(path);  
    33.               
    34.             FileSystem hdfs = FileSystem.get(conf);  
    35.             FSDataInputStream dis = hdfs.open(file);  
    36.             BufferedReader reader = new BufferedReader(new InputStreamReader(dis));  
    37.             String temp;    
    38.             while ((temp = reader.readLine()) != null) {   
    39. //              System.out.println("bloom filter temp:" + temp);  
    40.                 String[] tokens = temp.split(",");  
    41.                 if (tokens.length > 0) {  
    42.                     bloomFilter.add(new Key(tokens[0].getBytes()));  
    43.                 }  
    44.             }  
    45.         }  
    46.           
    47.         protected void map(Object key, Text value, Context context) throws IOException ,InterruptedException {  
    48.             //获得文件输入路径  
    49.             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();  
    50.             if (pathName.contains("customers")) {  
    51.                 String data = value.toString();  
    52.                 String[] tokens = data.split(",");  
    53.                 if (tokens.length == 3) {  
    54.                     String outKey = tokens[0];  
    55.                     String outVal = "0" + ":" + tokens[1] + "," + tokens[2];  
    56.                     context.write(new Text(outKey), new Text(outVal));  
    57.                 }  
    58.             } else if (pathName.contains("orders")) {  
    59.                 String data = value.toString();  
    60.                 String[] tokens = data.split(",");  
    61.                 if (tokens.length == 4) {  
    62.                     String outKey = tokens[0];  
    63.                     System.out.println("in map and outKey:" + outKey);  
    64.                     if (bloomFilter.membershipTest(new Key(outKey.getBytes()))) {  
    65.                         String outVal = "1" + ":" + tokens[1] + "," + tokens[2]+ "," + tokens[3];  
    66.                         context.write(new Text(outKey), new Text(outVal));  
    67.                     }  
    68.                 }  
    69.             }  
    70.         }  
    71.     }  
    72.       
    73.     public static class BloomReducer extends Reducer<Text, Text, Text, Text> {  
    74.         ArrayList<Text> leftTable = new ArrayList<Text>();  
    75.         ArrayList<Text> rightTable = new ArrayList<Text>();  
    76.           
    77.         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException ,InterruptedException {  
    78.               
    79.              leftTable.clear();  
    80.              rightTable.clear();  
    81.               
    82.             for (Text val : values) {  
    83.                 String outVal = val.toString();  
    84.                 System.out.println("key: " + key.toString() + " : " + outVal);  
    85.                 int index = outVal.indexOf(":");  
    86.                 String flag = outVal.substring(0, index);  
    87.                 if ("0".equals(flag)) {  
    88.                     leftTable.add(new Text(outVal.substring(index+1)));  
    89.                 } else if ("1".equals(flag)) {  
    90.                     rightTable.add(new Text(outVal.substring(index + 1)));  
    91.                 }  
    92.             }  
    93.               
    94.               
    95.             if (leftTable.size() > 0 && rightTable.size() > 0) {  
    96.                 for(Text left : leftTable) {  
    97.                     for (Text right : rightTable) {  
    98.                         context.write(key, new Text(left.toString() + "," + right.toString()));  
    99.                     }  
    100.                 }  
    101.             }  
    102.         }  
    103.     }  
    104.       
    105.     public static void main(String[] args) throws Exception {  
    106.         Configuration conf = new Configuration();  
    107.           
    108.         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
    109.           
    110.         if (otherArgs.length != 2) {  
    111.           System.err.println("Usage: BloomMRMain <in> <out>");  
    112.           System.exit(2);  
    113.         }         
    114.           
    115.         Job job = new Job(conf, "BloomMRMain");  
    116.         job.setJarByClass(BloomMRMain.class);  
    117.           
    118.         job.setMapperClass(BloomMapper.class);  
    119.         job.setReducerClass(BloomReducer.class);  
    120.           
    121.         job.setInputFormatClass(TextInputFormat.class);  
    122.         job.setOutputFormatClass(TextOutputFormat.class);  
    123.           
    124.         job.setMapOutputKeyClass(Text.class);  
    125.         job.setMapOutputValueClass(Text.class);  
    126.           
    127.         job.setOutputKeyClass(Text.class);  
    128.         job.setOutputValueClass(Text.class);      
    129.           
    130.         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
    131.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
    132.           
    133.         System.exit(job.waitForCompletion(true) ? 0 : 1);  
    134.     }  
    135. }  
  • 相关阅读:
    Java中net.sf.json包关于JSON与对象互转的坑
    Java IO(1)基础知识——字节与字符
    [Github]给已创建的GitHub项目添加LICENSE
    [MAC]激活Sublime Text
    [MAC]安装配置Charles
    [iOS]15个iOS视频播放控件
    [iOS]UIWindow详解
    [Swift]LeetCode1320. 二指输入的的最小距离 | Minimum Distance to Type a Word Using Two Fingers
    [Swift]LeetCode1319. 连通网络的操作次数 | Number of Operations to Make Network Connected
    [Swift]LeetCode1318. 或运算的最小翻转次数 | Minimum Flips to Make a OR b Equal to c
  • 原文地址:https://www.cnblogs.com/bendantuohai/p/4744774.html
Copyright © 2011-2022 走看看