  • Hadoop Basics (18): MapReduce Framework Principles (2), Split Mechanism (2)

    1.5 CombineTextInputFormat Case Study

    1. Requirements

    Merge a large number of small input files into a single split for unified processing.

    (1) Input data

    Prepare 4 small files.

    (2) Expected result

    A single split that processes all 4 files.

    2. Implementation

    (1) Run the WordCount example program from section 1.6 without any changes and observe that the number of splits is 4.

    (2) Add the following code to WordcountDriver, run the program, and observe that the number of splits is 3.

    (a) Add the following code to the driver class:

    // If no InputFormat is set, TextInputFormat.class is used by default

    job.setInputFormatClass(CombineTextInputFormat.class);

     

    // Set the maximum virtual-storage split size to 4 MB

    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

    (b) The run produces 3 splits.

    (3) Add the following code to WordcountDriver, run the program, and observe that the number of splits is 1.

    (a) Add the following code to the driver:

    // If no InputFormat is set, TextInputFormat.class is used by default

    job.setInputFormatClass(CombineTextInputFormat.class);

     

    // Set the maximum virtual-storage split size to 20 MB

    CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);

    (b) The run produces 1 split.
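
    The effect of the two settings can also be checked without the full WordCount job. The sketch below is not from the original tutorial: it builds a Job, points CombineTextInputFormat at a directory of small files, and prints how many splits would be produced. The class name and input path are placeholders; adjust the path, and swap 4194304 for 20971520, to reproduce steps (2) and (3).

    package com.atguigu.mapreduce.combine;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    // Standalone split check (illustrative; not the tutorial's WordcountDriver)
    public class CombineSplitCheck {

        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            // Placeholder path: a local directory containing the 4 small files
            FileInputFormat.setInputPaths(job, new Path("e:/input/inputcombine"));

            // Maximum virtual-storage split size: 4 MB (use 20971520 for the 20 MB run)
            job.setInputFormatClass(CombineTextInputFormat.class);
            CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

            // Compute the splits exactly as job submission would
            List<InputSplit> splits = new CombineTextInputFormat().getSplits(job);
            System.out.println("Number of splits: " + splits.size());
        }
    }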

    1.6 FileInputFormat Implementations

    FileInputFormat ships with several implementations for different input shapes: TextInputFormat (the default), KeyValueTextInputFormat, NLineInputFormat, CombineTextInputFormat, and custom InputFormat subclasses. The sections before and after this one walk through them in turn.

    1.7 KeyValueTextInputFormat Case Study

     

    1. Requirements

    Count, for each distinct first word, how many lines in the input file start with that word.

    1) Input data

    banzhang ni hao
    xihuan hadoop banzhang
    banzhang ni hao
    xihuan hadoop banzhang

    2) Expected output

    banzhang    2
    xihuan    2

    2. Requirement analysis
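
    With the separator set to a single space, KeyValueTextInputFormat splits each line at the first space: the text before it becomes the map key and the rest becomes the map value, so the job only needs to count identical keys. A minimal illustration of that splitting rule follows (plain string handling, not the actual KeyValueLineRecordReader implementation):

    public class KeyValueSplitDemo {
        public static void main(String[] args) {
            // Illustrative only: how one input line becomes a key/value pair
            // when the separator is a single space
            String line = "banzhang ni hao";
            int sep = line.indexOf(' ');             // position of the first separator
            String key = line.substring(0, sep);     // "banzhang" -> map key
            String value = line.substring(sep + 1);  // "ni hao"   -> map value
            System.out.println(key + " -> " + value);
        }
    }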

    3. Implementation

    1) Write the Mapper class

    package com.atguigu.mapreduce.KeyValueTextInputFormat;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class KVTextMapper extends Mapper<Text, Text, Text, LongWritable>{
    
        // 1 The output value: each line counts as 1
        LongWritable v = new LongWritable(1);
    
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
    
            // For the line "banzhang ni hao", key = "banzhang" and value = "ni hao"
    
            // 2 Write out the first word with a count of 1
            context.write(key, v);
        }
    }

    2) Write the Reducer class

    package com.atguigu.mapreduce.KeyValueTextInputFormat;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class KVTextReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
    
        LongWritable v = new LongWritable();
    
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
    
            long sum = 0L;
    
            // 1 Sum the counts for this key
            for (LongWritable value : values) {
                sum += value.get();
            }
    
            v.set(sum);
    
            // 2 Write out the key and its total
            context.write(key, v);
        }
    }

    3) Write the Driver class

    package com.atguigu.mapreduce.KeyValueTextInputFormat;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class KVTextDriver {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            Configuration conf = new Configuration();
            // Set the key/value separator to a single space
            conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
    
            // 1 Get the job object
            Job job = Job.getInstance(conf);
    
            // 2 Set the jar location and wire up the mapper and reducer
            job.setJarByClass(KVTextDriver.class);
            job.setMapperClass(KVTextMapper.class);
            job.setReducerClass(KVTextReducer.class);
    
            // 3 Set the map output kv types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
    
            // 4 Set the final output kv types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
    
            // 5 Set the input data path
            FileInputFormat.setInputPaths(job, new Path(args[0]));
    
            // Set the input format
            job.setInputFormatClass(KeyValueTextInputFormat.class);
    
            // 6 Set the output data path
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // 7 Submit the job
            job.waitForCompletion(true);
        }
    }
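
    For a quick local test, the paths can be hard-coded at the top of main(), the same way NLineDriver and SequenceFileDriver do below; the paths in this snippet are placeholders, not from the original tutorial.

    // Hypothetical local-run shortcut; adjust the paths to your own machine
    args = new String[] { "e:/input/inputkv", "e:/output1" };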

    1.8 NLineInputFormat Case Study

    1. Requirements

    Count the occurrences of each word, with the number of splits determined by the number of lines in each input file. In this case every three lines should go into one split.

    1) Input data

    banzhang ni hao
    xihuan hadoop banzhang
    banzhang ni hao
    xihuan hadoop banzhang
    banzhang ni hao
    xihuan hadoop banzhang
    banzhang ni hao
    xihuan hadoop banzhang
    banzhang ni hao
    xihuan hadoop banzhang banzhang ni hao
    xihuan hadoop banzhang

    2) Expected output

    Number of splits:4

    2. Requirement analysis
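
    NLineInputFormat assigns a fixed number of lines to each split instead of splitting by block size. With setNumLinesPerSplit(job, 3) and the 11 input lines above, the job produces ceil(11 / 3) = 4 splits: three splits of 3 lines each plus one split with the remaining 2 lines, which matches the expected "Number of splits:4".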

    3. Implementation

    1) Write the Mapper class

    package com.atguigu.mapreduce.nline;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class NLineMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
    
        private Text k = new Text();
        private LongWritable v = new LongWritable(1);
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
            // 1 Get one line
            String line = value.toString();
    
            // 2 Split the line into words
            String[] words = line.split(" ");
    
            // 3 Write out each word with a count of 1
            for (int i = 0; i < words.length; i++) {
    
                k.set(words[i]);
    
                context.write(k, v);
            }
        }
    }

    2) Write the Reducer class

    package com.atguigu.mapreduce.nline;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class NLineReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
    
        LongWritable v = new LongWritable();
    
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
    
            long sum = 0L;
    
            // 1 Sum the counts for this word
            for (LongWritable value : values) {
                sum += value.get();
            }
    
            v.set(sum);
    
            // 2 Write out the word and its total
            context.write(key, v);
        }
    }

    3) Write the Driver class

    package com.atguigu.mapreduce.nline;
    import java.io.IOException;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class NLineDriver {
    
        public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
    
            // Adjust the input and output paths to the actual paths on your own machine
            args = new String[] { "e:/input/inputword", "e:/output1" };
    
            // 1 Get the job object
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
    
            // 7 Put three records (lines) into each InputSplit
            NLineInputFormat.setNumLinesPerSplit(job, 3);
    
            // 8 Use NLineInputFormat to read the records
            job.setInputFormatClass(NLineInputFormat.class);
    
            // 2 Set the jar location and wire up the mapper and reducer
            job.setJarByClass(NLineDriver.class);
            job.setMapperClass(NLineMapper.class);
            job.setReducerClass(NLineReducer.class);
    
            // 3 Set the map output kv types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
    
            // 4 Set the final output kv types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
    
            // 5 Set the input and output data paths
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // 6 Submit the job
            job.waitForCompletion(true);
        }
    }

    4. Test

    1) Input data

    banzhang ni hao
    xihuan hadoop banzhang
    banzhang ni hao
    xihuan hadoop banzhang
    banzhang ni hao
    xihuan hadoop banzhang
    banzhang ni hao
    xihuan hadoop banzhang
    banzhang ni hao
    xihuan hadoop banzhang banzhang ni hao
    xihuan hadoop banzhang

    2) The splits reported in the run output, as shown in Figure 4-10.

    1.9 Custom InputFormat

    1.10 Custom InputFormat Case Study

    Both HDFS and MapReduce are very inefficient at handling small files, yet having to process large numbers of small files is often unavoidable, so a corresponding solution is needed. A custom InputFormat can be used to merge the small files.

    1. Requirements

    Merge multiple small files into one SequenceFile (a SequenceFile is Hadoop's file format for storing key-value pairs in binary form). The SequenceFile holds several files, using each file's path plus name as the key and the file's contents as the value.

    1) Input data

    2) Expected output file format

    2. Requirement analysis
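
    The analysis, matching the code below: the custom WholeFileInputformat marks every file as non-splittable, so each small file becomes exactly one split. Its WholeRecordReader reads the whole file into a single BytesWritable and uses the file's path as a Text key, SequenceFileMapper passes each key/value pair through unchanged, and SequenceFileOutputFormat writes all pairs into one SequenceFile.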

    3. Implementation

    1) Custom InputFormat

    package com.atguigu.mapreduce.inputformat;
    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    // Subclass FileInputFormat; key = file path, value = file contents
    public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable>{
    
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            // Never split a file: each small file becomes exactly one split
            return false;
        }
    
        @Override
        public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    
            WholeRecordReader recordReader = new WholeRecordReader();
            recordReader.initialize(split, context);
    
            return recordReader;
        }
    }

    2) Custom RecordReader

    package com.atguigu.mapreduce.inputformat;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class WholeRecordReader extends RecordReader<Text, BytesWritable>{
    
        private Configuration configuration;
        private FileSplit split;
    
        // true until the single record of this split has been read
        private boolean isProgress = true;
        private BytesWritable value = new BytesWritable();
        private Text k = new Text();
    
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    
            this.split = (FileSplit) split;
            configuration = context.getConfiguration();
        }
    
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
    
            if (isProgress) {
    
                // 1 Allocate a buffer the size of the whole file
                byte[] contents = new byte[(int) split.getLength()];
    
                FileSystem fs = null;
                FSDataInputStream fis = null;
    
                try {
                    // 2 Get the file system
                    Path path = split.getPath();
                    fs = path.getFileSystem(configuration);
    
                    // 3 Open the file
                    fis = fs.open(path);
    
                    // 4 Read the entire file content into the buffer
                    IOUtils.readFully(fis, contents, 0, contents.length);
    
                    // 5 Put the file content into the value
                    value.set(contents, 0, contents.length);
    
                    // 6 Get the file path and name
                    String name = split.getPath().toString();
    
                    // 7 Set the output key
                    k.set(name);
    
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    IOUtils.closeStream(fis);
                }
    
                isProgress = false;
    
                return true;
            }
    
            return false;
        }
    
        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return k;
        }
    
        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return value;
        }
    
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }
    
        @Override
        public void close() throws IOException {
        }
    }

    3) Write the SequenceFileMapper class

    package com.atguigu.mapreduce.inputformat;
    import java.io.IOException;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{
    
        @Override
        protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
    
            // Pass the (file path, file bytes) pair through unchanged
            context.write(key, value);
        }
    }

    4) Write the SequenceFileReducer class

    package com.atguigu.mapreduce.inputformat;
    import java.io.IOException;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
    
            // Each key (file path) has exactly one value: the file's bytes
            context.write(key, values.iterator().next());
        }
    }

    5) Write the SequenceFileDriver class

    package com.atguigu.mapreduce.inputformat;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    
    public class SequenceFileDriver {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            // Adjust the input and output paths to the actual paths on your own machine
            args = new String[] { "e:/input/inputinputformat", "e:/output1" };
    
            // 1 Get the job object
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            // 2 Set the jar location and wire up the custom mapper and reducer
            job.setJarByClass(SequenceFileDriver.class);
            job.setMapperClass(SequenceFileMapper.class);
            job.setReducerClass(SequenceFileReducer.class);
    
            // 7 Set the input format
            job.setInputFormatClass(WholeFileInputformat.class);
    
            // 8 Set the output format
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
            // 3 Set the map output kv types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(BytesWritable.class);
    
            // 4 Set the final output kv types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
    
            // 5 Set the input and output paths
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // 6 Submit the job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
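
    To check the result, the generated part file can be read back with SequenceFile.Reader: each key should be one of the original file paths and each value that file's raw bytes. This verification sketch is not part of the original tutorial, and the output path is a placeholder.

    package com.atguigu.mapreduce.inputformat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    
    public class SequenceFileCheck {
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
            // Placeholder path: the part file written by SequenceFileDriver above
            Path seqFile = new Path("e:/output1/part-r-00000");
    
            try (SequenceFile.Reader reader =
                    new SequenceFile.Reader(conf, SequenceFile.Reader.file(seqFile))) {
    
                Text key = new Text();
                BytesWritable value = new BytesWritable();
    
                // Each record: key = original file path, value = that file's bytes
                while (reader.next(key, value)) {
                    System.out.println(key + " -> " + value.getLength() + " bytes");
                }
            }
        }
    }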
  • Original article: https://www.cnblogs.com/qiu-hua/p/13326492.html