  • Java API operations - creating directories

    1. Download a file from HDFS to the local Windows machine:
    
    
    package com.css.hdfs01;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    /**
     * Download a file from HDFS to the local Windows machine.
     *
     * Notes:
     * 1. The HADOOP_HOME environment variable must be configured.
     * 2. A compiled winutils package is required on Windows.
     */
    public class HdfsClientDemo02 {
        public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
            // 1. Load the configuration
            Configuration conf = new Configuration();
            // 2. Set the replication factor
            conf.set("dfs.replication", "2");
            // 3. Set the block size
            conf.set("dfs.blocksize", "64m");
            // 4. Build the client
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.146.132:9000"), conf, "root");
            // 5. Download the HDFS file to the local Windows filesystem
            fs.copyToLocalFile(new Path("/hdfs-site.xml"), new Path("c:/"));
            // 6. Release resources
            fs.close();
        }
    }
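
    The notes above mention that running the client on Windows requires a Hadoop environment variable and a compiled winutils package. If setting HADOOP_HOME system-wide is inconvenient, the same thing can be done from code before the first FileSystem call; a minimal sketch, assuming Hadoop (with winutils.exe under its bin directory) was unpacked to C:/hadoop (the path is an assumption, adjust it to your install):

    // Call once before FileSystem.get(...) so Hadoop's Windows native checks pass.
    public class WindowsEnvSetup {
        public static void configure() {
            // Assumption: Hadoop with a compiled winutils.exe lives under C:/hadoop/bin
            System.setProperty("hadoop.home.dir", "C:/hadoop");
        }
    }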
    
    2. Commonly used HDFS APIs:
    
    
    package com.css.hdfs02;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.Arrays;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.junit.Before;
    import org.junit.Test;
    
    /**
     * Commonly used HDFS APIs.
     */
    public class HdfsClientTest {
        
        FileSystem fs = null;
        
        @Before
        public void init() throws IOException, InterruptedException, URISyntaxException {
            // 1. Load the configuration
            Configuration conf = new Configuration();
            // 2. Set the replication factor
            conf.set("dfs.replication", "2");
            // 3. Set the block size
            conf.set("dfs.blocksize", "64m");
            // 4. Build the client
            fs = FileSystem.get(new URI("hdfs://192.168.146.132:9000/"), conf, "root");
        }
        
        /**
         * Create a directory in HDFS.
         * Shell equivalent: hdfs dfs -mkdir /dirname
         */
        @Test
        public void hdfsMkdir() throws IllegalArgumentException, IOException{
            // 1. Call the mkdirs method
            fs.mkdirs(new Path("/hello"));
            // 2. Release resources
            fs.close();
        }
        
        /**
         * Move/rename a file in HDFS.
         * Shell equivalents:
         * hdfs dfs -mv /src/path /dst/path
         * hdfs dfs -cp /src/path /dst/path
         */
        @Test
        public void hdfsRename() throws IllegalArgumentException, IOException{
            // 1. Call the rename method to move the file
            fs.rename(new Path("/aa.txt"), new Path("/hello/aa.txt"));
            // 2. Release resources
            fs.close();
        }
        
        /**
         * Delete a file or directory in HDFS.
         * Shell equivalents:
         * hdfs dfs -rm /filename
         * hdfs dfs -rm -r /dirname
         */
        @Test
        public void hdfsRm() throws IllegalArgumentException, IOException{
            // 1. Call the delete method
            // The single-argument delete(Path) is deprecated:
            // fs.delete(new Path("/aaaa.txt"));
            // Argument 1: path to delete; argument 2: whether to delete recursively
            fs.delete(new Path("/aaa111.txt"), true);
            // 2. Release resources
            fs.close();
        }
        
        /**
         * List the files under a given HDFS directory.
         */
        @Test
        public void hdfsLs() throws IllegalArgumentException, IOException{
            // 1. Call listFiles, which returns a remote iterator (recursive)
            RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path("/"), true);
            // 2. Consume the iterator
            while (iter.hasNext()) {
                // Fetch the next entry
                LocatedFileStatus status = iter.next();
                System.out.println("File path: " + status.getPath());
                System.out.println("Block size: " + status.getBlockSize());
                System.out.println("File length: " + status.getLen());
                System.out.println("Replication: " + status.getReplication());
                System.out.println("Block locations: " + Arrays.toString(status.getBlockLocations()));
                System.out.println("===============================");
            }
            // 3. Release resources
            fs.close();
        }
        
        /**
         * Determine whether an entry is a file or a directory.
         */
        @Test
        public void hdfsFile() throws IllegalArgumentException, IOException{
            // 1. Get the status of the entries under the root directory
            FileStatus[] listStatus = fs.listStatus(new Path("/"));
            // 2. Iterate over all entries
            for (FileStatus ls : listStatus) {
                if (ls.isFile()) {
                    // File
                    System.out.println("file -----f----- " + ls.getPath().getName());
                } else {
                    // Directory
                    System.out.println("directory -----d----- " + ls.getPath().getName());
                }
            }
        }    
    }
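
    Each test above calls fs.close() itself, and hdfsFile() never closes the FileSystem at all. A minimal sketch of a shared teardown for the same class, using JUnit's @After (it needs an extra import of org.junit.After):

    // Inside HdfsClientTest; requires: import org.junit.After;
    @After
    public void cleanup() throws IOException {
        // Close the shared FileSystem once per test; the per-test fs.close()
        // calls above then become unnecessary.
        if (fs != null) {
            fs.close();
        }
    }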
    
    3. Reading and writing HDFS files:
    
    
    package com.css.hdfs03;
    
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.junit.Before;
    import org.junit.Test;
    
    /**
     * Read and write HDFS files.
     */
    public class ReadData {
        
        FileSystem fs = null;
        @Before
        public void init() throws IOException, InterruptedException, URISyntaxException {
            // 1. Load the configuration
            Configuration conf = new Configuration();
            // 2. Build the client
            fs = FileSystem.get(new URI("hdfs://192.168.146.132:9000/"), conf, "root");
        }
        
        /**
         * Read data, approach 1: read into a byte array.
         */
        @Test
        public void testReadData1() throws IllegalArgumentException, IOException{
            // 1. Open an input stream for the HDFS file
            FSDataInputStream in = fs.open(new Path("/a.txt"));
            byte[] buf = new byte[1024];
            // Only the bytes actually read should be turned into a String
            int len = in.read(buf);
            System.out.println(new String(buf, 0, len));
            // 2. Release resources
            in.close();
            fs.close();
        }
        
        /**
         * Read data, approach 2: line by line through a BufferedReader.
         */
        @Test
        public void testReadData2() throws IllegalArgumentException, IOException{
            // 1. Open an input stream for the HDFS file
            FSDataInputStream in = fs.open(new Path("/hdfs-site.xml"));
            // 2. Wrap it in a buffered reader
            BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"));
            // 3. Read line by line
            String line = null;
            // 4. Consume the data
            while ((line = br.readLine()) != null) {
                System.out.println(line);
            }
            // 5. Release resources
            br.close();
            in.close();
            fs.close();
        }
        
        /**
         * Read from a specified offset in an HDFS file.
         */
        @Test
        public void testRandomRead() throws IllegalArgumentException, IOException{
            // 1. Open an input stream for the HDFS file
            FSDataInputStream in = fs.open(new Path("/hdfs-site.xml"));
            // 2. Seek to the given position in the file
            in.seek(14);
            byte[] b = new byte[5];
            // 3. Read b.length bytes starting from the current position
            in.read(b);
            System.out.println(new String(b));
            // 4. Release resources
            in.close();
        }
        
        /**
         * Write data to HDFS, approach 1: copy from a local file.
         */
        @Test
        public void testWriteData() throws IllegalArgumentException, IOException{
            // 1. Output stream to the HDFS file (do not overwrite if it exists)
            FSDataOutputStream out = fs.create(new Path("/windows.txt"), false);
            // 2. Input stream from the local file (backslashes must be escaped in Java strings)
            FileInputStream in = new FileInputStream("C:\\Users\\Administrator\\Desktop\\1012.txt");
            byte[] buf = new byte[1024];
            int read = 0;
            while ((read = in.read(buf)) != -1) {
                out.write(buf, 0, read);
            }
            // 3. Release resources
            in.close();
            out.close();
            fs.close();
        }
        
        /**
         * Write data to HDFS, approach 2: write a byte array directly.
         */
        @Test
        public void testWriteData1() throws IllegalArgumentException, IOException{
            // 1. Create the output stream
            FSDataOutputStream out = fs.create(new Path("/love"));
            // 2. Write the data
            out.write("Areyouokmylove".getBytes());
            // 3. Release resources
            IOUtils.closeStream(out);
            fs.close();
        }
    }
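
    The byte[] copy loop in testWriteData can also be written with Hadoop's IOUtils, which the class already imports. A minimal sketch of the same upload, assuming the same local path; copyBytes copies with the given buffer size and, with the final argument set to true, closes both streams:

    /**
     * Variant of testWriteData using IOUtils.copyBytes instead of a manual loop.
     */
    @Test
    public void testWriteDataWithIOUtils() throws IOException {
        // Local source and HDFS destination (the paths are assumptions)
        FileInputStream in = new FileInputStream("C:\\Users\\Administrator\\Desktop\\1012.txt");
        FSDataOutputStream out = fs.create(new Path("/windows-ioutils.txt"), false);
        // 4 KB buffer; 'true' closes both the input and the output stream
        IOUtils.copyBytes(in, out, 4096, true);
        fs.close();
    }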

    Java API operations - creating directories

    package com.hdfs;
    
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    import org.junit.Before;
    import org.junit.Test;
    
    public class HdfsTest {
        
        private FileSystem fs = null;
        
        @Before
        public void init() throws Exception {
            fs = FileSystem.get(new URI("hdfs://192.168.119.128:9000"),
                    new Configuration(),"root");
            
        }
        @Test
        public void testMkdir() throws Exception{
            boolean flag = fs.mkdirs(new Path("/javaApi/mk/dir1/dir2"));
            System.out.println(flag ? "Directory created" : "Directory creation failed");
        }
    }
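
    A possible follow-up test for the class above (a sketch, not part of the original post): check that the directory exists with fs.exists and remove it recursively with fs.delete so the mkdir test can be rerun cleanly:

    @Test
    public void testExistsAndCleanup() throws Exception {
        Path dir = new Path("/javaApi");
        if (fs.exists(dir)) {
            // The second argument enables recursive deletion of the whole subtree
            boolean deleted = fs.delete(dir, true);
            System.out.println(deleted ? "Cleanup succeeded" : "Cleanup failed");
        }
    }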
     
    import java.io.IOException;
    import java.net.URI;
    import java.text.ParseException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.codehaus.jettison.json.JSONException;
    import org.codehaus.jettison.json.JSONObject;

    import com.shidai.hadoop.utils.Constant;
    import com.shidai.hadoop.utils.DateUtil;
     
    public class HDFSTest {
     
        private static String url = Constant.url;
        private static Configuration conf = new Configuration();
     
        public static void getAllDataNode() {
     
            try {
                // Use the NameNode URI so the returned FileSystem is a DistributedFileSystem
                FileSystem fs = FileSystem.get(URI.create(url), conf);
                DistributedFileSystem distributedfs = (DistributedFileSystem) fs;
                DatanodeInfo[] datanodeInfos = distributedfs.getDataNodeStats();
                for (int i = 0; i < datanodeInfos.length; i++) {
                    System.out.println("DataNode " + i + ": " + datanodeInfos[i].getHostName());
                }
     
            } catch (IOException e) {
                e.printStackTrace();
            }
     
        }
     
        /**
         * Create a file and write the given contents to it.
         * 
         * @param dst
         * @param contents
         * @throws IOException
         */
        public static void createFile(String dst, byte[] contents) throws IOException {
     
            FileSystem fs = FileSystem.get(URI.create(url), conf);
            Path path = new Path(dst);
            FSDataOutputStream out = fs.create(path);
            out.write(contents);
            out.close();
            fs.close();
            System.out.println("File created successfully");
     
        }
     
        /**
         * Read a file and print its contents and upload time.
         * 
         * @param dst
         * @throws JSONException
         * @throws ParseException
         */
        public static void readFile(String dst) throws JSONException, ParseException {
     
            FileSystem fs;
            FSDataInputStream in;
            try {
                fs = FileSystem.get(URI.create(url), conf);
                in = fs.open(new Path(dst));
                byte[] ioBuffer = new byte[1024];
                StringBuffer sf = new StringBuffer();
                int len = -1;
     
                while ((len = in.read(ioBuffer)) != -1) {
                    // Append each chunk that was read; reading again inside the
                    // loop body would silently drop every other chunk.
                    String string = new String(ioBuffer, 0, len);
                    sf.append(string);
                }
                in.close();
                fs.close();

                System.out.println(sf.toString());
                JSONObject json = new JSONObject(sf.toString());
                Long time = json.getLong("last_time");
                String sd = DateUtil.getDate(time * 1000);

                System.out.println("Upload time: " + sd);
     
            } catch (IOException e) {
                e.printStackTrace();
            }
     
        }
     
        /**
         * List the files under a directory and read the non-tmp ones.
         * 
         * @param dst
         */
        public static void listFiles(String dst) {
            FileSystem fs = null;
            try {
                fs = FileSystem.get(URI.create(url), conf);
     
                RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(dst), false);
                while (iterator.hasNext()) {
     
                    LocatedFileStatus locatedFileStatus = iterator.next();
                    if (locatedFileStatus.isFile()) {
                        String path = locatedFileStatus.getPath().toString();
                        System.out.println(path);
                        if (!path.endsWith("tmp")) {
                            readFile(path);
                        }
     
                    }
                }
     
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
     
        /**
         * Upload a local file to HDFS.
         * 
         * @param src
         * @param dst
         */
        public static void upload(String src, String dst) {
     
            FileSystem fs = null;
            try {
                fs = FileSystem.get(URI.create(url), conf);
                Path srcPath = new Path(src);
                Path dstPath = new Path(dst);
                fs.copyFromLocalFile(false, srcPath, dstPath);
                // Print the file paths under the destination directory
     
                System.out.println("list files");
                FileStatus[] fileStatus = fs.listStatus(dstPath);
                for (FileStatus fstatus : fileStatus) {
                    System.out.println(fstatus.getPath());
                }
     
            } catch (IOException e) {
     
                e.printStackTrace();
     
            } finally {
     
                if (fs != null) {
                    try {
                        fs.close();
                    } catch (IOException e) {
     
                        e.printStackTrace();
     
                    }
     
                }
            }
     
        }
     
        /**
         * Delete a file (non-recursively).
         * 
         * @param dst
         */
        public static void delete(String dst) {
     
            FileSystem fs = null;
            try {
                fs = FileSystem.get(URI.create(url), conf);
                Boolean flag = fs.delete(new Path(dst), false);
                if (flag) {
                    System.out.println("Deleted successfully");
                } else {
                    System.out.println("Delete failed");
                }
            } catch (IOException e) {
                e.printStackTrace();
     
            }
     
        }
     
        public static void main(String[] args) throws JSONException, ParseException {
     
            System.setProperty("hadoop.home.dir", "C:/Users/root/.m2/repository/org/apache/hadoop/hadoop-common/2.5.2");
            byte[] contents = "明月几时有...\n".getBytes();
            /*
             * try{ // createFile("/user/hadoop/test/hdfs01.txt", contents);
             * }catch(IOException e){ e.printStackTrace(); }
             */
     
            // getAllDataNode();
            // upload("F:/yun/svn/1.txt", "/user/root/");
            // Read a file
            // readFile("/flume/data/FlumeData.1469543705102");
            // List files
            // listFiles("/flume/");

            // Delete a file
            delete("/user/root/test");
     
        }
     
    }
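
    The com.shidai.hadoop.utils.Constant and DateUtil helpers used above are not shown in the post. A minimal sketch of what they might look like, assuming url holds the NameNode address and getDate formats a millisecond timestamp (both the address and the date pattern are assumptions):

    package com.shidai.hadoop.utils;

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class Constant {
        // Assumed NameNode address; adjust to your cluster
        public static final String url = "hdfs://192.168.119.128:9000";
    }

    class DateUtil {
        // Format a millisecond timestamp as a readable date string
        public static String getDate(long millis) {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(millis));
        }
    }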

  • Original post: https://www.cnblogs.com/cainiao-chuanqi/p/12712111.html