zoukankan      html  css  js  c++  java
  • 课堂练习-实验六

    1,mapper层

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import java.io.IOException;
    
    //前两个参数:key是某一行的起始位置相对于文件开头的偏移量  value是指每一行
    //后两个参数:单词和单词数目
    public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //获取一行的内容
            String line = value.toString();
            //每一行的单词是以空格隔开的,所以使用空格切割成数组
            String[] words = line.split(" ");
            for (String word:words
                 ) {
                //输出到reduce
                context.write(new Text(word),new LongWritable(1));
            }
        }
    }

    2,reduce层

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.io.Text;
    import java.io.IOException;
    
    //前两个参数分别接收的是mapper的后两个参数,所以类型跟mapper后两个参数的类型一致
    //后两个参数为reduce的输出参数
    public class WordCountReduce extends Reducer<Text, LongWritable,Text,LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value:values
                 ) {
                sum = sum + value.get();
            }
            //输出
            context.write(key,new LongWritable(sum));
        }
    }

    3,job层

    mport org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WC {
            public static void main(String[] args)  throws Exception{
                    //1.获取job
                    Configuration conf=new Configuration();
                    Job job=Job.getInstance(conf);
                    //2.指定job使用的类
                    job.setJarByClass(WC.class);
                    //3.设置mapper的类以及属性
                    job.setMapperClass(WordCountMapper.class);
                    job.setMapOutputKeyClass(Text.class);
                    job.setMapOutputValueClass(LongWritable.class);
                    //4.设置reducer的类以及属
                    job.setReducerClass(WordCountReduce.class);
                    job.setOutputKeyClass(Text.class);
                    job.setOutputValueClass(LongWritable.class);
                    //5.设置输入文件
                    FileInputFormat.setInputPaths(job, new Path(args[0]));
                    //6.设置输出目录
                    FileOutputFormat.setOutputPath(job, new Path(args[1]));
                    //7.提交任务
                    job.waitForCompletion(true);
            }
    }
  • 相关阅读:
    mysql 主从复制
    confluence wiki搭建使用
    python 解析json loads dumps
    在linux下修改mysql的root密码
    [转]量子恒道统计
    php include
    php solr 查询
    php solr 扩展
    IKAnalyzer 和 solr4.3 冲突
    solr 4.3.0 配置
  • 原文地址:https://www.cnblogs.com/lq13035130506/p/11768089.html
Copyright © 2011-2022 走看看