  • Hadoop Basics: Implementing an Inverted Index

    I. Experiment Overview

    This experiment counts how many times each word appears in each file and outputs, for every word, its per-file counts in sorted order (an inverted index).

    (1) Test Data

    a.txt

    hello kitty flink
    hello tom spark with
    hello mark spark
    hadoop hadoop hadoop

    b.txt

    hello tom tom hadoop
    tom is playing with mark
    flink vs spark to hadoop
    hadoop

    c.txt

    kitty want to learn hadoop
    hadoop spark flink
    cuda hello vs
    hello vs flink

    (2) Expected Results

    Example word: hello

    Occurrences in each file:

    hello--->a.txt  3
    hello--->b.txt  1
    hello--->c.txt  2

    Final output format:

    hello   a.txt--->3 c.txt--->2 b.txt--->1

    (3) Approach

    First, one MapReduce job scans the raw data and, for each word, emits word--->the file it appears in, together with the word's count in that file, e.g. hello--->a.txt  3.

    A second job then groups these records by word, sorts each word's per-file counts in descending order, and emits one combined line per word, e.g.: hello   a.txt--->3 c.txt--->2 b.txt--->1. Because the first job's default TextOutputFormat separates key and value with a tab, each intermediate line has the form word--->file<TAB>count, which is exactly what the second job parses; see the sketch below.
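    A minimal, self-contained sketch (for illustration only, not part of the jobs below; the class name FormatSketch is made up for this example) of how a job-1 output line is taken apart by job 2:

    public class FormatSketch {
        public static void main(String[] args) {
            // A line as job 1 writes it: key<TAB>value via TextOutputFormat
            String line = "hello--->a.txt\t3";
            String[] kv = line.split("\t");            // ["hello--->a.txt", "3"]
            String[] wordFile = kv[0].split("--->");   // ["hello", "a.txt"]
            System.out.println("word=" + wordFile[0]
                    + " file=" + wordFile[1] + " count=" + kv[1]);
            // prints: word=hello file=a.txt count=3
        }
    }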

    II. Implementation

    (1) Step 1: count the raw data, emitting word--->source file together with the word's count in that file

    package cn.hadoop.ri;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class ReverseIndex {
    
        public static class ReverseIndexMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                    throws IOException, InterruptedException {
                // Read one line of input
                String line = value.toString();
                // Split the line into words on spaces
                String[] fields = StringUtils.split(line, " ");
                // Get the input split this line belongs to
                FileSplit inputSplit = (FileSplit)context.getInputSplit();
                // Extract the name of the source file
                String file = inputSplit.getPath().getName();
                // Emit word--->file as the key with a count of 1
                for(String word:fields) {
                    context.write(new Text(word+"--->"+file), new LongWritable(1));
                }
                }
            }
        }
        
        public static class ReverseIndexReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
            @Override
            protected void reduce(Text key, Iterable<LongWritable> values,
                    Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
                long count = 0;
                for(LongWritable value: values) {
                    count += value.get();
                }
                context.write(key, new LongWritable(count));
            }
        }
        
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            
            job.setJarByClass(ReverseIndex.class);
            
            job.setMapperClass(ReverseIndexMapper.class);
            job.setReducerClass(ReverseIndexReducer.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            
            // Delete the output directory if it already exists
            Path output = new Path(args[1]);
            FileSystem fs = FileSystem.get(conf);
            if(fs.exists(output)) {
                fs.delete(output, true);    // recursive delete
            }
            
            FileOutputFormat.setOutputPath(job, output);
            
            System.exit(job.waitForCompletion(true)?0:1);
        }
    }
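    One optional tweak, not in the original code: because the reducer only sums counts (an associative, commutative operation), it can double as a combiner, pre-aggregating on the map side to shrink the shuffle. A one-line sketch to add in main() after setReducerClass:

            // Optional: reuse the reducer as a combiner to pre-sum
            // (word--->file, 1) pairs before the shuffle
            job.setCombinerClass(ReverseIndexReducer.class);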

    Job output:

    hadoop jar ri.jar cn.hadoop.ri.ReverseIndex /wc/ri /wc/ro1 
    [hadoop@hadoopH1 Hadoop]$ hadoop fs -cat /wc/ro1/part-r-00000
    cuda--->c.txt   1
    flink--->a.txt  1
    flink--->b.txt  1
    flink--->c.txt  2
    hadoop--->a.txt 3
    hadoop--->b.txt 3
    hadoop--->c.txt 2
    hello--->a.txt  3
    hello--->b.txt  1
    hello--->c.txt  2
    is--->b.txt     1
    kitty--->a.txt  1
    kitty--->c.txt  1
    learn--->c.txt  1
    mark--->a.txt   1
    mark--->b.txt   1
    playing--->b.txt        1
    spark--->a.txt  2
    spark--->b.txt  1
    spark--->c.txt  1
    to--->b.txt     1
    to--->c.txt     1
    tom--->a.txt    1
    tom--->b.txt    3
    vs--->b.txt     1
    vs--->c.txt     2
    want--->c.txt   1
    with--->a.txt   1
    with--->b.txt   1

    (2) Step 2: sort each word's per-file counts and emit a single combined line

    package cn.hadoop.ri;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class ReverseIndexStep2 {
        public static class ReverseIndexMapper extends Mapper<LongWritable, Text, Text, Text>{
            @Override
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                // Read one line of step-1 output: word--->file<TAB>count
                String line = value.toString();
                // Split on the literal separator "--->".  Note that
                // StringUtils.split(line, "--->") would treat '-' and '>'
                // as a set of separator characters, so the whole-separator
                // variant is the correct call here.
                String[] fields = StringUtils.splitByWholeSeparator(line, "--->");
                String word = fields[0];
                
                // Split the remainder on the tab written by TextOutputFormat
                String[] fields_2 = StringUtils.split(fields[1], "\t");
                String fn = fields_2[0];
                long ct = Long.parseLong(fields_2[1]);
                
                // Emit the word as the key and file--->count as the value
                context.write(new Text(word), new Text(fn+"--->"+ct));
            }
        }
        
        
        public static class ReverseIndexReducer extends Reducer<Text, Text, Text, Text>{
            // Order two "file--->count" strings by their counts:
            // returns 1 if str1's count is >= str2's, otherwise -1
            protected int compare(String str1, String str2) {
                long c1 = Long.parseLong(StringUtils.splitByWholeSeparator(str1, "--->")[1]);
                long c2 = Long.parseLong(StringUtils.splitByWholeSeparator(str2, "--->")[1]);
                return c1 >= c2 ? 1 : -1;
            }
            
            
            @Override
            protected void reduce(Text key, Iterable<Text> values,
                    Reducer<Text, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                // Collect every "file--->count" value for this word
                int i = 0, len = 0;
                StringBuilder str_t = new StringBuilder();
                for(Text value:values) {
                    len++;
                    str_t.append(value.toString()).append(" ");
                }
                
                String[] str_s = StringUtils.split(str_t.toString(), " ");
                
                // Exchange sort in descending order of count.  The inner
                // loop must start at i+1; starting it at 1 regardless of i
                // compares already-placed elements and can leave the array
                // unsorted.
                for(i=0; i<len-1; i++) {
                    for(int j=i+1; j<len; j++) {
                        if(compare(str_s[i], str_s[j])==-1) {
                            String tmp = str_s[i];
                            str_s[i] = str_s[j];
                            str_s[j] = tmp;
                        }
                    }
                }
                
                // Join the sorted entries into one output line
                StringBuilder res = new StringBuilder();
                for(i=0; i<len; i++) {
                    res.append(str_s[i]).append(" ");
                }
                
                context.write(key, new Text(res.toString()));
            }
        }
        
        public static void main(String[]  args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            
            job.setJarByClass(ReverseIndexStep2.class);
            
            job.setMapperClass(ReverseIndexMapper.class);
            job.setReducerClass(ReverseIndexReducer.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            
            // Delete the output directory if it already exists
            Path output = new Path(args[1]);
            FileSystem fs = FileSystem.get(conf);
            if(fs.exists(output)) {
                fs.delete(output, true);    // recursive delete
            }
            
            FileOutputFormat.setOutputPath(job, output);
            
            System.exit(job.waitForCompletion(true)?0:1);
    
        }
    }
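    As an aside (a sketch of an alternative, not from the original post), the hand-written sort in the reducer can be replaced with the standard library: collect the "file--->count" values into a list and sort with a comparator. A self-contained demonstration of just that sorting step:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    
    public class SortSketch {
        public static void main(String[] args) {
            List<String> entries = new ArrayList<>();
            entries.add("b.txt--->1");
            entries.add("a.txt--->3");
            entries.add("c.txt--->2");
            // Sort by the count after "--->", largest first
            entries.sort(Comparator.comparingLong(
                    (String e) -> Long.parseLong(e.substring(e.indexOf("--->") + 4)))
                    .reversed());
            System.out.println(String.join(" ", entries));
            // prints: a.txt--->3 c.txt--->2 b.txt--->1
        }
    }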

    Job output:

    hadoop jar ri.jar cn.hadoop.ri.ReverseIndexStep2 /wc/ro1 /wc/ro2
    [hadoop@hadoopH1 Hadoop]$ hadoop fs -cat /wc/ro2/part-r-00000                             
    cuda    c.txt--->1 
    flink   c.txt--->2 b.txt--->1 a.txt--->1 
    hadoop  a.txt--->3 b.txt--->3 c.txt--->2 
    hello   a.txt--->3 c.txt--->2 b.txt--->1 
    is      b.txt--->1 
    kitty   a.txt--->1 c.txt--->1 
    learn   c.txt--->1 
    mark    a.txt--->1 b.txt--->1 
    playing b.txt--->1 
    spark   a.txt--->2 b.txt--->1 c.txt--->1 
    to      b.txt--->1 c.txt--->1 
    tom     b.txt--->3 a.txt--->1 
    vs      c.txt--->2 b.txt--->1 
    want    c.txt--->1 
    with    a.txt--->1 b.txt--->1
  • Original article: https://www.cnblogs.com/ssyfj/p/12360313.html