1 public class HdfsClient { 2 3 FileSystem fs = null; 4 5 @Before 6 public void init() throws Exception { 7 8 // 构造一个配置参数对象,设置一个参数:我们要访问的hdfs的URI 9 // 从而FileSystem.get()方法就知道应该是去构造一个访问hdfs文件系统的客户端,以及hdfs的访问地址 10 // new Configuration();的时候,它就会去加载jar包中的hdfs-default.xml 11 // 然后再加载classpath下的hdfs-site.xml 12 Configuration conf = new Configuration(); 13 conf.set("fs.defaultFS", "hdfs://master:9000"); 14 /** 15 * 参数优先级: 1、客户端代码中设置的值 2、classpath下的用户自定义配置文件 3、然后是服务器的默认配置 16 */ 17 conf.set("dfs.replication", "2"); 18 conf.set("dfs.block.size","64m"); 19 20 // 获取一个hdfs的访问客户端,根据参数,这个实例应该是DistributedFileSystem的实例 21 // fs = FileSystem.get(conf); 22 23 // 如果这样去获取,那conf里面就可以不要配"fs.defaultFS"参数,而且,这个客户端的身份标识已经是hadoop用户 24 fs = FileSystem.get(new URI("hdfs://master:9000"), conf, "hadoop"); 25 26 // 获取文件系统相关信息 27 DatanodeInfo[] dataNodeStats = ((DistributedFileSystem) fs).getDataNodeStats(); 28 for(DatanodeInfo dinfo: dataNodeStats){ 29 System.out.println(dinfo.getHostName()); 30 } 31 32 } 33 34 /** 35 * 往hdfs上传文件 36 * 37 * @throws Exception 38 */ 39 @Test 40 public void testAddFileToHdfs() throws Exception { 41 42 // 要上传的文件所在的本地路径 43 Path src = new Path("g:/apache-flume-1.6.0-bin.tar.gz"); 44 // 要上传到hdfs的目标路径 45 Path dst = new Path("/"); 46 fs.copyFromLocalFile(src, dst); 47 48 fs.close(); 49 } 50 51 /** 52 * 从hdfs中复制文件到本地文件系统 53 * 54 * @throws IOException 55 * @throws IllegalArgumentException 56 */ 57 @Test 58 public void testDownloadFileToLocal() throws IllegalArgumentException, IOException { 59 60 // fs.copyToLocalFile(new Path("/apache-flume-1.6.0-bin.tar.gz"), new Path("d:/")); 61 fs.copyToLocalFile(false,new Path("/apache-flume-1.6.0-bin.tar.gz"), new Path("d:/"),true); 62 fs.close(); 63 64 } 65 66 /** 67 * 目录操作 68 * @throws IllegalArgumentException 69 * @throws IOException 70 */ 71 @Test 72 public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException { 73 74 // 创建目录 75 fs.mkdirs(new Path("/a1/b1/c1")); 76 77 // 删除文件夹 ,如果是非空文件夹,参数2必须给值true 78 fs.delete(new Path("/aaa"), true); 79 80 // 重命名文件或文件夹 81 fs.rename(new Path("/a1"), new Path("/a2")); 82 83 } 84 85 /** 86 * 查看目录信息,只显示文件 87 * 88 * @throws IOException 89 * @throws IllegalArgumentException 90 * @throws FileNotFoundException 91 */ 92 @Test 93 public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException { 94 95 // 思考:为什么返回迭代器,而不是List之类的容器, 如果文件特大, 那不就崩啦! 迭代器是每迭代一次都向服务器取一次 96 RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true); 97 98 while (listFiles.hasNext()) { 99 100 LocatedFileStatus fileStatus = listFiles.next(); 101 102 System.out.println(fileStatus.getPath().getName());//文件名 103 System.out.println(fileStatus.getBlockSize());//block块的大小 104 System.out.println(fileStatus.getPermission());//文件的权限 105 System.out.println(fileStatus.getLen());//字节数 106 BlockLocation[] blockLocations = fileStatus.getBlockLocations();//获取block块 107 for (BlockLocation bl : blockLocations) { 108 System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset()); 109 String[] hosts = bl.getHosts(); //主机名 110 for (String host : hosts) { 111 System.out.println(host); 112 } 113 114 } 115 116 System.out.println("--------------为angelababy打印的分割线--------------"); 117 118 } 119 120 } 121 122 /** 123 * 查看文件及文件夹信息 124 * 125 * @throws IOException 126 * @throws IllegalArgumentException 127 * @throws FileNotFoundException 128 */ 129 @Test 130 public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException { 131 132 FileStatus[] listStatus = fs.listStatus(new Path("/")); 133 134 String flag = "d-- "; 135 for (FileStatus fstatus : listStatus) { 136 137 if (fstatus.isFile()) flag = "f-- "; 138 139 System.out.println(flag + fstatus.getPath().getName()); 140 System.out.println(fstatus.getPermission()); 141 142 } 143 144 145 146 } 147 148 149 }
hadoop底层用流调用的api
1 ** 2 * 相对那些封装好的方法而言的更底层一些的操作方式 3 * 上层那些mapreduce spark等运算框架,去hdfs中获取数据的时候,就是调的这种底层的api 4 * @author 5 * 6 */ 7 public class StreamAccess { 8 9 FileSystem fs = null; 10 11 @Before 12 public void init() throws Exception { 13 14 Configuration conf = new Configuration(); 15 fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop"); 16 17 } 18 19 20 21 @Test 22 public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{ 23 24 //先获取一个文件的输入流----针对hdfs上的 25 FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz")); 26 27 //再构造一个文件的输出流----针对本地的 28 FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz")); 29 30 //再将输入流中数据传输到输出流 31 IOUtils.copyBytes(in, out, 4096); 32 33 34 } 35 36 @Test 37 public void testUploadByStream() throws Exception{ 38 39 //hdfs文件的输出流 40 FSDataOutputStream fsout = fs.create(new Path("/aaa.txt")); 41 42 //本地文件的输入流 43 FileInputStream fsin = new FileInputStream("c:/111.txt"); 44 45 IOUtils.copyBytes(fsin, fsout,4096); 46 47 48 } 49 50 51 52 53 /** 54 * hdfs支持随机定位进行文件读取,而且可以方便地读取指定长度 55 * 用于上层分布式运算框架并发处理数据 56 * @throws IllegalArgumentException 57 * @throws IOException 58 */ 59 @Test 60 public void testRandomAccess() throws IllegalArgumentException, IOException{ 61 62 FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));//先获取一个文件的输入流----针对hdfs上的 63 in.seek(22); //可以将流的起始偏移量进行自定义 64 FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));//再构造一个文件的输出流----针对本地的 65 IOUtils.copyBytes(in,out,19L,true); //apache的工具类, 3.缓冲大小,4.是否关闭流 66 67 } 68 69 70 71 /** 72 * 读取指定的block 73 * @throws IOException 74 * @throws IllegalArgumentException 75 */ 76 @Test 77 public void testCat() throws IllegalArgumentException, IOException{ 78 79 FSDataInputStream in = fs.open(new Path("/weblog/input/access.log.10")); 80 //拿到文件信息 81 FileStatus[] listStatus = fs.listStatus(new Path("/weblog/input/access.log.10")); 82 //获取这个文件的所有block的信息 83 BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0L, listStatus[0].getLen()); 84 85 86 //第一个block的长度 87 long length = fileBlockLocations[0].getLength(); 88 //第一个block的起始偏移量 89 long offset = fileBlockLocations[0].getOffset(); 90 91 System.out.println(length); 92 System.out.println(offset); 93 94 //获取第一个block写入输出流 95 // IOUtils.copyBytes(in, System.out, (int)length); 96 byte[] b = new byte[4096]; 97 98 FileOutputStream os = new FileOutputStream(new File("d:/block0")); 99 while(in.read(offset, b, 0, 4096)!=-1){ 100 os.write(b); 101 offset += 4096; 102 if(offset>length) return; 103 }; 104 105 os.flush(); 106 os.close(); 107 in.close(); 108 109 110 } 111 112 113 114 }
本代码来自传智播客,版权归传智播客所有