  • [SequenceFile_2] Basic Operations on SequenceFile


    0. Overview

      Tests of SequenceFile read/write && sorting && merging && compression types && converting a log file into a SequenceFile.

      This serves as a supplement to the "Basic Operations on SequenceFile" part of the Hadoop SequenceFile post.


    1. Testing Read/Write && Compression

    package hadoop.sequencefile;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.junit.Test;
    
    import java.io.IOException;
    
    /**
     * Tests for SequenceFile read and write with different compression types
     */
    public class TestSeqFile {
    
        /**
         * Test writing a SequenceFile
         */
        @Test
        public void testWriteSeq() throws Exception {
    
            Configuration conf = new Configuration();
    
            // Use the local file system
            conf.set("fs.defaultFS", "file:///");
    
            FileSystem fs = FileSystem.get(conf);
    
    //        Path path = new Path("E:/test/none.seq");
    //        Path path = new Path("E:/test/record.seq");
            Path path = new Path("E:/test/block.seq");
            // No compression
    //        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class, SequenceFile.CompressionType.NONE);
            // Record compression
    //        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class, SequenceFile.CompressionType.RECORD);
            // Block compression
            SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class, SequenceFile.CompressionType.BLOCK);
    
    
            for (int i = 1; i <= 1000; i++) {
                IntWritable key = new IntWritable(i);
                Text value = new Text("helloworld" + i);
    
                writer.append(key, value);
    
            }
    
            writer.close();
        }
    
        /**
         * Test reading a SequenceFile
         */
        @Test
        public void testReadSeq() throws Exception {
            Configuration conf = new Configuration();
    
            // Use the local file system
            conf.set("fs.defaultFS", "file:///");
    
            FileSystem fs = FileSystem.get(conf);
    
            Path path = new Path("E:/test/block.seq");
    
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
    
            // Initialize two reusable Writable objects
            IntWritable key = new IntWritable();
            Text value = new Text();
    
            while ((reader.next(key, value))) {
                long position = reader.getPosition();
                System.out.println("key: " + key.get() + " , " + " val: " + value.toString() + " , " + " pos: " + position);
            }
        }
    
    }
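
      Supplement: with RECORD or BLOCK compression, the writers above fall back to the default codec. The older createWriter API also has an overload that takes an explicit CompressionCodec. A minimal sketch, assuming a hypothetical E:/test/gzip-block.seq output and one extra import (org.apache.hadoop.io.compress.GzipCodec):

        @Test
        public void testWriteSeqWithCodec() throws Exception {

            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "file:///");

            FileSystem fs = FileSystem.get(conf);

            // Hypothetical output path, following the E:/test layout used above
            Path path = new Path("E:/test/gzip-block.seq");

            // Block compression with an explicitly chosen codec instead of the default
            SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
                    IntWritable.class, Text.class,
                    SequenceFile.CompressionType.BLOCK, new GzipCodec());

            for (int i = 1; i <= 1000; i++) {
                writer.append(new IntWritable(i), new Text("helloworld" + i));
            }

            writer.close();
        }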

    2. Testing Sorting

    package hadoop.sequencefile;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.junit.Test;
    
    import java.util.Random;
    
    /**
     * Tests for sorting a SequenceFile
     */
    public class TestSeqFileSort {
    
        /**
         * Create a SequenceFile whose keys are in random order
         */
        @Test
        public void testWriteRandom() throws Exception {
    
            Configuration conf = new Configuration();
    
            conf.set("fs.defaultFS", "file:///");
    
            FileSystem fs = FileSystem.get(conf);
    
            Path p = new Path("E:/test/random.seq");
    
            SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, p, IntWritable.class, Text.class, SequenceFile.CompressionType.RECORD);
    
            // Initialize the random number generator
            Random r = new Random();
    
            for (int i = 1; i < 100000; i++) {
                // Pick a random value in [0, 99999]
                int j = r.nextInt(100000);
                IntWritable key = new IntWritable(j);
                Text value = new Text("helloworld" + j);
    
                writer.append(key, value);
    
            }
    
            writer.close();
    
        }
    
        /**
         * Test sorting the SequenceFile
         */
        @Test
        public void testSort() throws Exception {
    
            Configuration conf = new Configuration();
    
            conf.set("fs.defaultFS", "file:///");
    
            FileSystem fs = FileSystem.get(conf);
    
            Path pin = new Path("E:/test/random.seq");
            Path pout = new Path("E:/test/sort.seq");
    
            SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, IntWritable.class, Text.class, conf);
    
            sorter.sort(pin, pout);
        }
    
        /**
         * Test reading the sorted SequenceFile
         */
        @Test
        public void testReadSeq() throws Exception {
            Configuration conf = new Configuration();
    
            // Use the local file system
            conf.set("fs.defaultFS", "file:///");
    
            FileSystem fs = FileSystem.get(conf);
    
            Path path = new Path("E:/test/sort.seq");
    
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
    
            // Initialize two reusable Writable objects
            IntWritable key = new IntWritable();
            Text value = new Text();
    
            while ((reader.next(key, value))) {
                long position = reader.getPosition();
                System.out.println("key: " + key.get() + " , " + " val: " + value.toString() + " , " + " pos: " + position);
            }
        }
    
    }
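
      Supplement: Sorter.sort also has an overload that takes an array of input files plus a flag controlling whether the inputs are deleted once the sorted output is written. A minimal sketch with hypothetical input paths, under the same local-FS setup:

        @Test
        public void testSortMultipleInputs() throws Exception {

            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "file:///");

            FileSystem fs = FileSystem.get(conf);

            SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, IntWritable.class, Text.class, conf);

            // Hypothetical inputs; all must use IntWritable keys and Text values
            Path[] inputs = {new Path("E:/test/random1.seq"), new Path("E:/test/random2.seq")};
            Path pout = new Path("E:/test/sort-all.seq");

            // false: keep the input files after sorting
            sorter.sort(inputs, pout, false);
        }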

    3. Testing Merging

    package hadoop.sequencefile;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.junit.Test;
    
    /**
     * Tests for merging SequenceFiles; the inputs must use the same compression type
     */
    public class TestSeqFileMerge {
        /**
         * Test writing the SequenceFiles
         * Create two files covering the key ranges 1-100 and 101-200
         */
        @Test
        public void testWriteSeq() throws Exception {
    
            Configuration conf = new Configuration();
    
            // Use the local file system
            conf.set("fs.defaultFS", "file:///");
    
            FileSystem fs = FileSystem.get(conf);
    
    //        Path path = new Path("E:/test/block1.seq");
            Path path = new Path("E:/test/block2.seq");
    
            // Block compression
            SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class, SequenceFile.CompressionType.BLOCK);
    
    //        for (int i = 1; i <= 100; i++) {
            for (int i = 101; i <= 200; i++) {
                IntWritable key = new IntWritable(i);
                Text value = new Text("helloworld" + i);
    
                writer.append(key, value);
    
            }
    
            writer.close();
        }
    
        /**
         * Test merging the two files; the merge also sorts, so the output is in key order
         */
        @Test
        public void testMerge() throws Exception {
            Configuration conf = new Configuration();
    
            conf.set("fs.defaultFS", "file:///");
    
            FileSystem fs = FileSystem.get(conf);
    
            Path pin1 = new Path("E:/test/block1.seq");
            Path pin2 = new Path("E:/test/block2.seq");
            Path pout = new Path("E:/test/merge.seq");
    
            SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, IntWritable.class, Text.class, conf);
    
            Path[] p = {pin1, pin2};
    
            sorter.merge(p, pout);
        }
    
        /**
         * 测试序列文件读操作
         */
        @Test
        public void testReadSeq() throws Exception {
            Configuration conf = new Configuration();
    
            // Use the local file system
            conf.set("fs.defaultFS", "file:///");
    
            FileSystem fs = FileSystem.get(conf);
    
            Path path = new Path("E:/test/merge.seq");
    
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
    
            // Initialize two reusable Writable objects
            IntWritable key = new IntWritable();
            Text value = new Text();
    
            while ((reader.next(key, value))) {
                long position = reader.getPosition();
                System.out.println("key: " + key.get() + " , " + " val: " + value.toString() + " , " + " pos: " + position);
            }
        }
    
    }
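
      To sanity-check the merge, a short sketch that counts the records in merge.seq; with the two 100-record inputs above it should print 200. It simply reuses the reader pattern from testReadSeq:

        @Test
        public void testCountMerged() throws Exception {

            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "file:///");

            FileSystem fs = FileSystem.get(conf);

            SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("E:/test/merge.seq"), conf);

            IntWritable key = new IntWritable();
            Text value = new Text();

            // Count the key-value pairs in the merged file
            int count = 0;
            while (reader.next(key, value)) {
                count++;
            }
            reader.close();

            System.out.println("records: " + count);
        }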

    4. Converting a Log File into a SequenceFile

    package hadoop.sequencefile;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    
    /**
     * Convert a log file into a SequenceFile
     * To inspect the compressed SequenceFile on Windows:
     * hdfs dfs -text file:///E:/test/access.seq
     */
    public class Log2Seq {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
    
            // Use the local file system
            conf.set("fs.defaultFS", "file:///");
    
            FileSystem fs = FileSystem.get(conf);
    
            Path path = new Path("E:/test/access.seq");
    
            // No compression
    //        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, NullWritable.class, Text.class, SequenceFile.CompressionType.NONE);
            // Record compression
    //        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, NullWritable.class, Text.class, SequenceFile.CompressionType.RECORD);
            // Block compression
            SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, NullWritable.class, Text.class, SequenceFile.CompressionType.BLOCK);
    
            BufferedReader br = new BufferedReader(new FileReader("E:/file/access.log1"));
    
            String line = null;
            while ((line = br.readLine()) != null) {
                NullWritable key = NullWritable.get();
                Text value = new Text(line);
                writer.append(key, value);
            }
    
            // Close the log reader as well as the writer
            br.close();
            writer.close();
        }
    }
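
      Besides hdfs dfs -text, the result can be read back in Java. A minimal sketch reusing the reader pattern from the earlier tests, with NullWritable keys; the class name is illustrative, and the required imports are the Hadoop ones already shown in Log2Seq:

    public class Seq2Log {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "file:///");

            FileSystem fs = FileSystem.get(conf);

            SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("E:/test/access.seq"), conf);

            // NullWritable.get() returns the singleton; reading into it is a no-op
            NullWritable key = NullWritable.get();
            Text value = new Text();

            while (reader.next(key, value)) {
                System.out.println(value.toString());
            }
            reader.close();
        }
    }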
