1. Writing Avro files to HDFS from Java
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsAvroTest {

    // Record schema: "username" holds the source file path,
    // "password" holds the raw file contents, so its type must be "bytes".
    public static final String SCHEMA_JSON =
            "{\"type\": \"record\", \"name\": \"SmallFilesTest\", "
            + "\"fields\": ["
            + "{\"name\":\"username\",\"type\":\"string\"},"
            + "{\"name\":\"password\",\"type\":\"bytes\"}]}";

    public static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);

    public static void writeToAvro(File srcPath, OutputStream outputStream) throws IOException {
        DataFileWriter<Object> writer = new DataFileWriter<Object>(
                new GenericDatumWriter<Object>()).setSyncInterval(100);
        writer.setCodec(CodecFactory.snappyCodec());
        writer.create(SCHEMA, outputStream);
        // Pack every file directly under srcPath into one Avro container file.
        for (Object obj : FileUtils.listFiles(srcPath, null, false)) {
            File file = (File) obj;
            String filename = file.getAbsolutePath();
            byte[] content = FileUtils.readFileToByteArray(file);
            GenericRecord record = new GenericData.Record(SCHEMA);
            record.put("username", filename);
            record.put("password", ByteBuffer.wrap(content));
            writer.append(record);
        }
        IOUtils.cleanup(null, writer);
        IOUtils.cleanup(null, outputStream);
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        File sourceDir = new File(args[0]);   // local directory whose files get packed
        Path destFile = new Path(args[1]);    // target Avro file on HDFS
        OutputStream os = hdfs.create(destFile);
        writeToAvro(sourceDir, os);
    }
}
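Embedding the schema as a Java string forces double escaping of every quote. The same record definition can instead live in a standalone .avsc file and be parsed at runtime with Schema.Parser. The sketch below assumes a hypothetical small-files-test.avsc in the working directory containing the same JSON as SCHEMA_JSON above.

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;

public class SchemaFromFile {
    public static void main(String[] args) throws IOException {
        // Hypothetical schema file; name and location are assumptions,
        // its contents are the same record definition used by HdfsAvroTest.
        File avsc = new File("small-files-test.avsc");
        Schema schema = new Schema.Parser().parse(avsc);
        System.out.println(schema.toString(true)); // pretty-print the parsed schema
    }
}

Note also that CodecFactory.snappyCodec() needs the snappy-java library available to the client; if it is not, CodecFactory.deflateCodec(level) is a pure-Java alternative.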
2. Reading Avro files on HDFS from Java
import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsReadAvro {

    public static void readFromAvro(InputStream is) throws IOException {
        // DataFileStream reads the writer schema from the container header,
        // so no schema needs to be supplied here.
        DataFileStream<Object> reader = new DataFileStream<Object>(is,
                new GenericDatumReader<Object>());
        for (Object o : reader) {
            GenericRecord r = (GenericRecord) o;
            System.out.println(r.get("username") + ":" + r.get("password"));
        }
        IOUtils.cleanup(null, is);
        IOUtils.cleanup(null, reader);
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        Path destFile = new Path(args[0]);    // Avro file on HDFS written by HdfsAvroTest
        InputStream is = hdfs.open(destFile);
        readFromAvro(is);
    }
}
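Printing the "password" field only shows the ByteBuffer's toString(). If the goal is to restore the packed files, the raw bytes can be pulled back out of each record. The following is a minimal sketch assuming the input was written by HdfsAvroTest above; the local destination directory and the part-N output naming are assumptions for illustration only.

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class HdfsAvroUnpack {

    // Restores the raw bytes stored in the "password" field of each record
    // into files named part-0, part-1, ... under destDir (naming is illustrative).
    public static void unpack(InputStream is, String destDir) throws IOException {
        DataFileStream<GenericRecord> reader = new DataFileStream<GenericRecord>(is,
                new GenericDatumReader<GenericRecord>());
        int i = 0;
        for (GenericRecord r : reader) {
            ByteBuffer buf = (ByteBuffer) r.get("password");  // raw file contents
            byte[] bytes = new byte[buf.remaining()];
            buf.get(bytes);
            FileOutputStream out = new FileOutputStream(destDir + "/part-" + (i++));
            try {
                out.write(bytes);
            } finally {
                out.close();
            }
        }
        reader.close();
    }
}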