1、序列文件(二进制)的写入
/**
* 写入文件
* @throws IOException
*/
@Test
public void save() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
FileSystem fs = FileSystem.get(conf);
Path path = new Path("D:\sequence\1.seq");
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class);
for(int i = 0; i < 100; i++){
writer.append(new IntWritable(i),new Text("hello" + i));
}
writer.close();
}
读取序列文件
D:sequence>hdfs dfs -text file:///d:/sequence/1.seq
18/01/04 11:15:20 WARN zlib.ZlibFactory: Failed to load/initialize native-zlib library
18/01/04 11:15:20 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
0 hello0
1 hello1
2 hello2
3 hello3
4 hello4
5 hello5
6 hello6
7 hello7
2、序列文件的读取
/**
* 读取文件
* @throws IOException
*/
@Test
public void read() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
FileSystem fs = FileSystem.get(conf);
Path path = new Path("D:\sequence\1.seq");
SequenceFile.Reader reader = new SequenceFile.Reader(fs,path,conf);
IntWritable key = new IntWritable();
Text value = new Text();
while (reader.next(key,value)){
System.out.println(key.get() + "===>" + value.toString());
}
}
/**
* 读取文件2
* @throws IOException
*/
@Test
public void read2() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
FileSystem fs = FileSystem.get(conf);
Path path = new Path("D:\sequence\1.seq");
SequenceFile.Reader reader = new SequenceFile.Reader(fs,path,conf);
IntWritable key = new IntWritable();
Text value = new Text();
while (reader.next(key)){
reader.getCurrentValue(value);
System.out.println(key.get() + "===>" + value.toString());
}
}
3、读取文件指针
/**
* 读取文件指针
* @throws IOException
*/
@Test
public void read2() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
FileSystem fs = FileSystem.get(conf);
Path path = new Path("D:\sequence\1.seq");
SequenceFile.Reader reader = new SequenceFile.Reader(fs,path,conf);
long pos = reader.getPosition();
System.out.println("文件指针:" + pos);
IntWritable key = new IntWritable();
Text value = new Text();
while (reader.next(key)){
System.out.println("文件指针:" + reader.getPosition());
reader.getCurrentValue(value);
System.out.println(key.get() + "===>" + value.toString());
}
}
4、增加同步点
/**
* 写入文件
* @throws IOException
*/
@Test
public void save() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
FileSystem fs = FileSystem.get(conf);
Path path = new Path("D:\sequence\1.seq");
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class);
for(int i = 0; i < 10; i++){
writer.append(new IntWritable(i),new Text("hello" + i));
writer.sync();
}
for(int i = 0; i < 10; i++){
writer.append(new IntWritable(i),new Text("hello" + i));
if(i % 2 ==0) writer.sync();
}
writer.close();
}
文件指针:128
文件指针:155===>0===>hello0
文件指针:202===>1===>hello1
文件指针:249===>2===>hello2
文件指针:296===>3===>hello3
文件指针:343===>4===>hello4
文件指针:390===>5===>hello5
文件指针:437===>6===>hello6
文件指针:484===>7===>hello7
文件指针:531===>8===>hello8
文件指针:578===>9===>hello9
文件指针:625===>0===>hello0
文件指针:672===>1===>hello1
文件指针:699===>2===>hello2
文件指针:746===>3===>hello3
文件指针:773===>4===>hello4
文件指针:820===>5===>hello5
文件指针:847===>6===>hello6
文件指针:894===>7===>hello7
文件指针:921===>8===>hello8
文件指针:968===>9===>hello9
5、操作同步点
/**
* 读取文件指针,操纵同步点
* @throws IOException
*/
@Test
public void readsync() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
FileSystem fs = FileSystem.get(conf);
Path path = new Path("D:\sequence\1.seq");
SequenceFile.Reader reader = new SequenceFile.Reader(fs,path,conf);
long pos = reader.getPosition();
System.out.println("文件头部指针:" + pos);
IntWritable key = new IntWritable();
Text value = new Text();
//指定读取的开始点
reader.seek(155);
// reader.seek(237);
// reader.sync(209);
while (reader.next(key)){
reader.getCurrentValue(value);
System.out.println("文件指针:" + reader.getPosition()+"===>"+key.get() + "===>" + value.toString());
}
}
文件头部指针:128
文件指针:202===>1===>hello1
文件指针:249===>2===>hello2
文件指针:296===>3===>hello3
文件指针:343===>4===>hello4
文件指针:390===>5===>hello5
文件指针:437===>6===>hello6
文件指针:484===>7===>hello7
文件指针:531===>8===>hello8
文件指针:578===>9===>hello9
文件指针:625===>0===>hello0
文件指针:672===>1===>hello1
文件指针:699===>2===>hello2
文件指针:746===>3===>hello3
文件指针:773===>4===>hello4
文件指针:820===>5===>hello5
文件指针:847===>6===>hello6
文件指针:894===>7===>hello7
文件指针:921===>8===>hello8
文件指针:968===>9===>hello9
6、如果没有对应的指针,使用sync()
/**
* 读取文件指针,操纵同步点
* @throws IOException
*/
@Test
public void readsync() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
FileSystem fs = FileSystem.get(conf);
Path path = new Path("D:\sequence\1.seq");
SequenceFile.Reader reader = new SequenceFile.Reader(fs,path,conf);
long pos = reader.getPosition();
System.out.println("文件头部指针:" + pos);
IntWritable key = new IntWritable();
Text value = new Text();
//指定读取的开始点
// reader.seek(250);
// reader.seek(237);
reader.sync(344);
while (reader.next(key)){
reader.getCurrentValue(value);
System.out.println("文件指针:" + reader.getPosition()+"===>"+key.get() + "===>" + value.toString());
}
}
文件头部指针:128
文件指针:437===>6===>hello6
文件指针:484===>7===>hello7
文件指针:531===>8===>hello8
文件指针:578===>9===>hello9
文件指针:625===>0===>hello0
文件指针:672===>1===>hello1
文件指针:699===>2===>hello2
文件指针:746===>3===>hello3
文件指针:773===>4===>hello4
文件指针:820===>5===>hello5
文件指针:847===>6===>hello6
文件指针:894===>7===>hello7
文件指针:921===>8===>hello8
文件指针:968===>9===>hello9
7、一边写入一边压缩
/**
* 写入文件
* @throws IOException
*/
@Test
public void save() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
FileSystem fs = FileSystem.get(conf);
Path path = new Path("D:\sequence\1.seq");
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class, SequenceFile.CompressionType.RECORD,new GzipCodec());
for(int i = 0; i < 10; i++){
writer.append(new IntWritable(i),new Text("hello" + i));
writer.sync();
}
for(int i = 0; i < 10; i++){
writer.append(new IntWritable(i),new Text("hello" + i));
if(i % 2 ==0) writer.sync();
}
writer.close();
}