zoukankan      html  css  js  c++  java
  • Java读写HDFS上的Avro文件

    1、通过Java向HDFS写入Avro文件

     1 import java.io.File;
     2 import java.io.IOException;
     3 import java.io.OutputStream;
     4 import java.nio.ByteBuffer;
     5 
     6 import org.apache.avro.Schema;
     7 import org.apache.avro.file.CodecFactory;
     8 import org.apache.avro.file.DataFileWriter;
     9 import org.apache.avro.generic.GenericData;
    10 import org.apache.avro.generic.GenericDatumWriter;
    11 import org.apache.avro.generic.GenericRecord;
    12 import org.apache.commons.io.FileUtils;
    13 import org.apache.hadoop.conf.Configuration;
    14 import org.apache.hadoop.fs.FileSystem;
    15 import org.apache.hadoop.fs.Path;
    16 import org.apache.hadoop.io.IOUtils;
    17 
    18 public class HdfsAvroTest {
    19 
    20     public static final String SCHEMA_JSON = "{"type": "record","name": "SmallFilesTest", "
    21             + ""fields": ["
    22             + "{"name":""
    23             + "username"
    24             + "","type":"string"},"
    25             + "{"name":""
    26             + "password"
    27             + "", "type":"string"}]}";
    28     public static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);
    29 
    30     public static void writeToAvro(File srcPath, OutputStream outputStream)
    31             throws IOException {
    32         DataFileWriter<Object> writer = new DataFileWriter<Object>(
    33                 new GenericDatumWriter<Object>()).setSyncInterval(100);
    34         writer.setCodec(CodecFactory.snappyCodec());
    35         writer.create(SCHEMA, outputStream);
    36         for (Object obj : FileUtils.listFiles(srcPath, null, false)) {
    37             File file = (File) obj;
    38             String filename = file.getAbsolutePath();
    39             byte content[] = FileUtils.readFileToByteArray(file);
    40             GenericRecord record = new GenericData.Record(SCHEMA);
    41             record.put("username", filename);
    42             record.put("password", ByteBuffer.wrap(content));
    43             writer.append(record);
    44         }
    45         IOUtils.cleanup(null, writer);
    46         IOUtils.cleanup(null, outputStream);
    47     }
    48 
    49     public static void main(String[] args) throws Exception {
    50         Configuration config = new Configuration();
    51         FileSystem hdfs = FileSystem.get(config);
    52         File sourceDir = new File(args[0]);
    53         Path destFile = new Path(args[1]);
    54         OutputStream os = hdfs.create(destFile);
    55         writeToAvro(sourceDir, os);
    56     }
    57 }

    2、通过Java读取HDFS上的Avro文件

     1 import java.io.IOException;
     2 import java.io.InputStream;
     3 
     4 import org.apache.avro.file.DataFileStream;
     5 import org.apache.avro.generic.GenericDatumReader;
     6 import org.apache.avro.generic.GenericRecord;
     7 import org.apache.hadoop.conf.Configuration;
     8 import org.apache.hadoop.fs.FileSystem;
     9 import org.apache.hadoop.fs.Path;
    10 import org.apache.hadoop.io.IOUtils;
    11 
    12 public class HdfsReadAvro {
    13 
    14 
    15     public static void readFromAvro(InputStream is) throws IOException {
    16         DataFileStream<Object> reader = new DataFileStream<Object>(is,
    17                 new GenericDatumReader<Object>());
    18         for (Object o : reader) {
    19             GenericRecord r = (GenericRecord) o;
    20             System.out.println(r.get("username")+ ":"+r.get("password"));
    21         }
    22         IOUtils.cleanup(null, is);
    23         IOUtils.cleanup(null, reader);
    24     }
    25 
    26     public static void main(String[] args) throws Exception {
    27         Configuration config = new Configuration();
    28         FileSystem hdfs = FileSystem.get(config);
    29         Path destFile = new Path(args[0]);
    30         InputStream is = hdfs.open(destFile);
    31         readFromAvro(is);
    32     }
    33 }
  • 相关阅读:
    IconRes提供免费高质量的Material风格android官方图标库
    android中的所谓观察者模式
    Android总结篇——Intent机制详解及示例总结
    SpringMVC注解@initbinder解决类型转换问题
    ubuntu16.04上安装tomcat7
    ImportError: No module named corsheaders
    linux 网卡
    工控机安装Ubuntu14.04
    python2安装django
    Ubuntu14.04 terminal添加右键
  • 原文地址:https://www.cnblogs.com/fillPv/p/5015619.html
Copyright © 2011-2022 走看看