
Java HDFS Client Operations

3.2 Operating HDFS through the API

Configure the Maven pom file:

<dependencies>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
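The ${hadoop.version} property must be defined in the pom's <properties> section; a minimal sketch, assuming Hadoop 2.7.2 (the version used throughout this article):

<properties>
    <hadoop.version>2.7.2</hadoop.version>
</properties>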

Create the first Java project:

public class HdfsClientDemo1 {

    public static void main(String[] args) throws Exception {
        // 1 Get the file system
        Configuration configuration = new Configuration();

        // Point the client at the cluster
        configuration.set("fs.defaultFS", "hdfs://hadoop-001:9000");
        FileSystem fileSystem = FileSystem.get(configuration);

        // Alternatively, pass the cluster URI and the user name directly
        // FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop-001:9000"), configuration, "hadoop");

        // 2 Upload a local file to HDFS
        fileSystem.copyFromLocalFile(new Path("f:/hello.txt"), new Path("/hello1.copy.txt"));

        // 3 Release resources
        fileSystem.close();
        System.out.println("over");
    }
}

Running this as-is fails with a permission error (an org.apache.hadoop.security.AccessControlException: Permission denied), because the client authenticates to HDFS as the local OS user rather than the hadoop user.

There are two ways to resolve the access-permission problem:

1. Configure the VM options of the run configuration:

-DHADOOP_USER_NAME=hadoop

2. Specify the user name directly in the FileSystem.get call, as sketched below.
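A minimal sketch of the second approach (the same overload is used throughout the examples below; hadoop-001 and the hadoop user match this article's cluster):

    // Pass the cluster URI, the configuration, and the user to act as;
    // this overload avoids the permission error without any JVM options.
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop-001:9000"), configuration, "hadoop");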

     

3.2.1 Obtaining the HDFS file system

1) Full code

    @Test
    public void initHDFS() throws Exception {
        // 1 Create the configuration object.
        // new Configuration() first loads hdfs-default.xml from the jar,
        // then hdfs-site.xml from the classpath.
        Configuration configuration = new Configuration();

        // 2 Set parameters.
        // Priority: 1. values set in client code  2. user config files on the classpath  3. server defaults
        // configuration.set("fs.defaultFS", "hdfs://hadoop102:9000");
        configuration.set("dfs.replication", "3");

        // 3 Get the file system
        FileSystem fs = FileSystem.get(configuration);

        // 4 Print the file system
        System.out.println(fs.toString());
    }

2) Copy core-site.xml to the project root

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- Address of the HDFS NameNode -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop102:9000</value>
    </property>
    <!-- Storage directory for files Hadoop generates at runtime -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-2.7.2/data/tmp</value>
    </property>
</configuration>

3.2.2 Uploading a file to HDFS

    @Test
    public void putFileToHDFS() throws Exception {
        // 1 Create the configuration object
        Configuration configuration = new Configuration();

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

        // 2 Local path of the file to upload
        Path src = new Path("e:/hello.txt");

        // 3 Target path on HDFS
        Path dst = new Path("hdfs://hadoop102:9000/user/hadoop/hello.txt");

        // 4 Copy the file
        fs.copyFromLocalFile(src, dst);
        fs.close();
    }

How do you change the replication factor? There are two ways:

1. Create an hdfs-site.xml file on the classpath, as sketched below.

2. Set the key/value pair directly on the Configuration object, e.g. configuration.set("dfs.replication", "2").
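A minimal hdfs-site.xml sketch for option 1 (dfs.replication is the standard HDFS property; the value 2 here is only an example):

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- Replication factor applied to files written by this client -->
    <property>
        <name>dfs.replication</name>
        <value>2</value>
    </property>
</configuration>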

     

3.2.3 Downloading a file from HDFS

    @Test
    public void getFileFromHDFS() throws Exception {
        // 1 Create the configuration object
        Configuration configuration = new Configuration();

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

        // fs.copyToLocalFile(new Path("hdfs://hadoop102:9000/user/hadoop/hello.txt"), new Path("d:/hello.txt"));
        // boolean delSrc                whether to delete the source file
        // Path src                      the HDFS file to download
        // Path dst                      the local path to download to
        // boolean useRawLocalFileSystem whether to use the raw local file system
        //                               (true skips writing the local .crc checksum file)
        // 2 Download the file
        fs.copyToLocalFile(false, new Path("hdfs://hadoop102:9000/user/hadoop/hello.txt"), new Path("e:/hellocopy.txt"), true);
        fs.close();
    }

3.2.4 Creating a directory in HDFS

    @Test
    public void mkdirAtHDFS() throws Exception {
        // 1 Create the configuration object
        Configuration configuration = new Configuration();

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

        // 2 Create the directory
        fs.mkdirs(new Path("hdfs://hadoop102:9000/user/hadoop/output"));
        fs.close();
    }

3.2.5 Deleting a folder in HDFS

    @Test
    public void deleteAtHDFS() throws Exception {
        // 1 Create the configuration object
        Configuration configuration = new Configuration();

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

        // 2 Delete the folder; for a non-empty folder the second argument (recursive) must be true
        fs.delete(new Path("hdfs://hadoop102:9000/user/hadoop/output"), true);
        fs.close();
    }

3.2.6 Renaming a file in HDFS

    @Test
    public void renameAtHDFS() throws Exception {
        // 1 Create the configuration object
        Configuration configuration = new Configuration();

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

        // 2 Rename the file or folder
        fs.rename(new Path("hdfs://hadoop102:9000/user/hadoop/hello.txt"), new Path("hdfs://hadoop102:9000/user/hadoop/hellonihao.txt"));
        fs.close();
    }

3.2.7 Viewing file details in HDFS

    @Test
    public void readListFiles() throws Exception {
        // 1 Create the configuration object
        Configuration configuration = new Configuration();

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

        // Why return an iterator rather than a List? A directory tree can hold a
        // huge number of files; the iterator fetches entries from the NameNode
        // lazily, in batches, instead of materializing them all in client memory.
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus fileStatus = listFiles.next();

            System.out.println(fileStatus.getPath().getName());
            System.out.println(fileStatus.getBlockSize());
            System.out.println(fileStatus.getPermission());
            System.out.println(fileStatus.getLen());

            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            for (BlockLocation bl : blockLocations) {
                System.out.println("block-offset:" + bl.getOffset());
                String[] hosts = bl.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }

            System.out.println("---------- next file ----------");
        }
        fs.close();
    }

3.2.8 Listing files and folders in HDFS

    @Test
    public void findAtHDFS() throws Exception {
        // 1 Create the configuration object
        Configuration configuration = new Configuration();

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

        // 2 Get the status of everything under the query path.
        // Unlike listFiles (3.2.7), listStatus is not recursive and
        // returns directories as well as files.
        FileStatus[] listStatus = fs.listStatus(new Path("/"));

        // 3 Iterate over all statuses
        for (FileStatus status : listStatus) {
            if (status.isFile()) {
                System.out.println("f--" + status.getPath().getName());
            } else {
                System.out.println("d--" + status.getPath().getName());
            }
        }
        fs.close();
    }

3.3 Operating HDFS through IO streams

3.3.1 Uploading a file

    @Test
    public void putFileToHDFS() throws Exception {
        // 1 Create the configuration object
        Configuration configuration = new Configuration();

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

        // 2 Create the input stream
        FileInputStream inStream = new FileInputStream(new File("e:/hello.txt"));

        // 3 Build the target path
        String putFileName = "hdfs://hadoop102:9000/user/hadoop/hello1.txt";
        Path writePath = new Path(putFileName);

        // 4 Create the output stream
        FSDataOutputStream outStream = fs.create(writePath);

        // 5 Pipe the streams
        try {
            IOUtils.copyBytes(inStream, outStream, 4096, false);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(inStream);
            IOUtils.closeStream(outStream);
        }
    }

3.3.2 Downloading a file

    @Test
    public void getFileToHDFS() throws Exception {
        // 1 Create the configuration object
        Configuration configuration = new Configuration();

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

        // 2 Path of the file to read
        String filename = "hdfs://hadoop102:9000/user/hadoop/hello1.txt";

        // 3 Create the read path
        Path readPath = new Path(filename);

        // 4 Create the input stream
        FSDataInputStream inStream = fs.open(readPath);

        // 5 Pipe the stream to the console
        try {
            IOUtils.copyBytes(inStream, System.out, 4096, false);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(inStream);
        }
    }

3.3.3 Seek-based file reads

hadoop-2.7.2.tar.gz is larger than one 128 MB block, so HDFS stores it as two blocks; the following tests download each block separately, and the parts are merged afterwards.

1) Download the first block

    @Test
    // Seek-based download of the first block
    public void readFileSeek1() throws Exception {
        // 1 Create the configuration object
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

        // 2 Input path
        Path path = new Path("hdfs://hadoop102:9000/user/hadoop/tmp/hadoop-2.7.2.tar.gz");

        // 3 Open the input stream
        FSDataInputStream fis = fs.open(path);

        // 4 Create the output stream
        FileOutputStream fos = new FileOutputStream("e:/hadoop-2.7.2.tar.gz.part1");

        // 5 Pipe the streams: copy exactly the first block (128 MB).
        // read() may return short counts, so track the bytes actually copied.
        byte[] buf = new byte[1024];
        long blockSize = 128L * 1024 * 1024;
        long copied = 0;
        while (copied < blockSize) {
            int len = fis.read(buf, 0, (int) Math.min(buf.length, blockSize - copied));
            if (len == -1) break;   // end of file reached early
            fos.write(buf, 0, len);
            copied += len;
        }

        // 6 Close the streams
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
    }

2) Download the second block

    @Test
    // Seek-based download of the second block
    public void readFileSeek2() throws Exception {
        // 1 Create the configuration object
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

        // 2 Input path
        Path path = new Path("hdfs://hadoop102:9000/user/hadoop/tmp/hadoop-2.7.2.tar.gz");

        // 3 Open the input stream
        FSDataInputStream fis = fs.open(path);

        // 4 Create the output stream
        FileOutputStream fos = new FileOutputStream("e:/hadoop-2.7.2.tar.gz.part2");

        // 5 Seek to the offset of the second block (128 MB)
        fis.seek(1024 * 1024 * 128);

        // 6 Pipe the streams (copies from the offset to the end of the file)
        IOUtils.copyBytes(fis, fos, 1024);

        // 7 Close the streams
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
    }
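Hard-coding 1024 * 1024 * 128 assumes the default 128 MB block size of Hadoop 2.x. A hedged sketch of a variant that reads the block size from the file's metadata instead (host, path, and user follow the examples above):

    // Sketch: derive the seek offset from the file's actual block size
    // instead of hard-coding 128 MB.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), conf, "hadoop");
    Path path = new Path("/user/hadoop/tmp/hadoop-2.7.2.tar.gz");
    long blockSize = fs.getFileStatus(path).getBlockSize();

    FSDataInputStream fis = fs.open(path);
    FileOutputStream fos = new FileOutputStream("e:/hadoop-2.7.2.tar.gz.part2");
    fis.seek(blockSize);                 // jump to the start of the second block
    IOUtils.copyBytes(fis, fos, 1024);   // copy from the offset to end of file
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
    fs.close();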

3) Read block information

    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop-001:9000"), configuration, "hadoop");

    // 2 Path of the file to read
    String filename = "hdfs://hadoop-001:9000/0306_668/hadoop-2.7.2.tar.gz";

    // 3 Create the read path
    Path readPath = new Path(filename);

    // 4 Create the input stream; HdfsDataInputStream exposes the block metadata
    HdfsDataInputStream hdis = (HdfsDataInputStream) fs.open(readPath);

    List<LocatedBlock> allBlocks = hdis.getAllBlocks();
    for (LocatedBlock block : allBlocks) {
        ExtendedBlock eBlock = block.getBlock();
        System.out.println("------------------------");
        System.out.println(eBlock.getBlockId());
        System.out.println(eBlock.getBlockName());
        System.out.println(block.getBlockSize());
        System.out.println(block.getStartOffset());

        // DataNodes holding a replica of this block
        DatanodeInfo[] locations = block.getLocations();
        for (DatanodeInfo info : locations) {
            System.out.println(info.getIpAddr());
            System.out.println(info.getHostName());
        }
    }
    hdis.close();
    fs.close();

Command to merge the two parts back into a single archive (Windows cmd; on Linux use cat hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1). After appending, part1 contains the complete archive:

type hadoop-2.7.2.tar.gz.part2>>hadoop-2.7.2.tar.gz.part1

     

The complete code:

    package com.gec.demo;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.io.IOUtils;
    import org.junit.Test;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.List;
    
    public class HdfsClientAppTest
    {
    
        @Test
        public void getHdfsClient() throws IOException {
            //How to operate HDFS from Java
            //1. Create a Configuration object
    
            Configuration configuration=new Configuration();
    
            configuration.set("fs.defaultFS","hdfs://hadoop-001:9000");
    
            //2. Get the FileSystem object
            FileSystem fileSystem=FileSystem.get(configuration);
            fileSystem.mkdirs(new Path("/100_3"));
    
            //Close and release resources
            fileSystem.close();
        }
    
        @Test
        public void getHdfsClient2() throws URISyntaxException, IOException, InterruptedException {
            Configuration configuration=new Configuration();
    
            FileSystem fileSystem=FileSystem.get(
                    new URI("hdfs://hadoop-001:9000"),
                    configuration,
                    "hadoop");
    
            fileSystem.mkdirs(new Path("/100_2"));
    
            fileSystem.close();
    
        }
    
        /*
        * Upload a file
        * */
        @Test
        public void putFileToHDFS() throws URISyntaxException, IOException, InterruptedException {
    
    
            Configuration configuration=new Configuration();
    
            FileSystem fileSystem=FileSystem.get(
                    new URI("hdfs://hadoop-001:9000"),
                    configuration,
                    "hadoop");
    
            // Argument 1, Path src: the source file
            // Argument 2, Path dst: the destination file
            Path srcPath=new Path("D:\\src\\hello.txt");
            Path destPath=new Path("/100_1/hello.txt");
            fileSystem.copyFromLocalFile(srcPath,destPath);
    
            fileSystem.close();
    
        }
    
    
        @Test
        public void putFileToHDFS2() throws URISyntaxException, IOException, InterruptedException {
    
    
            Configuration configuration=new Configuration();
    
            FileSystem fileSystem=FileSystem.get(
                    new URI("hdfs://hadoop-001:9000"),
                    configuration,
                    "hadoop");
    
            // Argument 1, Path src: the source file
            // Argument 2, Path dst: the destination file
            Path srcPath=new Path("D:\\src\\hello.txt");
            Path destPath=new Path("/100_2/hello.txt");
            fileSystem.copyFromLocalFile(srcPath,destPath);
    
            fileSystem.close();
    
        }
    
    
        @Test
        public void putFileToHDFS3() throws URISyntaxException, IOException, InterruptedException {
    
    
            Configuration configuration=new Configuration();
            configuration.set("dfs.replication","2");
    
            FileSystem fileSystem=FileSystem.get(
                    new URI("hdfs://hadoop-001:9000"),
                    configuration,
                    "hadoop");
    
            // Argument 1, Path src: the source file
            // Argument 2, Path dst: the destination file
            Path srcPath=new Path("D:\\src\\hello.txt");
            Path destPath=new Path("/100_3/hello.txt");
            fileSystem.copyFromLocalFile(srcPath,destPath);
    
            fileSystem.close();
    
        }
    
        //Delete an HDFS folder
        @Test
        public void deleteAtHDFS() throws URISyntaxException, IOException, InterruptedException {
    
            Configuration configuration=new Configuration();
            configuration.set("dfs.replication","2");
    
            FileSystem fileSystem=FileSystem.get(
                    new URI("hdfs://hadoop-001:9000"),
                    configuration,
                    "hadoop");
    
            fileSystem.delete(new Path("/100_3"),true);
    
            fileSystem.close();
        }
    
        //Rename a file
        @Test
        public void renameAtHDFS() throws URISyntaxException, IOException, InterruptedException {
    
            Configuration configuration=new Configuration();
    
            FileSystem fileSystem=FileSystem.get(
                    new URI("hdfs://hadoop-001:9000"),
                    configuration,
                    "hadoop");
    
            fileSystem.rename(new Path("/100_2/hello.txt"),new Path("/100_2/hello2.txt"));
    
            fileSystem.close();
    
        }
    
    
        //List files
        @Test
        public void readListFiles() throws URISyntaxException, IOException, InterruptedException {
            Configuration configuration=new Configuration();
    
            FileSystem fileSystem=FileSystem.get(
                    new URI("hdfs://hadoop-001:9000"),
                    configuration,
                    "hadoop");
    
            RemoteIterator<LocatedFileStatus> iterator=fileSystem.listFiles(new Path("/"),true);
    
            while (iterator.hasNext())
            {
                LocatedFileStatus filestatus = iterator.next();
    
                System.out.println("权限="+filestatus.getPermission());
                System.out.println("文件名="+filestatus.getPath().getName());
                System.out.println("文件大小="+filestatus.getLen());
                System.out.println("文件副本数="+filestatus.getReplication());
                
                //Get block location information
                BlockLocation[] blockLocations = filestatus.getBlockLocations();
    
                for (BlockLocation blockLocation : blockLocations) {
                    System.out.println("块的偏移量="+blockLocation.getOffset());
                    System.out.println("块大小="+blockLocation.getLength());
                    String hosts[]=blockLocation.getHosts();
                    for (String host : hosts) {
                        System.out.println("副本存储的主机位置="+host);
                    }
    
                    System.out.println("区别块信息---------------");
    
                }
    
    
                System.out.println("区别文件信息-----------------------------");
    
            }
    
            fileSystem.close();
    
        }
    
    
        /*
         * Upload a file to HDFS using IO streams
         * */
        @Test
        public void putFileToHDFSByIOStream() throws Exception
        {
            Configuration configuration=new Configuration();
    
            FileSystem fileSystem=FileSystem.get(
                    new URI("hdfs://hadoop-001:9000"),
                    configuration,
                    "hadoop");
    
            FileInputStream fileInputStream=new FileInputStream("D:\\src\\Ahost.java");
    
    
            FSDataOutputStream fsDataOutputStream=fileSystem.create(new Path("/100_1/Ahost.java"));
            IOUtils.copyBytes(fileInputStream,fsDataOutputStream,1024,true);
    
            // No explicit close calls needed: copyBytes(..., true) already closed both streams
            fileSystem.close();
    
        }
    
    
    
        /*
        * Download a file from HDFS using IO streams (written to the console here)
        * */
    
        @Test
        public void downloadFileByIOStream() throws Exception
        {
            Configuration configuration=new Configuration();
    
            FileSystem fileSystem=FileSystem.get(
                    new URI("hdfs://hadoop-001:9000"),
                    configuration,
                    "hadoop");
    
            FSDataInputStream fsDataInputStream=fileSystem.open(new Path("hdfs://hadoop-001:9000/100_1/Ahost.java"));
    
            IOUtils.copyBytes(fsDataInputStream,System.out,1024,true);
    
    
            // No explicit close calls needed: copyBytes(..., true) already closed both streams
            fileSystem.close();
    
        }
    
    
    
        @Test
        public void getBlockInfo() throws Exception
        {
            Configuration configuration=new Configuration();
    
            FileSystem fileSystem=FileSystem.get(
                    new URI("hdfs://hadoop-001:9000"),
                    configuration,
                    "hadoop");
    
            HdfsDataInputStream hdis= (HdfsDataInputStream) fileSystem.open(new Path("hdfs://hadoop-001:9000/100_1/hadoop-2.7.2.tar.gz"));
    
            List<LocatedBlock> allBlocks = hdis.getAllBlocks();
            for (LocatedBlock allBlock : allBlocks) {
    
                ExtendedBlock block = allBlock.getBlock();
                System.out.println("块id="+block.getBlockId());
                System.out.println("块文件名="+block.getBlockName());
                System.out.println("时间="+block.getGenerationStamp());
    
                DatanodeInfo[] locations = allBlock.getLocations();
                for (DatanodeInfo location : locations) {
                    System.out.println("存储datanode的主机名="+location.getHostName());
                }
    
                System.out.println("---------------------");
    
            }
    
            fileSystem.close();
    
        }
    
    
        //Download the first block
        @Test
        public void downFirstBlock() throws Exception
        {
            Configuration configuration=new Configuration();
    
            FileSystem fileSystem=FileSystem.get(
                    new URI("hdfs://hadoop-001:9000"),
                    configuration,
                    "hadoop");
    
            FSDataInputStream fsinput=fileSystem.open(new Path("hdfs://hadoop-001:9000/100_1/hadoop-2.7.2.tar.gz"));
            FileOutputStream fileOutputStream=new FileOutputStream("D:\\src\\hadoop-2.7.2.tar.gz.part1");
    
            // Copy exactly the first block (128 MB); read() may return short
            // counts, so track the number of bytes actually copied
            byte[] buf = new byte[1024];
            long blockSize = 128L * 1024 * 1024;
            long copied = 0;
            while (copied < blockSize) {
                int len = fsinput.read(buf, 0, (int) Math.min(buf.length, blockSize - copied));
                if (len == -1) break;   // end of file reached early
                fileOutputStream.write(buf, 0, len);
                copied += len;
            }
    
    
            fileOutputStream.close();
            fsinput.close();
    
            fileSystem.close();
    
        }
    
    
        //Download the second block
        @Test
        public void downFirstBlock2() throws Exception
        {
            Configuration configuration=new Configuration();
    
            FileSystem fileSystem=FileSystem.get(
                    new URI("hdfs://hadoop-001:9000"),
                    configuration,
                    "hadoop");
    
            FSDataInputStream fsinput=fileSystem.open(new Path("hdfs://hadoop-001:9000/100_1/hadoop-2.7.2.tar.gz"));
            FileOutputStream fileOutputStream=new FileOutputStream("D:\\src\\hadoop-2.7.2.tar.gz.part2");
    
            //Seek to the 128 MB offset (the start of the second block)
            fsinput.seek(1024 * 1024 * 128);
    
            IOUtils.copyBytes(fsinput,fileOutputStream,1024,true);
    
            fileSystem.close();
    
        }
    
    
    
    }

     

Original source: https://www.cnblogs.com/Transkai/p/10468570.html