  • Writing a Custom Data Type to a SequenceFile and Reading It Back

        An overview of this post up front: first, a DoubleArrayWritable class is defined to hold a vector of a matrix (the demo code below uses the analogous IntArrayWritable); each vector is written into a SequenceFile as the value, with the corresponding matrix row number as the key; finally, the (key, value) pairs are read back out of the SequenceFile and multiplied with another matrix. Everything is debugged locally in IDEA, without submitting to the cluster. Normally it should be enough to add hadoop-core-1.2.1.jar and commons-cli-1.2.jar (from the lib directory) to the project classpath, but with only those two jars the debugger complains that some class definitions cannot be found, so I simply added hadoop-core-1.2.1.jar and every jar under lib to the classpath. With that, the program can be debugged entirely locally, with nothing submitted to the cluster.

    1) First, define the IntArrayWritable class, which extends ArrayWritable (a DoubleArrayWritable works the same way, just with DoubleWritable elements).

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.ArrayWritable;
    public class IntArrayWritable extends ArrayWritable {
        public IntArrayWritable(){
            super(IntWritable.class);
        }
    }

    Because the (key, value) pairs read from the SequenceFile must be passed to map, the no-argument constructor has to be defined explicitly, as in the body of the class above.
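    The reason the no-argument constructor matters is that Hadoop instantiates the value class reflectively (via ReflectionUtils.newInstance), and reflective instantiation fails when the only constructor takes arguments. A minimal plain-Java sketch of the same effect, where Box and IntBox are hypothetical stand-ins for ArrayWritable and IntArrayWritable (no Hadoop required):

```java
// Box mimics ArrayWritable: it must be told its element class at construction.
class Box {
    private final Class<?> elementClass;
    Box(Class<?> elementClass) { this.elementClass = elementClass; }
    Class<?> elementClass() { return elementClass; }
}

// IntBox mimics IntArrayWritable: without this explicit no-arg constructor,
// reflective instantiation (as Hadoop's ReflectionUtils does) would fail.
class IntBox extends Box {
    IntBox() { super(Integer.class); }
}

public class ReflectionDemo {
    public static void main(String[] args) throws Exception {
        // The reader essentially does this with reader.getValueClass():
        Box value = IntBox.class.getDeclaredConstructor().newInstance();
        System.out.println(value.elementClass().getSimpleName());

        // Box itself has no no-arg constructor, so the same call fails:
        try {
            Box.class.getDeclaredConstructor().newInstance();
            System.out.println("unexpectedly succeeded");
        } catch (NoSuchMethodException e) {
            System.out.println("no no-arg constructor on Box");
        }
    }
}
```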

    2) Next, write objects of IntArrayWritable type as values into a SequenceFile, using SequenceFile.Writer:

    /**
     * Created with IntelliJ IDEA.
     * User: hadoop
     * Date: 16-3-4
     * Time: 10:36 AM
     * To change this template use File | Settings | File Templates.
     */
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.ArrayWritable;
    public class SequenceFileWriterDemo {
        public static void main(String[] args) throws IOException {
            String uri="/home/hadoop/2016Test/SeqTest/10IntArray";
            Configuration conf=new Configuration();
            FileSystem fs=FileSystem.get(URI.create(uri),conf);
            Path path=new Path(uri);
            IntWritable key=new IntWritable();
            IntArrayWritable value=new IntArrayWritable();//the IntArrayWritable value to be written
            value.set(new IntWritable[]{new IntWritable(1),new IntWritable(2),new IntWritable(3),
                    new IntWritable(4)});
            SequenceFile.Writer writer=null;
            writer=SequenceFile.createWriter(fs,conf,path,key.getClass(),value.getClass());
            int i=0;
            while(i<10){
                key.set(i++);
                writer.append(key,value);
            }
            writer.close();//do not omit this, or the write fails and you end up with an empty file
            System.out.println("done!");
        }
    }
    class IntArrayWritable extends ArrayWritable {
        public IntArrayWritable(){
            super(IntWritable.class);
        }
    }

    This writes a 10-row, 4-column matrix into the SequenceFile, where the key is the matrix row number and the value is a variable of IntArrayWritable type.

    3) Upload the generated SequenceFile to the cluster and inspect its contents with the following command (the IntArrayWritable class must be packaged into a jar whose path is added to HADOOP_CLASSPATH in hadoop-env.sh):

    hadoop fs -text /testData/10IntArray

    The result is as follows:

    Something looks wrong, doesn't it? We should be seeing the array [1,2,3,4], yet the output shows entries of the form IntArrayWritable@.... It is actually correct: writing into a SequenceFile persists the "live object", i.e. serializes it, so when we open the file as text (-text) we see the IntArrayWritable@... form. If you want to see the arrays, just deserialize them.
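    Concretely, hadoop fs -text renders each value with its toString(), and ArrayWritable does not override Object's default toString(), which prints ClassName@hexHashCode rather than the contents. The same effect can be shown in plain Java with a hypothetical RawValue class standing in for the writable:

```java
import java.util.Arrays;

// A hypothetical value class that, like ArrayWritable, does not
// override toString().
class RawValue {
    int[] data = {1, 2, 3, 4};
}

public class ToStringDemo {
    public static void main(String[] args) {
        RawValue v = new RawValue();
        // Prints something like "RawValue@1b6d3586" (class name @ hash code),
        // not the array contents.
        System.out.println(v);
        // To see the contents, you have to render them yourself:
        System.out.println(Arrays.toString(v.data));
    }
}
```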

    4) Use SequenceFile.Reader to read back the contents of the SequenceFile above. I want to see the arrays! The code is as follows:

    /**
     * Created with IntelliJ IDEA.
     * User: hadoop
     * Date: 16-3-4
     * Time: 5:41 PM
     * To change this template use File | Settings | File Templates.
     */
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.util.ReflectionUtils;

    import java.io.IOException;
    import java.net.URI;

    public class SequencefileReaderDemo {
        public static void main(String[] args) throws IOException {
            String uri="/home/hadoop/2016Test/SeqTest/10IntArray";
            Configuration conf=new Configuration();
            FileSystem fs=FileSystem.get(URI.create(uri),conf);
            Path path=new Path(uri);
            SequenceFile.Reader reader=null;
            try {
                reader=new SequenceFile.Reader(fs,path,conf);
                Writable key=(Writable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
                IntArrayWritable value=(IntArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
                long position=reader.getPosition();
                String[] sValue=null;
                while(reader.next(key,value)){
                    String syncSeen=reader.syncSeen()?"*":"";
                    sValue=value.toStrings();//renders each array element as a String
                    System.out.printf("[%s%s]\t%s\t%s\t",position,syncSeen,key,value);
                    for (String s:sValue){
                        System.out.printf("%s\t",s);
                    }
                    System.out.println();
                    position=reader.getPosition();
                }
            }
            finally {
                IOUtils.closeStream(reader);
            }
        }
    }

    The output is as follows:

    5) Finally, use the SequenceFile generated above as the left matrix and write an MR program to compute the matrix product. The code is as follows:

    /**
     * Created with IntelliJ IDEA.
     * User: hadoop
     * Date: 16-3-4
     * Time: 10:34 AM
     * To change this template use File | Settings | File Templates.
     */
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.lib.input.*;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    import java.io.IOException;
    import java.lang.reflect.Array;
    import java.net.URI;


    public class MRTest {
        public static class MyMapper extends Mapper<IntWritable,IntArrayWritable,IntWritable,IntArrayWritable>{
            public static int[][] rightMatrix=new int[][]{{10,10,10,10,10},{10,10,10,10,10},{10,10,10,10,10},{10,10,10,10,10}};
            public Object valueObject=null;
            public IntWritable[] arraySum=new IntWritable[rightMatrix[0].length];
            public int sum=0;
            public void map(IntWritable key,IntArrayWritable value,Context context) throws IOException, InterruptedException {
                valueObject=value.toArray();//toArray() returns an Object, but that Object actually holds the array;
                                            //Array.get(valueObject,j) fetches element j, which is then converted to a
                                            //String and parsed back into an int with Integer.parseInt(str).
                for (int i=0;i<rightMatrix[0].length;++i){
                    sum=0;
                    for (int j=0;j<rightMatrix.length;++j){
                        sum+=(Integer.parseInt(((Array.get(valueObject,j)).toString())))*rightMatrix[j][i];
                    }
                    arraySum[i]=new IntWritable(sum);
                }
                value.set(arraySum);
                context.write(key,value);
            }
        }
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            String uri="/home/hadoop/2016Test/SeqTest/10IntArray";
            String outUri="/home/hadoop/2016Test/SeqTest/output";
            Configuration conf=new Configuration();
            FileSystem fs=FileSystem.get(URI.create(uri), conf);

            fs.delete(new Path(outUri),true);//delete the output directory if it already exists

            Job job=new Job(conf,"SeqMatrix");
            job.setJarByClass(MRTest.class);
            job.setMapperClass(MyMapper.class);
            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntArrayWritable.class);
            FileInputFormat.setInputPaths(job,new Path(uri));
            FileOutputFormat.setOutputPath(job,new Path(outUri));
            System.exit(job.waitForCompletion(true)?0:1);
        }
    }

    The technique of using Array.get(object, index) to read array elements out of an Object that holds an array is based on: http://www.blogjava.net/pengpenglin/archive/2008/09/04/226968.html
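    The mapper's reflective array access can be exercised without Hadoop at all. Below is a minimal plain-Java sketch (the multiplyRow helper and the Integer[] standing in for value.toArray() are illustrative assumptions), using the same all-tens 4×5 right matrix as the job above. Since every input row is [1,2,3,4], each output entry is (1+2+3+4)*10 = 100:

```java
import java.lang.reflect.Array;
import java.util.Arrays;

public class ArrayGetDemo {
    // Multiplies one matrix row (wrapped in an Object, as value.toArray()
    // returns) by rightMatrix, mirroring the loops in map().
    static int[] multiplyRow(Object valueObject, int[][] rightMatrix) {
        int[] arraySum = new int[rightMatrix[0].length];
        for (int i = 0; i < rightMatrix[0].length; ++i) {
            int sum = 0;
            for (int j = 0; j < rightMatrix.length; ++j) {
                // Array.get pulls element j out of the Object holding the array,
                // exactly as in the map() method above.
                sum += Integer.parseInt(Array.get(valueObject, j).toString())
                        * rightMatrix[j][i];
            }
            arraySum[i] = sum;
        }
        return arraySum;
    }

    public static void main(String[] args) {
        int[][] rightMatrix = {{10,10,10,10,10},{10,10,10,10,10},
                               {10,10,10,10,10},{10,10,10,10,10}};
        // Simulate the Object returned by value.toArray() for row [1,2,3,4]:
        Object valueObject = new Integer[]{1, 2, 3, 4};
        // Each entry is (1+2+3+4)*10 = 100.
        System.out.println(Arrays.toString(multiplyRow(valueObject, rightMatrix)));
        // → [100, 100, 100, 100, 100]
    }
}
```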

    The final computed result is as follows:

     
  • Original post: https://www.cnblogs.com/lz3018/p/5243503.html