zoukankan      html  css  js  c++  java
  • hadoop知识整理(3)之MapReduce之代码编写

    前面2篇文章知道了HDFS的存储原理,知道了上传和下载文件的过程,同样也知晓了MR任务的执行过程,以及部分代码也已经看到,那么下一步就是程序员最关注的关于MR的业务代码(这里不说太简单的):

    一、关于MapTask的排序

      mapTask正常情况,按照key的hashcode进行从小到大的排序操作,形成map输出,交给reduce,(据某篇博文说,hashcode排序使用的是快排,这个无从考证),这里说明一下如何使用POJO类作为key,使其进行排序。

      1)POJO类实现WritableComparable<T>接口;

      2)重写compareTo(T t)方法,在此方法中返回为int,使用当前对象的排序对象,减去传入对象的排序字段,便是倒序排序(按照想要的方式)。示例为按照电影热度值倒序排序。

        @Override
        public int compareTo(Movie o) {
            return this.hot-o.hot;
        }

    二、关于地址复用

      1)注意,在reduce中,关于reduce方法中的values的迭代器,一旦遍历过后,迭代器中值将不再存在;

      2)这里是因为reduceTask在反射调用reduce方法时,为节省内存空间,使用了地址复用技术;

      3)所以如果想让对象保存下来,那么必须将对象完全克隆,这里建议,在使用POJO时候,最好实现clone方法,以便方便保存迭代器中的对象;示例代码是重写clone方法,以方便克隆;

    public Item clone(){
            Item o=null;
            try {
                o = (Item) super.clone();
            } catch (CloneNotSupportedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return o;
        }

    三、关于多文件join

      多文件join操作,map任务也是支持的,只需将多文件放在指定的hdfs输入目录即可,那么在map方法中,只需要将关联字段提为key,其他封装入对象,然后reduce中你想要的结果就是join后的,至于保留哪些字段,去掉冗余字段,那就全凭自己操作了。

    四、关于combiner

      根据前文,combine操作,即使将每一个maptask的输出结果,进行合并排序操作,如果程序员使用MR的人,指定了conbine操作的存在,那么maptask会根据spill内存缓冲溢出文件的数量进行判断是否确实需要combine操作,因为combine操作也会浪费资源,默认值中,假如spill文件的数量小于3,那么便不会进行combine操作,否则先进行combine操作,combine操作只针对于每一个maptask小任务,然后根据shuffle的原理,这些combine后的输出文件会被reduce的复制进行拿走。combine的启用方式:

    job.setCombinerClass(InventReducer.class);

    此段代码运行在MRDriver中,指定一个combiner的reduce类,和reduce的思路以及代码方式都一样:

    public class InventReducer extends Reducer<Text, Text, Text, Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            int count=0;
            for(Text t:values){
                count++;
            }
            String[] sss=key.toString().split(" ");
            context.write(new Text(sss[0]), new Text(sss[1]+" "+count));
        }
    }

    五、关于分区

      指定分区类,分区操作是在maptask中进行,当maptask输出文件结束后,maptask会根据spill文件进行排序和分区操作。

      Driver中指定分区类:

    job.setPartitionerClass(AuthPartitioner.class);

      分区类代码:

      分区类中,key为maptask的输出key,int numPartitions为分区编号,有几个分区编号,将会分几个区。

    public class AuthPartitioner extends Partitioner<IntWritable, IntWritable>{
        @Override
        public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
            String num=String.valueOf(key.get());
            if(num.matches("[0-9][0-9]")||num.matches("[0-9]")){
                return 0;
            }
            if(num.matches("[0-9][0-9][0-9]")){
                return 1;
            }else{
                return 2;
            }
            
        }
    }

      分区的目的在于指定的多个reduceTask可以分别处理自己分区的数据,以便让数据均匀地落盘。

    六、关于Job链

      这就是在Java代码中让多个Job串起来,实现很简单:代码写在Driver中

    //判断上一个Job的完成情况
    if (job.waitForCompletion(true)){
        //执行第二个Job代码          
    }

    七、自定义格式输入(Map的输入)

      默认map方法的输入为:间隔字符数量long型、本行数据text类型;

      假如想改变这些输入,假如将第一个key输入变为intWritable

      需要一个Format类和Reader类:

    public class AuthFormat extends FileInputFormat<IntWritable, Text>{
        @Override
        public RecordReader<IntWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            return new AuthReader();
        }
    }
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.util.LineReader;
    public class AuthReader extends RecordReader<IntWritable, Text>{
        private FileSplit fs ;
        private LineReader lineReader ;
        private IntWritable key ;
        private Text value ;
        //--定义一个计数器,记录本次读取到了多少行
        int count = 0;
        
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            this.fs = (FileSplit) split;
            //--获取文件路径
            Path path = fs.getPath();
            //--获取文件系统
            Configuration conf = context.getConfiguration();
            FileSystem fileSystem = path.getFileSystem(conf);
            //--通过文件系统读取文件得到流
            FSDataInputStream in = fileSystem.open(path);
            //--将流包装为LineReader方便按行读取
            lineReader = new LineReader(in);
        }
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            //--要返回的键,本次读取的行数
            key = new IntWritable();
            //--要返回的值,本次读取到的内容
            value = new Text();
            //--定义一个temp临时记录内容
            Text temp = new Text();
            int len = lineReader.readLine(temp);
            //--判断是否读取到了数据
            if(len == 0){
                //--表示没有行数据可读,则不再执行 nextKeyValue()方法
                return false;
    
            }else{
                //--读到了数据,将数据追加到value中
                //可以这样写:value=tmp; 
                //也可以像下面这样写
                value.append(temp.getBytes(), 0, temp.getLength());
                //--计数器加1,表明读取到了一行内容
                count++;
                key.set(count);
                return true;    
                }    
        }
        @Override
        public IntWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }
        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }
        @Override
        public void close() throws IOException {
            if(lineReader != null)lineReader.close();
        }
    
    }

    在上文的源码解析中,知道了Reader的作用在于一行一行地读取源文件给maptask任务。

    这里相当于子类重写了父类的方法,在调用时,会直接调用子类的方法。

    而在Driver中需要增加:

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(AuthFormat.class);

    八、自定义格式输出

      自定义格式输出的目的在于规范输出格式重量,简单来说就是可以输出想要的任意输出格式:

    public class AuthOutputFormat<K,V> extends FileOutputFormat<K,V>{
    
        @Override
        public RecordWriter<K,V> gtetRecordWrier(TaskAttemptContext job) throws IOException, InterruptedException {
            
            //Get the default path and filename for the output format.
            //第二个参数:extension an extension to add to the filename
            Path path=super.getDefaultWorkFile(job, "");
            Configuration conf=job.getConfiguration();
            FileSystem fs=path.getFileSystem(conf);
            FSDataOutputStream out=fs.create(path);
    
            
            return new AuthWriter<K,V>(out,"|","
    ");
        }
    
    }
    public class AuthWriter<K,V> extends RecordWriter<K,V>{
        
        private FSDataOutputStream out;
        private String keyValueSeparator;
        private String lineSeparator;
        
    
        public AuthWriter(FSDataOutputStream out, String keyValueSeparator, String lineSeparator) {
            this.out=out;
            this.keyValueSeparator=keyValueSeparator;
            this.lineSeparator=lineSeparator;
        }
    
        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            out.write(key.toString().getBytes());
            out.write(keyValueSeparator.getBytes());
            out.write(value.toString().getBytes());
            out.write(lineSeparator.getBytes());
            
        }
    
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if(out!=null)out.close();
            
        }
    
    }

    同样一个format和一个writer,然后在Driver中指定即可

    //有了这句话,不用再写原来的输出语句
    job.setOutputFormatClass(AuthOutputFormat.class);

    九、关于多输入源

      在Driver中指定:

    public class ScoreDriver {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "JobName");
            job.setJarByClass(cn.gjm.hadoop.ScoreDriver.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            
            job.setInputFormatClass(AuthInputFormat.class);
            //需要注意,如果一个Mapper代码不能通用的解决,则需要分别指定。此时,就不能去设置
            //setMapperClass()了
            MultipleInputs.addInputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/format-score.txt"),AuthInputFormat.class,ScoreMapper.class);
            
            MultipleInputs.addInputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/format-score-1.txt"),TextInputFormat.class,ScoreMapper2.class);
    
        
            FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/result"));
    
            if (!job.waitForCompletion(true))
                return;
        }

    十、多输出源

      

    public class ScoreReducer extends Reducer<Text, Text, Text, Text>{
        
        private MultipleOutputs<Text, Text> mos;
        
        @Override
        protected void reduce(Text name, Iterable<Text> scores, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            
            for(Text score:scores){
                if(name.toString().equals("jary")){
                    mos.write("jary",name,score);
                }
                if(name.toString().equals("rose")){
                    mos.write("rose",name,score);
                }
                if(name.toString().equals("tom")){
                    mos.write("tom", name,score);
                }
            }
            
        }
        @Override
        protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            mos=new MultipleOutputs<>(context);
        }
    
    }
    ScoreDriver代码:
    public class ScoreDriver {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "JobName");
            job.setJarByClass(cn.tarena.hadoop.ScoreDriver.class);
    
    
            // TODO: specify a reducer
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(ScoreReducer.class);
            
            job.setInputFormatClass(AuthInputFormat.class);
            
            MultipleInputs.addInputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/format-score.txt"),AuthInputFormat.class,ScoreMapper.class);
            MultipleInputs.addInputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/format-score-1.txt"),TextInputFormat.class,ScoreMapper2.class);
            
            
            MultipleOutputs.addNamedOutput(job, "jary", AuthOutputFormat.class, Text.class, Text.class);
            MultipleOutputs.addNamedOutput(job, "tom", AuthOutputFormat.class, Text.class, Text.class);
            MultipleOutputs.addNamedOutput(job, "rose", AuthOutputFormat.class, Text.class, Text.class);
        
            FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/result"));
    
            if (!job.waitForCompletion(true))
                return;
        }
    
    }

    十一,多次排序

    //只需在compare方法中指定多字段排序即可    
    @Override
        public int compareTo(Profit o) {
            int result=this.month-o.month;
            if(result!=0){
                return result;
            }else{
                return o.profit-this.profit;
            }
        }
  • 相关阅读:
    英文、简繁体中文 IT 词汇对照表
    VB.NET 中的 As New 以及型別指定
    使用 ADO.NET 的 ExecuteScalar 方法返回单一值
    适时调整 SqlDataSource 控件的 DataSourceMode 属性
    ADO.NET 2.0 的并行控制与数据存取冲突侦测
    让 ADO.NET 2.0 的 SqlCommand 和 SqlDataAdapter 合作
    透过 Socket API 让 PDA 和远程 PC 联机
    探讨 .NET 语言的 using statement 与资源释放
    让 user control 中的 Button 也能启用验证
    dotNET 語言中可提升效能的邏輯運算子
  • 原文地址:https://www.cnblogs.com/qfxydtk/p/11173625.html
Copyright © 2011-2022 走看看