zoukankan      html  css  js  c++  java
  • hadoop hdfs (java api)

    简单介绍使用java控制hdfs文件系统

    一、注意namenode端访问权限,修改hdfs-site.xml文件或修改文件目录权限

    本次采用修改hdfs-site.xml用于测试,在configuration节点中添加如下内容

        <property>
            <name>dfs.permissions.enabled</name>
            <value>false</value>
        </property>

    二、本次使用eclipse环境新建项目完成测试

    使用手动添加jar包完成环境准备,jar包位于hadoop解压目录 

    如下:

    hadoop-2.7.3sharehadoopcommonhadoop-common-2.7.3.jar
    hadoop-2.7.3sharehadoopcommonlib*.jar
    hadoop-2.7.3sharehadoophdfshadoop-hdfs-2.7.3.jar

    添加完成jar包就可以编写代码,链接hdfs文件系统

    链接hdfs需完成如下步骤

    1.创建 org.apache.hadoop.conf.Configuration 用于指定客户端的配置(服务器的地址,上传下载文件的一些配置),本次采用如下方式配置

    package com.huaqin.hdfs.conf;
    
    import org.apache.hadoop.conf.Configuration;
    
    public class DeFaultDfsClientConfigration extends Configuration{
        
        public DeFaultDfsClientConfigration() {
            this.set("fs.defaultFS","hdfs://*.*.*.*:9000");
            this.set("dfs.replication", "2");
        }
    }

    2.编写Utils封装常见操作文件方法

    需使用org.apache.hadoop.fs.FileSystem

    通过上面的配置文件创建

    FileSystem fileSystem = FileSystem.get(new DeFaultDfsClientConfigration());

    创建完成之后便可以操作hdfs了,代码封装如下

    package com.huaqin.hdfs.utils;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.util.Map;
    
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    import com.huaqin.hdfs.conf.DeFaultDfsClientConfigration;
    
    public class HDFSFileUtils {
    
        public double progressBar;
    
        public HDFSFileUtils() throws IOException {
            // 使用默认类加载
            fileSystem = FileSystem.get(new DeFaultDfsClientConfigration());
        }
    
        public HDFSFileUtils(DeFaultDfsClientConfigration clientConfration) throws IOException {
            // 使用指定类加载
            fileSystem = FileSystem.get(clientConfration);
        }
    
        // 默认客户端配置类
        private FileSystem fileSystem;
    
        public void reloadClientConfigration(DeFaultDfsClientConfigration clientConfration) {
            fileSystem.setConf(clientConfration);
        }
    
        public FileStatus[] list(String fileName) throws FileNotFoundException, IllegalArgumentException, IOException {
            // TODO Auto-generated method stub
            FileStatus[] statusList = this.fileSystem.listStatus(new Path(fileName));
            return statusList;
        }
    
        public void text(String fileName) throws IllegalArgumentException, IOException {
            // TODO Auto-generated method stub
            FSDataInputStream inputStream = this.fileSystem.open(new Path(fileName));
            IOUtils.copyBytes(inputStream, System.out, fileSystem.getConf());
        }
    
        // 上传文件
        public void upload(String src, String dest) throws IOException {
            // TODO Auto-generated method stub
            FileInputStream in = new FileInputStream(src);
            FSDataOutputStream os = this.fileSystem.create(new Path(dest), true);
            IOUtils.copyBytes(in, os, 4096, true);
        }
    
        // 删除文件
        public boolean deleteFile(String dest) throws IllegalArgumentException, IOException {
            // TODO Auto-generated method stub
            boolean success = this.fileSystem.delete(new Path(dest), true);
            return success;
        }
    
        // 创建文件夹
        public boolean makeDir(String dest) throws IllegalArgumentException, IOException {
            return this.fileSystem.mkdirs(new Path(dest));
        }
    
        // 下载显示进度
        public void download2(String dest, Map<String, Integer> descript) throws IllegalArgumentException, IOException {
            FSDataInputStream in = fileSystem.open(new Path(dest));
            descript.put("byteSize", in.available());
            descript.put("current", 0);
            byte[] bs = new byte[1024];
            while (-1 != (in.read(bs))) {
                descript.put("current", descript.get("current") + 1024);
            }
            in.close();
        }
    
        // 上传显示进度
        public void upload2(String src, String dest, Map<String, Long> descript)
                throws IllegalArgumentException, IOException {
            File file = new File(src);
            FileInputStream in = new FileInputStream(file);
            FSDataOutputStream out = this.fileSystem.create(new Path(dest), true);
            descript.put("byteSize", file.length());
            descript.put("current", 0l);
            // 0.5mb
            byte[] bs = new byte[1024 * 1024 / 2];
            while (-1 != (in.read(bs))) {
                out.write(bs);
                descript.put("current", descript.get("current") + 1024);
            }
            out.close();
            in.close();
        }
    
    }

    三、以下是JUnitTest测试环境

    import java.io.IOException;
    import java.text.DecimalFormat;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.junit.Before;
    import org.junit.Test;
    
    import com.huaqin.hdfs.utils.HDFSFileUtils;
    
    public class HDFSFileUtilsJUT {
    
        @Before
        public void before() throws IOException {
            fileUtils = new HDFSFileUtils();
        }
    
        HDFSFileUtils fileUtils;
    
        @Test
        public void testCreateNEWFile() throws IOException {
    //        fileUtils.upload("D:\temp\helloworld.txt", "/tmp/helloworld.txt");
            fileUtils.upload("E:\devtool\hadoop-2.7.3.tar.gz", "/hadoop-2.7.3.tar.gz");
        }
        
        
        @Test
        public void testText() throws IllegalArgumentException, IOException {
            fileUtils.text("/hello.txt");
        }
        
        @Test
        public void testDeleteFile() throws IllegalArgumentException, IOException {
            boolean success = fileUtils.deleteFile("/CentOS-7-x86_64-DVD-1511.iso");
            System.out.println(success);
        }
        
        @Test
        public void testZMikdirs() throws IllegalArgumentException, IOException {
            boolean success = fileUtils.makeDir("/tmp");
            System.out.println(success);
        }
        
        @Test
        public void testdownload2() throws IllegalArgumentException, IOException {
            Map<String, Integer> desc = new HashMap<>();
            desc.put("current", 0);
            desc.put("byteSize", 0);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // TODO Auto-generated method stub
                    while (true) {
                        try {
                            Thread.sleep(500);
                            System.out.printf("maxL:%d	current:%d	surplus:%d
    ", desc.get("byteSize"),desc.get("current"),desc.get("byteSize")-desc.get("current"));
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
            fileUtils.download2("/hadoop-2.7.3.tar.gz",desc);
        }
        
        @Test
        public void testupload2() throws IllegalArgumentException, IOException {
            DecimalFormat df = new DecimalFormat("0.00%");
            
            Map<String, Long> desc = new HashMap<String, Long>();
            desc.put("current", 0l);
            desc.put("byteSize", 0l);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // TODO Auto-generated method stub
                    while (true) {
                        try {
                            Thread.sleep(500);
                            System.out.printf("maxL:%d	current:%d	surplus:%d	progressBar:%s
    ", desc.get("byteSize"),desc.get("current"),desc.get("byteSize")-desc.get("current"),df.format((desc.get("current")+0.0)/desc.get("byteSize")));
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
            fileUtils.upload2("D:\hadoop\CentOS-7-x86_64-DVD-1511.iso", "/CentOS-7-x86_64-DVD-1511.iso",desc);
        }
        
    }
  • 相关阅读:
    array_count_values源码
    php 编译安装记录
    mysql 安装的过程做个记录
    初识highcharts 库
    php 不重新编译增加新扩展的方法
    备考PMP
    Beyond Compare4破解--写reg脚本删除注册表
    SourceTree 跳过登录
    正则 (?=exp)
    springmvc--处理器的返回参数
  • 原文地址:https://www.cnblogs.com/black-/p/8677743.html
Copyright © 2011-2022 走看看