zoukankan      html  css  js  c++  java
  • Hadoop学习笔记之三 数据流向

     http://hadoop.apache.org/docs/r1.2.1/api/index.html

    最基本的:

    1. 文本文件的解析

    2. 序列文件的解析

     toString会将Byte数组中的内存数据 按照字节间隔以字符的形式显示出来。

    文本文件多事利用已有的字符处理类, 序列文件多事创建byte数组,然后将文件流中的数据复制到byte数组后进行解析。

     LineRecordReader。。。 。。。

    这里首先需要了整个文件数据 的流动方向。

    MapReduce框架借助inputformat完成输入数据的规范检查,借助outputformat完成输出数据的规范性检查。

    context的常用用法:

    context.getConfiguration
    context.getInputSplit
    context.write

    利用好context可以随心所欲的输出,从输入key value list中获得信息,输出可以是单个的key,value;也可以是key list

    从输入中获得信息,用context随便向外写!

    一个Mapper对应一个Split文件,而recorderreder需要多次调用用来解析键值对:

    如下所示一个文本文件传入mapper,而map函数多次被触发

    public class TestDataFlow {
        public static void main(String[] args)throws Exception{
            Configuration conf  = new Configuration();
            Job job = new Job(conf, "testDataFlow");
            
            job.setJarByClass(TestDataFlow.class);
            job.setMapperClass(myMapper.class);
            
            FileInputFormat.addInputPath(job, new Path("hdfs://MASTERPC:9000/home/Fea.txt"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://MASTERPC:9000/out"));
            
            job.waitForCompletion(true);
            
        }
        
        public static class myMapper extends Mapper<Object, Text, Text, Text>{
            private FileSplit split; 
            public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException{
                split = (FileSplit)context.getInputSplit();
                System.out.println("输入文件块的路径:"+split.getPath().toString());
                
            }
        }
        
        
    }
    14/11/09 17:15:44 INFO mapred.MapTask: Processing split: hdfs://MASTERPC:9000/home/Fea.txt:0+7397
    
    输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
    输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
    输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
    输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
    输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
    输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
    输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
    输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt

    下面是一段自定义InputFormat的程序,功能是将零碎的小文件合并成大的sequence文件:key文件名,value文件值

    //主要包括两个部分:文件划分 + 创建RecordReader; 下面的代码new了一个自己的reader返回
    public
    class myFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> { protected boolean isSplitable(JobContext context, Path file){ return false; } public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,InterruptedException{ myRecordReader recorder = new myRecordReader(); recorder.initialize(split, context); return recorder; } }
    //使用split获得输入文件块的大小、路径信息;使用context获得fs真正的从dfs上读入文件内容到value成员变量中。
    //成员变量value用来传递给Mapper的map函数使用
    public class myRecordReader extends RecordReader<NullWritable, BytesWritable> { private FileSplit split; private Configuration conf; private BytesWritable value = new BytesWritable(); private boolean processed = false; public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.split = (FileSplit)split; this.conf = context.getConfiguration(); } public void close(){ ; } public NullWritable getCurrentKey()throws IOException{ return null; } public BytesWritable getCurrentValue()throws IOException{ return value; } public float getProgress()throws IOException, InterruptedException{ return processed?1.0f:0.0f; } public boolean nextKeyValue()throws IOException, InterruptedException{ if (!processed){ byte[] buf = new byte[(int)split.getLength()]; FileSystem fs = FileSystem.get(conf); Path path = split.getPath(); InputStream in = null; try{ in = fs.open(path); IOUtils.readFully(in, buf, 0, buf.length); value.set(buf,0, buf.length); }finally{ IOUtils.closeStream(in); } processed = true; return true; }else{ return false; } } }   //RecordReader处理好的key和value自动的传递给map函数
    public static class myMapper extends Mapper<Object, BytesWritable, Text, BytesWritable>{ private FileSplit split; private Text outkey = new Text(); public void setup(Context context){ split = (FileSplit)context.getInputSplit(); outkey.set(split.getPath().toString()); System.out.println("输入文件块的路径:"+split.getPath().toString()); } public void map(Object key, BytesWritable value, Context context) throws IOException, InterruptedException{ System.out.println("key: "+ outkey.toString()+" value: "+value.toString()); }

    Mapper类对应的是输入的文件块split,map对应的是文件块解析出来的一个个的<key, value>

    RecordReader对应的是输入的文件块split,可能需要多次对split进行解析。

    InputFormat中分为两部分,getSplit是将inputPath路径下的HDFS文件划分成块split,另一部分是CreateRecordReader

    创建用来解析每个split。

    其中划分成块是在终端上传数据文件时进行的,然后划分之后的文件信息提供给jobTracker进行分配,分配到任务的节点(Mapper)到相应的 位置下载自己的split文件

    然后调用RecordReader不断对这个split文件进行解析,将生成的<key,value>送给map函数进行处理生成新的<key,value>,经过排序合并分组之后传递给相应的reduce。

    自定义InputFormat

    下面是一个将小文件聚合成大的序列文件的mapred作业,核心是利用自定义的FileInputFormt将系统分配的小文件,以文件名(text)key,

    文件内容(bytes)value的键值对形式传递给最后的SeqenceFileOutputFormat,将这些键值对写入HDFS上的序列文件中。

    由于文件较小所以不会再分块,一个文件作为整体输入到节点上,FileInputFormat中设置文件不可分。

    Mapper类中调用FileInputFormat中的创建初始化RecordReader函数:createRecordReader(),

    RecordReader将输入文件解析后将<key,value>送给map函数, 在之前的TextInputFormat的LineRecordReader是不断的对split进行处理

    将每行解析成一个<key, value>送给map,但是这里的应用时RecordReader仅仅处理split文件一次,就将整个文件的内容作为<null, value>

    传递给map, 最后的reduce在将所有的键值对聚合成一个。所以RecordReader类中做了处理次数的判断处理。

    这样一来对于每个节点的map也只会执行一次,多以在Mapper一开始的时候setup函数中将输入的Split文件的文件名获得作为key。

    文件:myRecordReader.java

    [说明]RecordReader的作用就是为了解析split文件,所以createRecordReader时传入的参数是split、context,在RecordReader类中,首先需要获得

    HDFS上的split的输入文件流,然后对此进行解析,并记录下处理的位置,方便下次map调用的时候,从上次的位置接着对split文件进行解析。

    文件:myFileInputFormat.java

    [说明]该类中设置文件不可分,创建RecordReader对象。

    inputFormat和 recordReader中的键值对类型一定要和  map中的键值对类型一致。

    作业上下文context可以获得作业参数conf、输入文件InputSplit、

    在Mapper类的初始化函数中传入的参数就是context

    public void setup(Context context)

    OutputFormat:

    主要的函数是:

    getRcordWriter, 返回的是RecordWriter, 它负责将键值对写入到存储部件中。

    checkOutputSpecs,负责检查输出目录参数是否合理,如果输出目录已经存在就报错。

    getOutputCommiter,OutputCommiter负责临时文件的初始化,作业完成之后清理临时目录等等。

    系统默认的是TextOutputFormat,它使用LineRecordWriter将最终获的键值对以key + + value的方式逐行输出到文本文件中。

     在 编程时使用较多的是重载 writer、RecordWriter、getRecordWriter,

     tips:在eclipse中代码的空白处右键->Source->Override/Implements Methods 然后在弹出的窗口中选择需要重载的成员方法。^.^

     (生成的重载函数上面会自动生@Override灰色字体)

     下面是hadoop源码SequenceFileOutputFormat中的一段:

      public RecordWriter<K, V> 
             getRecordWriter(TaskAttemptContext context
                             ) throws IOException, InterruptedException {
       。。。 。。。
       final SequenceFile.Writer out =    SequenceFile.createWriter(fs, conf, file, context.getOutputKeyClass(), context.getOutputValueClass(), compressionType, codec, context); return new RecordWriter<K, V>() { public void write(K key, V value) throws IOException { out.append(key, value); } public void close(TaskAttemptContext context) throws IOException { out.close(); } };

     可见write中传入的是OutputFormat<K,V>中的键值对,而创建的SequenceFile.Writer是根据setOutputKeyClass、setOutputValueClass设置的类型进行写文件。

    public class TestFormat {public  static void main(String[] args)throws Exception{
            Configuration conf = new Configuration();
            Job job = new Job(conf, "testFormat!");
            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            FileInputFormat.addInputPath(job , new Path("hdfs://master:9000/hadoop"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/out"));
            
            job.waitForCompletion(true);
            
        }
    }

    java.lang.Exception: java.io.IOException:

    Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, recieved org.apache.hadoop.io.Text

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);

    RecordReader是根据输入文件中的内容自动获得key和value的class,所以map之后的k,v类型是原始文件的kv类型,但是write函数中out(writer)的kv类型时job中设置的。


     //草稿:

     public static class myOutputFormat<Text, BytesWritable> extends  SequenceFileOutputFormat<Text, BytesWritable>{
            public RecordWriter<Text, BytesWritable> getRecordWriter(
                    TaskAttemptContext arg0) throws IOException,
                    InterruptedException {
                // TODO Auto-generated method stub
                return super.getRecordWriter(arg0);
            }
    
        }

    定义自己的数据:

    public class Point3D implements WritableComparable<Point3D>{
            private float x, y, z;
            public float getX(){return x;}
            public float getY(){return y;}
            public float getZ(){return z;}
            public void readFields(DataInput in) throws IOException{
                x = in.readFloat();
                y = in.readFloat();
                z = in.readFloat();
            }
            public void write(DataOutput out)throws IOException{
                out.writeFloat(x);
                out.writeFloat(y);
                out.writeFloat(z);
            }
            public int compareTo(Point3D p){
                float ret = (x*x+y*y +z*z) - (p.x*(p.x)+p.y*(p.y)+p.z*(p.z));
                if (ret > 0)
                    return 1;
                else if (ret == 0)
                    return 0;
                else 
                    return -1;
        
            }
        }

    一些常用的流:

    FSDataInputStream
    DataInputStream
    FileInputStream
    InputStream

    BufferedInputStream

    链式处理:

    在hadoop计算的过程中比较耗费时间的是IO操作,一些Job 在Map需要前期预处理,reduce后需要后处理,这样可以使用Job的链式处理;

    通过ChainMapper.addMapp实现,但是该类不支持hadoop1.2.1版本。

    ---------------------------------------------------------------

    参考文献:

    《实战Hadoop》开启通向云计算的捷径,刘鹏主编,电子工业出版社

    最短路径系列之一《从零开始学Hadoop》

    http://blog.csdn.net/chaoping315/article/details/6221440

  • 相关阅读:
    TP3.2框架,实现空模块、空控制器、空操作的页面404替换||同步实现apache报错404页面替换
    调用支付宝PHP接口API实现在线即时支付功能(UTF-8编码)
    JQuery实现的 checkbox 全选;<select>下拉框功能
    使用PHP做移动端 api接口开发方法(适用于TP框架)
    Eclipse jvm启动参数在哪设置
    对 META标签 的一点点了解
    Java反射在整个程序运行中的位置
    Java 为什么要使用反射(通俗易懂的举例)
    粗略介绍Java AQS的实现原理
    Java并发包中线程池的种类和特点介绍
  • 原文地址:https://www.cnblogs.com/sunniflyer/p/4085758.html
Copyright © 2011-2022 走看看