zoukankan      html  css  js  c++  java
  • 自己实现 一个MapReduce 示例


    有这样一个实际的问题需要要通过hadoop的来解决一下。
    有一个学生成绩表,有学生姓名 和成绩格式如下
    zs 89
    zs 100
    ls 98
    ls 100
    zs 20
    ww 89
    ww 67
    ls 30
    ww 20


    一个学生 有多个科目,有不同的成绩。
    需要对每个同学的成绩求平均值。
    同时,把这个student.txt 上传到 hadoop的 file System 中。

    ./bin/hadoop fs -put ~/file/student.txt

    代码如下:

    package com.picc.test;
    
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.picc.mapreducetest.MyMapReduceTest;
    /***
    * 定义一个AvgScore 求学生的平均值 要实现一个Tool 工具类,是为了初始化一个hadoop配置实例
    */
    public class AvgScore implements Tool{
        public static final Logger log=LoggerFactory.getLogger(AvgScore.class);
        Configuration configuration;
    	// 是版本 0.20.2的实现
        public static class MyMap extends Mapper<Object, Text, Text, IntWritable>{
    
            @Override
            protected void map(Object key, Text value, Context context)  throws IOException, InterruptedException {
                String stuInfo = value.toString();//将输入的纯文本的数据转换成String
                System.out.println("studentInfo:"+stuInfo);
                log.info("MapSudentInfo:"+stuInfo);
    			//将输入的数据先按行进行分割
                StringTokenizer tokenizerArticle = new StringTokenizer(stuInfo, "\n");
    			//分别对每一行进行处理
                while(tokenizerArticle.hasMoreTokens()){
    			// 每行按空格划分 
                    StringTokenizer tokenizer = new StringTokenizer(tokenizerArticle.nextToken());
                    String name = tokenizer.nextToken();//学生姓名
                    String score = tokenizer.nextToken();//学生成绩
                    Text stu = new Text(name);
                    int intscore = Integer.parseInt(score);
                    log.info("MapStu:"+stu.toString()+" "+intscore);
                    context.write(stu,new IntWritable(intscore));//输出学生姓名和成绩  
                }
            }
            
        }
        public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
    
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,Context context)
                    throws IOException, InterruptedException {
                int sum=0;
                int count=0;
                Iterator<IntWritable> iterator=    values.iterator();
                while(iterator.hasNext()){
                    sum+=iterator.next().get();//计算总分
                    count++;//统计总科目
                }
                int avg= (int)sum/count;
                context.write(key,new  IntWritable(avg));//输出学生姓名和平均值
            }
            
        }
        public  int run(String [] args) throws Exception{
            
              Job job = new Job(getConf());
              job.setJarByClass(AvgScore.class);
              job.setJobName("avgscore");
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
              job.setMapperClass(MyMap.class);
              job.setCombinerClass(MyReduce.class);
              job.setReducerClass(MyReduce.class);
              job.setInputFormatClass(TextInputFormat.class);
              job.setOutputFormatClass(TextOutputFormat.class);
              FileInputFormat.addInputPath(job, new Path(args[0]));//设置输入文件路径 
              FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置输出文件路径
              boolean success=  job.waitForCompletion(true);
              
              return success ? 0 : 1;
              
        }
       public static void main(String[] args) throws Exception {
        //在eclipse 工具上配置输入和输出参数
        int ret = ToolRunner.run(new AvgScore(), args);
        System.exit(ret);
       }
    @Override
    public Configuration getConf() {
        return configuration;
    }
    @Override
    public void setConf(Configuration conf) {
           conf = new Configuration();
           configuration=conf;
    }
    }
    我在eclipse 上配置参数。会报异常。所以,我把以上代码导出成 avgscore.jar
    把这个avgscore.jar 放到hadoop 0.20.2/目录下。
    输入命令 ./bin/hadoop  jar avgscore.jar com/picc/test/AvgScore  input/student.txt out1

    结果 图:

    和计算的结果 没有错。

    以下是对 以上算法的一个分析:

     package com.picc.test;
    
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.picc.mapreducetest.MyMapReduceTest;
    
    public class AvgScore implements Tool{
        public static final Logger log=LoggerFactory.getLogger(AvgScore.class);
        Configuration configuration;
        
        public static class MyMap extends Mapper<Object, Text, Text, IntWritable>{
            Configuration config = HBaseConfiguration.create();//获取hbase 的操作上下文
            private static IntWritable linenum = new IntWritable(1);//初始化一个变量值
            @Override
            protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String stuInfo = value.toString();
                System.out.println("studentInfo:"+stuInfo);
                log.info("MapSudentInfo:"+stuInfo);
                StringTokenizer tokenizerArticle = new StringTokenizer(stuInfo, "\n");
                while(tokenizerArticle.hasMoreTokens()){
                    StringTokenizer tokenizer = new StringTokenizer(tokenizerArticle.nextToken());
                    String name = tokenizer.nextToken();
                    String score = tokenizer.nextToken();
                    Text stu = new Text(name);
                    int intscore = Integer.parseInt(score);
                    log.info("MapStu:"+stu.toString()+" "+intscore);
                    context.write(stu,new IntWritable(intscore));  //zs 90
                    //create 'stu','name','score'
                    HTable table=new HTable(config,"stu");
                    byte[] row1 = Bytes.toBytes("name"+linenum);
                    Put p1=new Put(row1);
                    byte[] databytes = Bytes.toBytes("name");
                    p1.add(databytes, Bytes.toBytes("1"), Bytes.toBytes(name));
                    table.put(p1);//put 'stu','name','name:1','zs'
                    table.flushCommits();
                    
                    byte [] row2 = Bytes.toBytes("score"+linenum);
                    Put p2 = new Put(row2);
                    byte [] databytes2 = Bytes.toBytes("score");
                    p2.add(databytes2, Bytes.toBytes("1"), Bytes.toBytes(score));
                    table.put(p2);//put 'stu','score','score:1','90'
                    table.flushCommits();
                    linenum= new IntWritable(linenum.get()+1);//对变量值进行变值处理
                }
            }
            
        }
        public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
    
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,Context context)
                    throws IOException, InterruptedException {
                int sum=0;
                int count=0;
                Iterator<IntWritable> iterator=    values.iterator();
                while(iterator.hasNext()){
                    sum+=iterator.next().get();
                    count++;
                }
                int avg= (int)sum/count;
                context.write(key,new  IntWritable(avg));
            }
            
        }
        public  int run(String [] args) throws Exception{
            
              Job job = new Job(getConf());
              job.setJarByClass(AvgScore.class);
              job.setJobName("avgscore");
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
              job.setMapperClass(MyMap.class);
              job.setCombinerClass(MyReduce.class);
              job.setReducerClass(MyReduce.class);
              job.setInputFormatClass(TextInputFormat.class);
              job.setOutputFormatClass(TextOutputFormat.class);
              FileInputFormat.addInputPath(job, new Path(args[0]));
              FileOutputFormat.setOutputPath(job, new Path(args[1]));
              boolean success=  job.waitForCompletion(true);
              
              return success ? 0 : 1;
              
        }
       public static void main(String[] args) throws Exception {
        
        int ret = ToolRunner.run(new AvgScore(), args);
        System.exit(ret);
       }
    @Override
    public Configuration getConf() {
        return configuration;
    }
    @Override
    public void setConf(Configuration conf) {
           conf = new Configuration();
           configuration=conf;
    }
    }
    
    
    

    这个代码是对上一个代码的调试分析处理后的代码,

    把map 处理的过程放到的数据库中,在MapReduce 中处理 hbase数据时,需要 把hbase 的数据包放到hadoop的lib 包下。

    处理的结果,见视图:


    注意,在hbase数据库中 row中的Key是不能相同的,否则会 后一条会覆盖前一条值。需要保让其唯一性。

    name1 和score1 是一条数据,这两列表是一个学生的成绩,和关系型数据库不同,以列值存储,思想需要转换一下。


  • 相关阅读:
    遍历Newtonsoft.Json.Linq.JObject
    JSON中JObject和JArray,JValue序列化(Linq)
    RabbitMQ学习系列二:.net 环境下 C#代码使用 RabbitMQ 消息队列
    RabbitMQ学习系列三:.net 环境下 C#代码订阅 RabbitMQ 消息并处理
    RabbitMQ学习系列一:windows下安装RabbitMQ服务
    红帽企业版Linux成为Linux下的.NET Core的参考平台
    LINUX下SYN FLOOD攻击及LINUX下SYN攻防简述
    Linux下tar命令的各种参数选项和他们的作用整理
    异常值监测的方法 Tukey test
    Git如何回滚代码?
  • 原文地址:https://www.cnblogs.com/java20130726/p/3218278.html
Copyright © 2011-2022 走看看