  • Data Sorting with MapReduce

    The example below sorts a set of integers with a single MapReduce job. It
    leans on the shuffle phase, which sorts map output keys: the mapper emits
    each input number as a key, so the reducer receives the numbers in
    ascending order and writes them out with a global rank.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Sort {
    // The mapper parses each line as an integer and emits it as the output
    // key (with a constant 1 as the value); the shuffle phase then sorts
    // those keys for us.
    public static class Map extends
            Mapper<Object, Text, IntWritable, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private final IntWritable data = new IntWritable();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return; // skip blank lines rather than fail in parseInt
            }
            data.set(Integer.parseInt(line));
            context.write(data, one);
        }
    }

    // The reducer receives the keys in ascending order. For each key it
    // writes one line per element of the value list (so duplicates are
    // kept), using a running counter, linenum, as the output key to record
    // the rank, and the number itself as the output value.
    public static class Reduce extends
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private IntWritable linenum = new IntWritable(1);

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(linenum, key);
                linenum = new IntWritable(linenum.get() + 1);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "mapred.job.tracker" is a legacy (pre-YARN) property; on a YARN
        // cluster it is ignored and can simply be dropped.
        conf.set("mapred.job.tracker", "localhost:9000");
        // The input and output paths are hardcoded here rather than taken
        // from the command line.
        String[] ioArgs = new String[] { "hdfs://localhost:9000/input/sort",
                "hdfs://localhost:9000/output/sortout" };
        String[] otherArgs = new GenericOptionsParser(conf, ioArgs)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Data Sort <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "Data Sort");
        job.setJarByClass(Sort.class);
        // Set the Mapper and Reducer classes
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // A single reducer is required for a globally sorted result
        job.setNumReduceTasks(1);
        // Set the output key/value types
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the input and output directories
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
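
    A quick way to sanity-check the job (the jar name and input data below
    are just an illustration): package the class, put a file with one
    integer per line under /input/sort in HDFS, and launch it with the
    hadoop jar command. Since the paths are hardcoded in ioArgs above, the
    command needs no extra arguments.

        hadoop jar sort.jar Sort

    Given an input file containing:

        32
        654
        32
        15
        756

    the output directory holds tab-separated rank/value pairs (the default
    TextOutputFormat layout):

        1	15
        2	32
        3	32
        4	654
        5	756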
    Personal study notes.
  • Original post: https://www.cnblogs.com/jeshy/p/15244746.html