  • Custom InputFormat and OutputFormat implementations, writing output to multiple files, a word-count example with the hadoop 1.x API, and receiving command-line arguments at runtime, with code examples

    One: Implementing a custom InputFormat

    * The data source comes from memory.
    * 1. InputFormat handles all kinds of data sources; below is an InputFormat implementation whose data source lives in memory.
    * 1.1 In the driver, call job.setInputFormatClass(MyselfMemoryInputFormat.class);
    * 1.2 Implement the InputFormat by extending InputFormat<K, V> and overriding its two methods, getSplits(..) and createRecordReader(..).
    * 1.3 getSplits(..) returns a java.util.List whose elements are InputSplit objects. Each InputSplit corresponds to one mapper task.
    * 1.4 An InputSplit is a slice of the original (potentially huge) data source; with massive data we have to split it. How large each InputSplit is, is entirely up to us. In this example the data is generated in memory and then packed into the InputSplit.
    * 1.5 An InputSplit wraps Hadoop data types and implements the Writable interface.
    * 1.6 A RecordReader reads the data of one InputSplit and parses it into <k,v> pairs for map() to consume.
    * 1.7 RecordReader has four core methods: initialize(..), nextKeyValue(), getCurrentKey() and getCurrentValue().
    * 1.8 initialize(..) matters because it is where the InputSplit is received and temporary state is set up.
    * 1.9 Each call to nextKeyValue() advances to the next key/value pair.
    * 1.10 After nextKeyValue() returns true, getCurrentKey() and getCurrentValue() are called next.
    *      All of this is driven by Mapper's run() method, as the sketch below shows.
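
    To make points 1.7 through 1.10 concrete, the following is a simplified, illustrative sketch of the loop that Mapper.run() executes; the class name RunLoopSketchMapper is made up for this sketch, and in real code you normally keep the run() the framework already provides:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch only: run() is overridden here just to show the call order.
    public class RunLoopSketchMapper extends Mapper<NullWritable, Text, Text, LongWritable> {
        @Override
        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            while (context.nextKeyValue()) {               // delegates to RecordReader.nextKeyValue()
                map(context.getCurrentKey(),               // delegates to RecordReader.getCurrentKey()
                    context.getCurrentValue(),             // delegates to RecordReader.getCurrentValue()
                    context);
            }
            cleanup(context);
        }
    }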

    public class MyselfInputFormatApp {
            private static final String OUT_PATH = "hdfs://hadoop1:9000/out";// output path; the reduce stage writes its result into this directory
            public static void main(String[] args) {
                Configuration conf = new Configuration();// configuration object
                try {
                    FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
                    fileSystem.delete(new Path(OUT_PATH), true);// delete the output directory if it already exists
                    Job job = new Job(conf, MyselfInputFormatApp.class.getSimpleName());// jobName: the job's name
                    job.setJarByClass(MyselfInputFormatApp.class);
                    
                    job.setInputFormatClass(MyselfMemoryInputFormat.class);
                    job.setMapperClass(MyMapper.class);// set the custom map class
                    job.setMapOutputKeyClass(Text.class);// type of the map output key
                    job.setMapOutputValueClass(LongWritable.class);// type of the map output value
                    job.setReducerClass(MyReducer.class);// set the custom Reduce class
                    job.setOutputKeyClass(Text.class);// type of the Reduce output key
                    job.setOutputValueClass(LongWritable.class);// type of the Reduce output value
                    FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));// location of the job's final output, produced once the reduce stage finishes
                    job.waitForCompletion(true);// submit to the jobTracker and wait for completion
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            public static class MyMapper extends
                    Mapper<NullWritable, Text, Text, LongWritable> {
                @Override
                protected void map(NullWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                    String line = value.toString();
                    String[] splited = line.split("\t");
                    for (String word : splited) {
                        context.write(new Text(word), new LongWritable(1));// emit (word, 1) for every word
                    }
                }
            }
    
            public static class MyReducer extends
                    Reducer<Text, LongWritable, Text, LongWritable> {
                @Override
                protected void reduce(Text key, Iterable<LongWritable> values,
                        Context context) throws IOException, InterruptedException {
                    long count = 0L;
                    for (LongWritable times : values) {
                        count += times.get();
                    }
                    context.write(key, new LongWritable(count));
                }
            }
            
            /**
             * Generates data in memory and parses it into key/value pairs.
             *
             */
            public static class MyselfMemoryInputFormat extends InputFormat<NullWritable,Text>{
    
                @Override
                public List<InputSplit> getSplits(JobContext context)
                        throws IOException, InterruptedException {
                    ArrayList<InputSplit> result = new ArrayList<InputSplit>();
                    result.add(new MemoryInputSplit());
                    result.add(new MemoryInputSplit());
                    result.add(new MemoryInputSplit());
                    return result;
                }
    
                @Override
                public RecordReader<NullWritable, Text> createRecordReader(
                        InputSplit split, TaskAttemptContext context)
                        throws IOException, InterruptedException {
                    return new MemoryRecordReader();
                }
            }
            
            public static class MemoryInputSplit extends InputSplit implements Writable{
                int SIZE = 10;
                //a plain Java array cannot be used directly as a Hadoop value,
                //so the data is held in Hadoop's ArrayWritable instead.
                ArrayWritable arrayWritable = new ArrayWritable(Text.class);
                /**
                 * Builds a plain Java array first and then converts it into the Hadoop type.
                 * @throws FileNotFoundException 
                 */
                public MemoryInputSplit() throws FileNotFoundException {
                    //one InputSplit feeds one map task; if map() is to be called several times, the InputSplit must yield several key/value pairs
                    Text[] array = new Text[SIZE];
                    Random random = new Random();
                    for(int i=0;i<SIZE;i++){
                        int nextInt = random.nextInt(999999);
                        Text text = new Text("Text"+nextInt);
                        array[i] = text ;
                    }
                    
    //                FileInputStream fs = new FileInputStream(new File("/etc/profile"));// alternatively, read from a file
    //                and parse the stream contents into the same data structure.
                    arrayWritable.set(array);
                }
                @Override
                public long getLength() throws IOException, InterruptedException {
                    return SIZE;
                }
                @Override
                public String[] getLocations() throws IOException,
                        InterruptedException {
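                    // in-memory data has no preferred hosts, so no locality hints are returned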
                    return new String[]{};
                }
                public ArrayWritable getValues() {
                    return arrayWritable;
                }
                @Override
                public void write(DataOutput out) throws IOException {
                    arrayWritable.write(out);
                }
                @Override
                public void readFields(DataInput in) throws IOException {
                    arrayWritable.readFields(in); 
                }
            }
            
            public static class MemoryRecordReader extends RecordReader<NullWritable, Text>{
                private Writable[] values = null ;
                private Text value = null ;
                private int i = 0;
                @Override
                public void initialize(InputSplit split, TaskAttemptContext context)
                        throws IOException, InterruptedException {
                    MemoryInputSplit inputSplit = (MemoryInputSplit)split;
                    ArrayWritable writables = inputSplit.getValues();
                    this.values = writables.get();
                    this.i = 0 ;
                }
    
                @Override
                public boolean nextKeyValue() throws IOException,
                        InterruptedException {
                    if(i >= values.length){
                        return false ;
                    }
                    if(null == this.value){
                        value = new Text();
                    }
                    value.set((Text)values[i]);
                    i++ ;
                    return true;
                }
    
                @Override
                public NullWritable getCurrentKey() throws IOException,
                        InterruptedException {
                    return NullWritable.get();
                }
    
                @Override
                public Text getCurrentValue() throws IOException,
                        InterruptedException {
                    return value;
                }
    
                @Override
                public float getProgress() throws IOException, InterruptedException {
                    // report how far through the in-memory values this reader is
                    if (null == values || 0 == values.length) {
                        return 0;
                    }
                    return Math.min(1.0f, i / (float) values.length);
                }
    
                /**
                 * Called when reading finishes; there is nothing to release here.
                 */
                @Override
                public void close() throws IOException {
                }
                
            }
            
        }

    Two: Implementing a custom OutputFormat

    Common output formats: TextOutputFormat: the default output format; key and value are separated by a tab.
            DBOutputFormat: writes the output to a database.
            SequenceFileOutputFormat: writes the key/value pairs in SequenceFile format.
            SequenceFileAsBinaryOutputFormat: writes to a SequenceFile in raw binary form.
            MapFileOutputFormat: writes the key/value pairs into a MapFile. Keys in a MapFile are sorted, so records must be written in key order.
            MultipleOutputFormat: multi-file output. By default one reducer produces one output file, but sometimes one reducer should produce several; MultipleOutputFormat and MultipleOutputs provide this (a MultipleOutputs sketch follows this list).
              MultipleOutputFormat: lets you customize the names of the output files.
              A subclass of MultipleOutputFormat needs to implement
                getBaseRecordWriter(): returns the underlying RecordWriter.
                generateFileNameForKeyValue(): derives the output file name from the key/value pair.
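
    The MultipleOutputs helper mentioned above is not shown elsewhere in this post, so here is a minimal sketch using the new (mapreduce) API. The named output "hello" and the class name are illustrative assumptions, and the new-API MultipleOutputs class is only available in sufficiently recent Hadoop releases (Hadoop 2.x and late 1.x):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    // In the driver, register the named output in addition to the job's normal output:
    // MultipleOutputs.addNamedOutput(job, "hello", TextOutputFormat.class, Text.class, LongWritable.class);
    public class MultipleOutputsSketchReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        private MultipleOutputs<Text, LongWritable> mos;

        @Override
        protected void setup(Context context) {
            mos = new MultipleOutputs<Text, LongWritable>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long count = 0L;
            for (LongWritable times : values) {
                count += times.get();
            }
            if (key.toString().startsWith("hello")) {
                mos.write("hello", key, new LongWritable(count));   // goes to the named output
            } else {
                context.write(key, new LongWritable(count));        // goes to the job's default output
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();                                            // flush and close the extra outputs
        }
    }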

    /**
     * A custom OutputFormat handles writing to any kind of output destination.
     * 1. The key/value pairs the OutputFormat writes come from the Reducer and are written through a RecordWriter.
     * 2. RecordWriter.write(..) only receives the key and the value, so where do they go? An output stream has to be passed in separately; write(..) writes k and v into that OutputStream.
     * 3. The RecordWriter is produced by the OutputFormat, so a custom OutputFormat must extend OutputFormat, and the stream object has to be obtained inside getRecordWriter(..).
     */
    public class MySelfOutputFormatApp {
        private static final String INPUT_PATH = "hdfs://hadoop1:9000/abd/hello";// input path
        private static final String OUT_PATH = "hdfs://hadoop1:9000/out";// output path; the reduce stage writes its result into this directory
        private static final String OUT_FILE_NAME = "/abc";// name of the single output file created by the custom format
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            try {
                FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
                fileSystem.delete(new Path(OUT_PATH), true);
                Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName());
                job.setJarByClass(MySelfOutputFormatApp.class);
                FileInputFormat.setInputPaths(job, INPUT_PATH);
                job.setMapperClass(MyMapper.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(LongWritable.class);
                job.setReducerClass(MyReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(LongWritable.class);
                job.setOutputFormatClass(MySelfTextOutputFormat.class);
                job.waitForCompletion(true);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static class MyMapper extends
                Mapper<LongWritable, Text, Text, LongWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] splited = line.split("\t");
                for (String word : splited) {
                    context.write(new Text(word), new LongWritable(1));// emit (word, 1) for every word
                }
            }
        }
    
        public static class MyReducer extends
                Reducer<Text, LongWritable, Text, LongWritable> {
            @Override
            protected void reduce(Text key, Iterable<LongWritable> values,
                    Context context) throws IOException, InterruptedException {
                long count = 0L;
                for (LongWritable times : values) {
                    count += times.get();
                }
                context.write(key, new LongWritable(count));
            }
        }
        /**
         * Custom output format.
         */
        public static class MySelfTextOutputFormat  extends OutputFormat<Text,LongWritable>{
            FSDataOutputStream outputStream = null ;
            @Override
            public RecordWriter<Text, LongWritable> getRecordWriter(
                    TaskAttemptContext context) throws IOException,
                    InterruptedException {
                try {
                    FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUT_PATH), context.getConfiguration());
                    //path of the output file
                    String opath = MySelfOutputFormatApp.OUT_PATH + OUT_FILE_NAME;
                    outputStream = fileSystem.create(new Path(opath));
                } catch (URISyntaxException e) {
                    e.printStackTrace();
                }
                return new MySelfRecordWriter(outputStream);
            }
    
            @Override
            public void checkOutputSpecs(JobContext context) throws IOException,
                    InterruptedException {
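                // normally this would verify that the output directory does not already exist;
                // it is left empty here because main() deletes OUT_PATH before submitting the job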
            }
    
            /**
             * The OutputCommitter creates the temporary output directories and the job output directory when the job is initialized, and manages the temporary files of the job and of its tasks.
             * While the job runs, many tasks are launched, and each task writes its intermediate output under those temporary directories.
             * Once the tasks and the job have finished, the OutputCommitter removes the temporary directories, which is why no extra output is visible after the program completes.
             * All of the temporary files produced during the run are handled by the OutputCommitter; the real output is written by the RecordWriter(..), so we only need to care about the final output.
             */
            @Override
            public OutputCommitter getOutputCommitter(TaskAttemptContext context)
                    throws IOException, InterruptedException {
                //commits the task/job output: initializes the output paths and, when the job completes, cleans up the temporary job and task directories
                //the job output path must be a directory path
                return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUT_PATH), context);
            }
        }
        public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable>{
            FSDataOutputStream outputStream = null ;
            public MySelfRecordWriter(FSDataOutputStream outputStream) {
                this.outputStream = outputStream ;
            }
            @Override
            public void write(Text key, LongWritable value) throws IOException,
                    InterruptedException {
                this.outputStream.writeBytes(key.toString());
                this.outputStream.writeBytes("\t");
                this.outputStream.writeLong(value.get());
            }
            @Override
            public void close(TaskAttemptContext context) throws IOException,
                    InterruptedException {
                this.outputStream.close();
            }
        }
    }

    Three: Writing the output to multiple files/directories

    /**
     * Writes the output to multiple files/directories.
     * Uses the old (mapred) API.
     */
    public class MyMultipleOutputFormatApp {
        private static final String INPUT_PATH = "hdfs://hadoop1:9000/abd";// input path
        private static final String OUT_PATH = "hdfs://hadoop1:9000/out";// output path; the reduce stage writes its result into this directory
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            try {
                FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
                fileSystem.delete(new Path(OUT_PATH), true);
                JobConf job = new JobConf(conf, MyMultipleOutputFormatApp.class);
                job.setJarByClass(MyMultipleOutputFormatApp.class);
                FileInputFormat.setInputPaths(job, INPUT_PATH);
                job.setMapperClass(MyMapper.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(LongWritable.class);
                job.setReducerClass(MyReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(LongWritable.class);
                job.setOutputFormat(MyMutipleFilesTextOutputFormat.class);
                FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
                JobClient.runJob(job);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
    
            @Override
            public void map(LongWritable key, Text value,
                    OutputCollector<Text, LongWritable> output, Reporter reporter)
                    throws IOException {
                String line = value.toString();
                String[] splited = line.split("\t");
                for (String word : splited) {
                    output.collect(new Text(word), new LongWritable(1));
                }
            }
        }
    
        public static class MyReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {
            @Override
            public void reduce(Text key, Iterator<LongWritable> values,
                    OutputCollector<Text, LongWritable> output, Reporter reporter)
                    throws IOException {
                long count = 0L ;
                while(values.hasNext()){
                    LongWritable times = values.next();
                    count += times.get();
                }
                output.collect(key, new LongWritable(count));
            }
        }
        public static class MyMutipleFilesTextOutputFormat  extends MultipleOutputFormat<Text,LongWritable>{
    
            @Override
            protected org.apache.hadoop.mapred.RecordWriter<Text, LongWritable> getBaseRecordWriter(
                    FileSystem fs, JobConf job, String name, Progressable progress)
                    throws IOException {
                TextOutputFormat<Text, LongWritable> textOutputFormat = new TextOutputFormat<Text,LongWritable>();
                return textOutputFormat.getRecordWriter(fs, job, name, progress);
            }
    
            @Override
            protected String generateFileNameForKeyValue(Text key,
                    LongWritable value, String name) {
                String keyString = key.toString();
                if(keyString.startsWith("hello")){
                    return "hello";
                }else{
                    //the output file name is simply the value of the key (k3)
                    return keyString ;
                }
            }
            
        }
    }

    Four: Word count with the hadoop 1.x API

    /**
     * hadoop 1.x
     * Word count written with the old (mapred) API.
     */
    public class WordCountApp {
        private static final String INPUT_PATH = "hdfs://hadoop1:9000/abd/hello";
        private static final String OUT_PATH = "hdfs://hadoop1:9000/out";
        public static void main(String[] args) {
            try {
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(conf);
                fs.delete(new Path(OUT_PATH),true);
                JobConf job = new JobConf(conf, WordCountApp.class);
                job.setJarByClass(WordCountApp.class);
                
                FileInputFormat.setInputPaths(job, INPUT_PATH);
                job.setMapperClass(MyMapper.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(LongWritable.class);
                
                job.setReducerClass(MyReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(LongWritable.class);
                FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
                JobClient.runJob(job);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable>{
    
            @Override
            public void map(LongWritable key, Text value,
                    OutputCollector<Text, LongWritable> output, Reporter reporter)
                    throws IOException {
                String line = value.toString();
                String[] splited = line.split("\t");
                for (String word : splited) {
                    output.collect(new Text(word), new LongWritable(1L));
                }
            }
        }
        
        public static class MyReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable>{
    
            @Override
            public void reduce(Text key, Iterator<LongWritable> values,
                    OutputCollector<Text, LongWritable> output, Reporter reporter)
                    throws IOException {
                long times = 0L ;
                while (values.hasNext()) {
                    LongWritable longWritable = (LongWritable) values.next();
                    times += longWritable.get();
                }
                output.collect(key, new LongWritable(times));
            }
            
        }
        
    }

    Five: Receiving command-line arguments at runtime

    /**
     * Receives command-line arguments at run time.
     * Tool interface: adds command-line argument support.
     * Run from the command line:
     *    hadoop jar jar.jar cmd.WordCountApp hdfs://hadoop1:9000/abd/hello hdfs://hadoop1:9000/out
     */
    public class WordCountApp extends Configured implements Tool {
        private static String INPUT_PATH = null;// input path
        private static String OUT_PATH = null;// output path; the reduce stage writes its result into this directory
        @Override
        public int run(String[] args) throws Exception {
            INPUT_PATH = args[0];
            OUT_PATH = args[1];
            Configuration conf = getConf();// configuration object, populated by ToolRunner/GenericOptionsParser
            try {
                FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
                fileSystem.delete(new Path(OUT_PATH), true);
                Job job = new Job(conf, WordCountApp.class.getSimpleName());// jobName: the job's name
                job.setJarByClass(WordCountApp.class);
                FileInputFormat.setInputPaths(job, INPUT_PATH);// specify the input
                job.setMapperClass(MyMapper.class);// set the custom map class
                job.setMapOutputKeyClass(Text.class);// type of the map output key
                job.setMapOutputValueClass(LongWritable.class);// type of the map output value
                job.setReducerClass(MyReducer.class);// set the custom Reduce class
                job.setOutputKeyClass(Text.class);// type of the Reduce output key
                job.setOutputValueClass(LongWritable.class);// type of the Reduce output value
                FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));// location of the job's final output, produced once the reduce stage finishes
                job.waitForCompletion(true);// submit to the jobTracker and wait for completion
            } catch (Exception e) {
                e.printStackTrace();
            }
            return 0;
        }
        public static void main(String[] args) {
            try {
                ToolRunner.run(new Configuration(), new WordCountApp(),args);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        public static class MyMapper extends
                Mapper<LongWritable, Text, Text, LongWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] splited = line.split("\t");
                for (String word : splited) {
                    context.write(new Text(word), new LongWritable(1));// emit (word, 1) for every word
                }
            }
        }
    
        public static class MyReducer extends
                Reducer<Text, LongWritable, Text, LongWritable> {
            @Override
            protected void reduce(Text key, Iterable<LongWritable> values,
                    Context context) throws IOException, InterruptedException {
                long count = 0L;
                for (LongWritable times : values) {
                    count += times.get();
                }
                context.write(key, new LongWritable(count));
            }
        }
    }
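
    Under the hood, ToolRunner.run(..) passes the arguments through GenericOptionsParser, which moves generic options such as -D key=value, -files and -libjars into the Configuration before run(String[] args) sees the remaining application arguments. Below is a minimal standalone sketch of that mechanism; the class name GenericOptionsDemo is made up for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class GenericOptionsDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // consumes generic options (e.g. -D mapred.reduce.tasks=2) into conf
            GenericOptionsParser parser = new GenericOptionsParser(conf, args);
            // what is left over are the application arguments, e.g. the input and output paths
            String[] remainingArgs = parser.getRemainingArgs();
            System.out.println("mapred.reduce.tasks = " + conf.get("mapred.reduce.tasks"));
            for (String arg : remainingArgs) {
                System.out.println("application argument: " + arg);
            }
        }
    }

    With this in place, a command line such as "hadoop jar jar.jar cmd.WordCountApp -D mapred.reduce.tasks=2 hdfs://hadoop1:9000/abd/hello hdfs://hadoop1:9000/out" still leaves args[0] and args[1] as the input and output paths inside run(..).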
  • Original article: https://www.cnblogs.com/xiaolong1032/p/4529534.html