类学习
Configuration
设置HDFS的相关参数。
conf = new Configuration ();
conf.set("fs.defaultFS","hdfs://bigdata:9000");
conf.set("dfs.client.use.datanode.hostname", "true");
conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
FileSystem
该类的对象是一个文件系统对象,可以使用一些方法对文件进行操作。一般通过FileSystem的get方法来生成对象。
有两种方法获得FileSystem对象
通过get()
//其中conf是Configuration,服务器配置
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://xxx.xxx.xxx.xxx:9000");
conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
FileSystem fs = FileSystem.get(conf);
//也可以通过get(new URI("URI地址"),conf)来精准获得FileSystem
通过newInstance()
FileSystem fs = FileSystem.newInstance(URI,conf);
FileSystem的功能
有以下几个功能:
- mkdirs(Path p),创建目录
- create(Path p),指定路径创建FSDataOutputStream
- exists(Path p),判断文件或目录是否存在
- listStatus(Path p),返回所有文件信息
- open(Path p),指定路径创建FSDataInputStream
Path
文件的存储路径。如果在本地文件系统用File
//一般用以下方法来生成
new Path(URI uri)
new Path(String pathString)
FileStatus
通过FileSystem的listStatus可以返回FileStatus,表示文件的详细信息。
FileStatus[] fileStatuses = fs.listStatus(path);
//可以通过添加PathFilter来过滤部分文件
FileStatus[] fileStatuses = fsSource.listStatus(inputPath, new PathFilter(""));
有以下API:
- getLen(),得到文件大小
- getPath(),得到Path路径
- getPermission(),获取权限信息
- isDir(),是否是目录
- isFile(),是否是文件
PathFilter
用来过滤部分文件,通过正则表达式。
class ImplementsPathFilter implements PathFilter{
private final String regex;
public ImplementsPathFilter(String regex) {
this.regex = regex;
}
@Override
public boolean accept(Path path) {
return !path.toString().matches(regex);
}
}
输入输出流
FSDataInputStream和FSDataOutputStream
FileSystem对象的open()方法返回的是FSDataInputStream对象,支持随机访问,可以从流的任意位置读取数据。FileSystem中的create()方法返回了一个FSDataOutputStream对象。
FSDataInputStram in = fs.open(new Path());
byte[] data = new byte[1024];
in.read(data);
System.out.println(new String(data));
FSDataOutputStram out = fs.create(new Path());
out.write(data);
read(data)从文件读取数据,并存储到data中。此时保存的是byte类型,需要通过new String转换为String类型,不可通过toString()。
write()写入数据。
API操作
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
public class HDFS_API {
public static FileSystem fs;
public static Configuration conf;
public void init() throws Exception {
//通过这种方式设置客户端身份
System.setProperty("HADOOP_USER_NAME", "hadoop");
conf = new Configuration ();
conf.set("fs.defaultFS","hdfs://bigdata:9000");
conf.set("dfs.client.use.datanode.hostname", "true");
conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
fs = FileSystem.get(conf);
//或者使用下面的方式设置客户端身份
//fs = FileSystem.get(new URI("hdfs://bigdata:9000"),conf,"hadoop");
}
public void close() throws Exception {
if(fs != null)
fs.close ();
}
//遍历path路径下文件及文件信息
public void listFiles(String path) throws Exception {
//listStatus可以添加FileFilter类过滤不要的文件
FileStatus[] files = fs.listStatus (new Path(path));
for(FileStatus file : files) {
//文件大小
System.out.println ( file.getLen () );
//文件路径
System.out.println ( file.getPath () );
//文件权限
System.out.println ( file.getPermission () );
//file.isFile()
//file.isDirectory()
}
}
//创建文件夹
public void mkDir(String path) throws Exception {
//第一个参数是路径Path,第二个参数是目录权限管理
fs.mkdirs ( new Path(path) );
}
//删除文件夹
public void deleteDir(String path) throws Exception {
fs.delete ( new Path(path), true );
}
//下载文件,通过copyTocalFile()和copyFromLocalFile()
//或者通过FSDataInputStream和FSDataOutputStream
public void getFileToLocal(String inputPath, String outputPath) throws Exception {
//fs.copyToLocalFile (new Path(inputPath), new Path(outputPath));
//FileSystem对象的open()方法返回一个FSDataInputStream,用以读数据,建立输入流
FSDataInputStream inputStream = fs.open (new Path(inputPath));
//本地的输出流
FileOutputStream outputStream = new FileOutputStream (new File (outputPath));
IOUtils.copyBytes (inputStream,outputStream,conf);
IOUtils.closeStreams ( inputStream, outputStream );
}
//上传文件
public void putFile(String inputPath, String outputPath) throws Exception {
//第一个参数是本地路径,第二个路径是上传路径,将本地文件上传到HDFS上
//fs.copyFromLocalFile (new Path(inputPath), new Path(outputPath));
FileInputStream inputStream = new FileInputStream (new File(inputPath));
FSDataOutputStream outputStream = fs.create (new Path(outputPath));
IOUtils.copyBytes (inputStream,outputStream,conf);
IOUtils.closeStreams ( inputStream,outputStream );
}
//在HDFS上用流读写数据
public void read_write(String inputPath, String outputPath) throws Exception {
FSDataInputStream inputStream = fs.open (new Path(inputPath));
FSDataOutputStream outputStream = fs.create (new Path(outputPath));
//只能操作字节流
byte[] buf = new byte[1024];
inputStream.read(buf);
//不可以用buf.toString,其没有重写toString()方法,只会返回类名和地址
System.out.println (new String(buf));
outputStream.write (buf);
IOUtils.closeStreams (inputStream,outputStream);
}
public static void main(String[] args) {
}
}