  • Hadoop 1.2.0 Development Notes (9)

    Human learning largely begins with imitation. "In ancient times, when Bao Xi ruled all under heaven... he knotted cords into nets, for hunting and for fishing, taking the idea from the hexagram Li." The ancients sought survival in the laws of nature and step by step walked out of ignorance: man takes his law from earth, earth from heaven, heaven from the Dao, and the Dao from what is naturally so. (The philological commentary piled up on that last sentence over the centuries could fill libraries, and none of it is as direct as my reading here; I will take the liberty of scoffing at those commentators, who make mountains out of molehills.)

    In that spirit, this article starts by imitating a few of Hadoop's examples, to sharpen one's feel for Hadoop programming.

    The following examples make the concrete MapReduce processing flow easier to grasp, including the input and output types at each stage and the role of the shuffle.
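
    Before imitating anything, it helps to pin down the contract all four examples rely on: map emits (key, value) pairs, the shuffle groups the values by key and sorts the keys, and reduce is called once per distinct key with the grouped value list. Below is a minimal plain-Java sketch of that contract, assuming nothing from Hadoop; the class name ShuffleSketch and the sample pairs are purely illustrative.

    import java.util.*;

    public class ShuffleSketch {
        public static void main(String[] args) {
            // hypothetical map output: (key, value) pairs in emission order
            String[][] mapOutput = { { "b", "1" }, { "a", "1" }, { "a", "1" } };

            // shuffle: group values by key; a TreeMap mimics the key sort
            SortedMap<String, List<String>> grouped = new TreeMap<String, List<String>>();
            for (String[] kv : mapOutput) {
                if (!grouped.containsKey(kv[0])) {
                    grouped.put(kv[0], new ArrayList<String>());
                }
                grouped.get(kv[0]).add(kv[1]);
            }

            // reduce: one call per distinct key with its grouped value list
            for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
                System.out.println(e.getKey() + " -> " + e.getValue());
            }
            // prints "a -> [1, 1]" then "b -> [1]"
        }
    }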

    1. Data Deduplication

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class Dedup {

        //map copies each input line (the value) to the output key and emits it
        //directly; identical lines become identical keys, which the shuffle merges
        public static class Map extends Mapper<Object,Text,Text,Text>{

            //the map function
            public void map(Object key,Text value,Context context)
                    throws IOException,InterruptedException{
                context.write(value, new Text(""));
            }
        }

        //reduce copies each input key to the output key and emits it directly,
        //once per distinct key
        public static class Reduce extends Reducer<Text,Text,Text,Text>{
            //the reduce function
            public void reduce(Text key,Iterable<Text> values,Context context)
                    throws IOException,InterruptedException{
                context.write(key, new Text(""));
            }
        }

        /**
         * @param args
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            //input and output paths are hard-coded here; pass args instead of
            //ioArgs to take them from the command line
            String[] ioArgs=new String[]{"dedup_in","dedup_out"};
            String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: Data Deduplication <in> <out>");
                System.exit(2);
            }

            Job job = new Job(conf, "Data Deduplication");
            job.setJarByClass(Dedup.class);

            //set the Map, Combine and Reduce classes
            job.setMapperClass(Map.class);
            job.setCombinerClass(Reduce.class);
            job.setReducerClass(Reduce.class);

            //set the output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            //set the input and output directories
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
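
    To see why this deduplicates, suppose (purely hypothetical input) that dedup_in contains the lines a, b, and a again, possibly spread across several files. Map emits each line as a key with an empty value; the shuffle merges the two a keys into one group; Reduce then writes each distinct key exactly once, so dedup_out contains just a and b. Reusing Reduce as the combiner is safe here because deduplication is idempotent: deduplicating map output locally and then again at the reducer gives the same result.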

    2. Data Sorting

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class Sort {

        // map parses each input value into an IntWritable and emits it as the
        // output key; the shuffle then sorts these keys in ascending order
        public static class Map extends
                Mapper<Object, Text, IntWritable, IntWritable> {

            private static IntWritable data = new IntWritable();

            // the map function
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                data.set(Integer.parseInt(line));
                context.write(data, new IntWritable(1));
            }
        }

        // reduce copies each input key to the output value, emitting it once per
        // element in its value list so that duplicates are preserved;
        // the field linenum numbers the keys by rank
        public static class Reduce extends
                Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

            private static IntWritable linenum = new IntWritable(1);

            // the reduce function
            public void reduce(IntWritable key, Iterable<IntWritable> values,
                    Context context) throws IOException, InterruptedException {
                for (IntWritable val : values) {
                    context.write(linenum, key);
                    // write() serializes immediately, so mutating linenum is safe
                    linenum.set(linenum.get() + 1);
                }
            }
        }

        /**
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            String[] ioArgs = new String[] { "sort_in", "sort_out" };
            String[] otherArgs = new GenericOptionsParser(conf, ioArgs)
                    .getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: Data Sort <in> <out>");
                System.exit(2);
            }

            Job job = new Job(conf, "Data Sort");
            job.setJarByClass(Sort.class);

            // set the Map and Reduce classes; no combiner here, since a combiner
            // reusing Reduce would emit line numbers prematurely
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            // set the output types
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);

            // set the input and output directories
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
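
    One caveat: the output is globally sorted only because every key reaches the same reducer, which holds under Hadoop's default of a single reduce task. With several reducers, each output file would be sorted internally but not across files (a total-order partitioner would be needed for that). To make the assumption explicit, a single line could be added to main() above before the job is submitted; setNumReduceTasks is the standard Job API call:

            // pin the single-reducer assumption that the global sort rests on
            job.setNumReduceTasks(1);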

    3. Average Score

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class Score {

        public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

            // the map function; TextInputFormat delivers exactly one line per
            // call, so no extra per-line splitting is needed
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                // split the line on whitespace: "<name> <score>"
                StringTokenizer tokenizerLine = new StringTokenizer(value.toString());

                String strName = tokenizerLine.nextToken(); // student name
                String strScore = tokenizerLine.nextToken(); // score

                Text name = new Text(strName);
                int scoreInt = Integer.parseInt(strScore);

                // emit name and score
                context.write(name, new IntWritable(scoreInt));
            }
        }

        public static class Reduce extends
                Reducer<Text, IntWritable, Text, IntWritable> {

            // the reduce function
            public void reduce(Text key, Iterable<IntWritable> values,
                    Context context) throws IOException, InterruptedException {

                int sum = 0;
                int count = 0;

                for (IntWritable val : values) {
                    sum += val.get(); // total score
                    count++;          // number of subjects
                }

                int average = sum / count; // integer average
                context.write(key, new IntWritable(average));
            }
        }

        /**
         * @param args
         * @throws Exception 
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            String[] ioArgs = new String[] { "score_in", "score_out" };

            String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: Score Average <in> <out>");
                System.exit(2);
            }

            Job job = new Job(conf, "Score Average");
            job.setJarByClass(Score.class);

            // set the Map and Reduce classes; unlike the Dedup example, Reduce
            // must NOT be reused as a combiner here, because averaging partial
            // averages does not give the overall average (see the note below)
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            // split the input into splits and provide a RecordReader implementation
            job.setInputFormatClass(TextInputFormat.class);

            // provide a RecordWriter implementation responsible for output
            job.setOutputFormatClass(TextOutputFormat.class);

            // set the output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // set the input and output directories
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
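
    A note on the missing combiner: averaging is not associative, so reusing Reduce as a combiner (as the Dedup example does) would average partial averages and skew the result whenever one student's records span multiple map tasks. The usual remedy is to carry (sum, count) pairs through the shuffle and divide only at the very end. Below is a hedged sketch of such a combiner; it is not from the original post, and the class name and "sum,count" encoding are illustrative. The mapper would emit "score,1" per record, and the final reducer would aggregate the same way before emitting sum / count.

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // values are "sum,count" pairs encoded as Text (illustrative convention)
    public class SumCountCombiner extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0, count = 0;
            for (Text v : values) {
                String[] parts = v.toString().split(",");
                sum += Long.parseLong(parts[0]);
                count += Long.parseLong(parts[1]);
            }
            // the output is still a "sum,count" pair, so this step may safely
            // run zero, one, or several times before the final reducer
            context.write(key, new Text(sum + "," + count));
        }
    }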

    4. Inverted Index

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class InvertedIndex {
        public static class Map extends Mapper<Object, Text, Text, Text> {

            private Text keyInfo = new Text(); // holds the word:file combination
            private Text valueInfo = new Text(); // holds the term frequency
            private FileSplit split; // holds the split being processed

            // the map function
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {

                // get the FileSplit this <key,value> pair belongs to
                split = (FileSplit) context.getInputSplit();

                StringTokenizer itr = new StringTokenizer(value.toString());

                while (itr.hasMoreTokens()) {
                    // the key combines word and file, e.g. "MapReduce:file1.txt";
                    // to use the full path instead:
                    // keyInfo.set(itr.nextToken() + ":" + split.getPath().toString());
                    // here only the file name is kept, for readability
                    keyInfo.set(itr.nextToken() + ":" + split.getPath().getName());

                    // initialize the term frequency to 1
                    valueInfo.set("1");

                    context.write(keyInfo, valueInfo);
                }
            }
        }

        public static class Combine extends Reducer<Text, Text, Text, Text> {

            private Text info = new Text();

            // the reduce function
            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {

                // sum the term frequency
                int sum = 0;
                for (Text value : values) {
                    sum += Integer.parseInt(value.toString());
                }

                int splitIndex = key.toString().indexOf(":");

                // rewrite the value as file:frequency
                info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
                // rewrite the key as the bare word
                key.set(key.toString().substring(0, splitIndex));

                context.write(key, info);
            }
        }

        public static class Reduce extends Reducer<Text, Text, Text, Text> {

            private Text result = new Text();

            // the reduce function
            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {

                // build the document list for this word
                StringBuilder fileList = new StringBuilder();
                for (Text value : values) {
                    fileList.append(value.toString()).append(";");
                }

                result.set(fileList.toString());

                context.write(key, result);
            }
        }

        /**
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            String[] ioArgs = new String[] { "index_in", "index_out" };
            String[] otherArgs = new GenericOptionsParser(conf, ioArgs)
                    .getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: Inverted Index <in> <out>");
                System.exit(2);
            }

            Job job = new Job(conf, "Inverted Index");
            job.setJarByClass(InvertedIndex.class);

            // set the Map, Combine and Reduce classes; note that Reduce assumes
            // Combine has already run, a caveat discussed after the listing
            job.setMapperClass(Map.class);
            job.setCombinerClass(Combine.class);
            job.setReducerClass(Reduce.class);

            // set the Map output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // set the Reduce output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // set the input and output directories
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
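
    A caveat this classic example glosses over: Hadoop treats a combiner as an optimization that may run zero, one, or several times per map output, yet the Reduce class above only works if Combine has already rewritten every key from "word:file" to "word"; if the combiner were ever skipped, keys for the same word but different files would not even land in the same reduce group. Below is a hedged sketch of a variant whose correctness does not depend on the combiner, keyed on the bare word from the start; the class and method names are illustrative, not from the original post. With this shape, a combiner summing per-file counts becomes a pure, optional optimization.

    import java.io.IOException;
    import java.util.Map;
    import java.util.StringTokenizer;
    import java.util.TreeMap;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class SafeInvertedIndex {

        // key is the bare word, so partitioning and grouping are by word alone;
        // the value carries the file name and a unit count
        public static class SafeMap extends Mapper<Object, Text, Text, Text> {
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                String file = ((FileSplit) context.getInputSplit()).getPath().getName();
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    context.write(new Text(itr.nextToken()), new Text(file + ":1"));
                }
            }
        }

        // aggregates per-file counts itself, so it works with or without a combiner
        public static class SafeReduce extends Reducer<Text, Text, Text, Text> {
            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                TreeMap<String, Integer> perFile = new TreeMap<String, Integer>();
                for (Text v : values) {
                    String s = v.toString();
                    int sep = s.lastIndexOf(':');
                    String file = s.substring(0, sep);
                    int c = Integer.parseInt(s.substring(sep + 1));
                    Integer prev = perFile.get(file);
                    perFile.put(file, prev == null ? c : prev + c);
                }
                StringBuilder list = new StringBuilder();
                for (Map.Entry<String, Integer> e : perFile.entrySet()) {
                    list.append(e.getKey()).append(':').append(e.getValue()).append(';');
                }
                context.write(key, new Text(list.toString()));
            }
        }
    }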

    --------------------------------------------------------------------------- 

    This Hadoop 1.2.0 Development Notes series is the author's original work.

    Please credit the source when reposting: 博客园 刺猬的温驯

    Original link: http://www.cnblogs.com/chenying99/archive/2013/06/03/3114564.html
