原理层面暂时了解不深,只能通过简单的Java代码对HDFS系统进行基本的简单操作,在此做记录如下:
一、HDFS基本操作
1、获取FileSystem
首先需要获取HDFS这个分布式文件系统,JAVA的 org.apache.hadoop.fs 包下的FileSystem类便是为文件系统设计的。我们的目标便是实例化出HDFS的FileSystem对象。
获取FileSystem有多种方式,这里为入门先介绍最简单的一种。
public void getFileSystem2() throws IOException, URISyntaxException, InterruptedException { /* 第一个参数 HDFS 的主机名 + 端口 * 第二个参数:一个Configuration 对象,配置信息的对象
* Configuration configuration = new Configuration();
* configuration.set("fs.defaultFS","hdfs://bigdata1:8020"); * 第三个参数:指定用户,root用户为超级管理员 , 无特殊情况下尽量不加该参数 */ FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata1:8020"), new Configuration(), "root"); System.out.println(fileSystem); // 输出 DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_373282607_1, ugi=11655 (auth:SIMPLE)]] // 关闭 fileSystem.close(); }
2、创建文件 / 文件夹
public void mkdirs() throws URISyntaxException, IOException { FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata1:8020/a.txt"), new Configuration()); // 创建文件夹 boolean bl = fileSystem.mkdirs(new Path("/aaa2/bbb/ccc")); //创建文件 fileSystem.create(new Path("/aaa/aaa.txt")); // 两个创建方法都为递归创建 System.out.println(bl); fileSystem.close(); }
3、文件下载
public void downloadFile() throws URISyntaxException, IOException { FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata1:8020/a.txt"), new Configuration()); // 第一个参数,y要下载的HDFS文件路径 // 第二个参数,下载到本机的目录(不是虚拟机的主机) // Path(String str),路径类 fileSystem.copyToLocalFile(new Path("/a.txt"), new Path("D://a2.txt")); fileSystem.close(); }
4、文件上传
public void uploadFile() throws URISyntaxException, IOException { FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata1:8020/a.txt"), new Configuration()); /* * 第一个参数:本地文件路径 * 第二个参数:要上传的HDFS目录 * */ fileSystem.copyFromLocalFile(new Path("D://b.txt"),new Path("/")); fileSystem.close(); }
5、创建文件并写入数据
// 操作文件系统通过 FSDataOutputStream 流,通过fs.create方法得到流对象 FSDataOutputStream fsDataOutputStream = fs.create(new Path("\user\hadoop\test\data.dat")); fsDataOutputStream.write("Hello,BigData1111111111111".getBytes()); // 默认字节流 // 关闭流 fsDataOutputStream.close();
6、递归目录
@Test public void listFiles() throws IOException { // 调用方法listFiles 获取一个目录下的文件信息,为一个迭代器对象 // 第一个参数:指定目录 // 第二个参数,是否迭代获取 RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/user/hadoop"), true); while (listFiles.hasNext()) { // 每个迭代器元素为一个 LocatedFileStatus 对象,存储着文件的信息 LocatedFileStatus fileStatus = listFiles.next(); System.out.println("Name: " + fileStatus.getPath().getName()); System.out.println("Len: " + fileStatus.getLen()); System.out.println("BlockSize: " + fileStatus.getBlockSize()); System.out.println("Replication: " + fileStatus.getReplication()); BlockLocation[] blockLocations = fileStatus.getBlockLocations(); for (BlockLocation blk : blockLocations) { System.out.println("blk-length:" + blk.getLength() + " - blk-offset: " + blk.getOffset()); String[] hosts = blk.getHosts(); for (String host : hosts) { System.out.println("host: " + host); } } System.out.println("-------------------end...----------------------"); } }
二、工具类的实现
上述代码可以看出有些代码,例如文件系统的获取,关闭等,每个函数都重复使用,比较繁琐,我们可以封装起来,作为自己的工具类utils,方便使用。
package utils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import java.io.IOException; public class HDFSUtils { // 为方便管理,可将以下字段拿出作为单独的成员变量,便于后期修改 public static final String DEFAULT_FS_NAME = "fs.defaultFS"; public static final String DEFAULT_FS_VALUE = "hdfs://bigdata1:8020"; public static final String HADOOP_USER_NAME = "HADOOP_USER_NAME"; public static final String HADOOP_USER_VALUE = "root"; // 获取FileSystem对象 public static FileSystem getFS(){ Configuration configuration = new Configuration(); configuration.set(DEFAULT_FS_NAME, DEFAULT_FS_VALUE); System.setProperty(HADOOP_USER_NAME, HADOOP_USER_VALUE); FileSystem fs = null; try { fs = FileSystem.get(configuration); } catch (IOException e) { e.printStackTrace(); System.out.println("获取系统文件对象失败。"); } return fs; } // 关闭资源 public static void closeFS(FileSystem fs){ if (fs != null){ try { fs.close(); } catch (IOException e) { e.printStackTrace(); System.out.println("关闭系统文件对象失败。"); } } } }
但是有了工具类,每次工程测试,都要创建fileSystem对象和关闭资源,有些繁琐,我们可以在使用Test测试类的 before 和 after 注解,简化代码。
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.After; import org.junit.Before; import org.junit.Test; import utils.HDFSUtils; import java.io.IOException; public class Test01 { FileSystem fs = null; // 在所有功能执行前拿到fileSystem对象即可 @Before // 在程序测试前执行,只执行一次,可以用于工程的初始化 public void init(){ fs = HDFSUtils.getFS(); } // 在所有功能执行完毕后能够关闭FileSystem对象 @After //在程序测试后执行,只执行一次,可以用于关闭资源等收尾工作 public void close(){ HDFSUtils.closeFS(fs); } // 要测试的类直接加上 @Test 注解即可。 @Test public void mkdir() throws IOException { boolean b = fs.mkdirs(new Path("\user\hadoop\test")); if (b) { System.out.println("创建成功"); } else { System.out.println("创建失败"); } } }
三、使用流操作文件系统
我们这里使用第三方工具类IOUtils实现文件流的输入输出操作。
1、文件上传
@Test public void uploadFile() throws IOException { // 创建 hdfs 文件输出流,通过此 流 写入文件 FSDataOutputStream fsDataOutputStream = fs.create(new Path("/user/hadoop/dongao1.txt")); // 一个新路径,创建并写入该文件 // 获取本地文件输入流,将本地文件读入内存 FileInputStream inputStream = new FileInputStream("D:\dongao.txt"); // 通过IOUtils 的 copyBytes 执行读写 // 参数: 输入流,输出流,每次读取字节的个数 IOUtils.copyBytes(inputStream, fsDataOutputStream,1024); // 调用 IOUtils 的关闭流方法 IOUtils.closeStream(fsDataOutputStream); IOUtils.closeStream(inputStream); }
2、文件下载
@Test public void downloadFile() throws IOException { // 先打开一个HDFS中存在的文件输入流 FSDataInputStream inputStream = fs.open(new Path("/user/hadoop/dongao.txt")); // 获取一个文件的输出流,将内容写入本地 FileOutputStream outputStream = new FileOutputStream("D:\董奥.txt"); // 执行读写,读取输入流的内容,写入到输出流,单个文件大小4096个字节 IOUtils.copyBytes(inputStream,outputStream,4096); outputStream.flush(); // 手动刷新 IOUtils.closeStream(outputStream); IOUtils.closeStream(inputStream); }
3、查看文件内容
@Test public void cat() throws IOException { FSDataInputStream inputStream = fs.open(new Path("/user/hadoop/dongao.txt")); IOUtils.copyBytes(inputStream, System.out, 4096); IOUtils.closeStream(inputStream); }