zoukankan      html  css  js  c++  java
  • 利用HDFS java API增删改查操作

    利用HDFS java API增删改查操作

    在做这个实验的时候需要特别注意下面三个问题:

    1、hdfs安全模式需要关闭命令:./hadoop dfsadmin -safemode leave

    2、工程中依赖的版本必须和集群的一致,否则也会报 version不一致错误

    3、hadoop集群用户权限的问题,以及各个目录的作用

    目前为什么会有这三个问题的原因待查!!!

    未验证目前使用hadoop的版本(release-0.20.0)是否支持webhdfs,反正我是怎么都连接不上啊!!!

    从这上面看,0.20.0 可能是不支持的

    https://jira.springsource.org/browse/IMPALA-15?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel

    Serengeti Distro:
    Apache Hadoop:1.0.1
    GreenPlum HD:1.1(Apache Hadoop 1.0.0)
    CloudEra: CDH3(Apache Hadoop 0.20.2, WebHDFS is not supported in this version)
    Hortonworks: 1.0.7 (Apache Hadoop 1.0.2)

    步骤如下:

    工程结构,如图:

    工程结构

    工程结构

    上代码 O(∩_∩)O哈哈~

    pom.xml配置如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    4.0.0
    com.yun.hdfs
    hdfs
    0.0.1-SNAPSHOT
    UTF-8
    maven-assembly-plugin
    false
    jar-with-dependencies
    com.yun.hdfs.WangPan
    make-assembly
    package
    assembly
    org.apache.hadoop
    hadoop-core
    0.20.2
    jar
    compile

    WangPan.java 主方法用于调用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    package com.yun.hdfs;
    import java.io.IOException;
    public class WangPan {
    private static String result = "";
    public static void main(String[] args) {
    try {
    // 判断命令输入是否正确
    if (args[0] != null && !"".equals(args[0]) && args.length > 0) {
    if ("upload".equals(args[0])) {
    result = "upload:" + WangPanUtils.uploadFile(args);
    } else if ("delete".equals(args[0])) {
    result = "delete:" + WangPanUtils.deleteFile(args);
    } else if ("query".equals(args[0])) {
    if (WangPanUtils.listFile(args) == null) {
    result = "query:fail!";
    } else {
    result = "query:success";
    }
    } else if ("read".equals(args[0])) {
    result = "read:" + WangPanUtils.readFile(args);
    } else {
    System.out.println("sorry,wo have no this service!");
    }
    System.out.println(result);
    } else {
    System.out.println("fail!");
    System.exit(1);
    }
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }

    WangPanUtils.java增删改查:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    package com.yun.hdfs;
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    public class WangPanUtils {
    public static String uploadFile(String[] args) throws IOException {
    String loaclSrc = args[1];
    String dst = args[2];
    if (args.length < 3) {
    return "fail";
    }
    InputStream in = new BufferedInputStream(new FileInputStream(loaclSrc));
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dst), conf);
    OutputStream out = fs.create(new Path(dst));
    IOUtils.copyBytes(in, out, 4096, true);
    return "success";
    }
    public static Path[] listFile(String[] args) throws IOException {
    if (args.length < 2) {
    return null;
    }
    String dst = args[1];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dst), conf);
    FileStatus[] statu = fs.listStatus(new Path(dst));
    Path[] listPaths = FileUtil.stat2Paths(statu);
    return listPaths;
    }
    public static String deleteFile(String[] args) throws IOException {
    if (args.length < 2) {
    return "fail";
    }
    String fileName = args[1];
    Configuration config = new Configuration();
    FileSystem hdfs = FileSystem.get(URI.create(fileName), config);
    Path path = new Path(fileName);
    if (!hdfs.exists(path)) {
    return "fail";
    }
    boolean isDeleted = hdfs.delete(path, false);
    if (isDeleted) {
    return "success";
    } else {
    return "fail";
    }
    }
    public static String readFile(String[] args) throws IOException {
    if(args.length < 3){
    return "fail";
    }
    String dst = args[1];
    String newPath = args[2];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dst), conf);
    FSDataInputStream hdfsInStream = fs.open(new Path(dst));
    OutputStream out = new FileOutputStream(newPath);
    byte[] ioBuffer = new byte[1024];
    int readLen = hdfsInStream.read(ioBuffer);
    while (-1 != readLen) {
    out.write(ioBuffer, 0, readLen);
    readLen = hdfsInStream.read(ioBuffer);
    }
    out.close();
    hdfsInStream.close();
    fs.close();
    return "success";
    }
    }
    public static String mkdir(String[] args) throws IOException{
    if(args.length < 2){
    return "fali";
    }
    String dst = args[1];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dst), conf);
    Path path = new Path(dst);
    if (fs.exists(path)) {
    return "fail";
    }
    fs.mkdirs(path);
    return "success";
    }

    PS:需要注意的是,我们需要把这个工程利用maven打包成一个可运行的jar包,使用如下命令:

    打包命令

    打包命令

    执行命令在每个方法注释上写明了,执行效果如下:

    增删改查效果

    增删改查效果

    还需要访问 http://hadoopm:50070/ -> Browse the filesystem 查看hdfs文件操作是否真的成功

    web hdfs

    web hdfs

  • 相关阅读:
    Gin框架结合gorm实现mysql增删改查
    Gin框架安装使用
    Golang常用排序算法比较
    Golang获取时间戳及格式化
    Golang使用goroutine交替打印序列
    Golang基础编程(六)-并发编程
    Golang基础编程(五)-指针
    Golang基础编程(四)-Map(集合)、Slice(切片)、Range
    模块化前端开发入门指南(三)
    模块化前端开发入门指南(二)
  • 原文地址:https://www.cnblogs.com/jiangye/p/3506040.html
Copyright © 2011-2022 走看看