zoukankan      html  css  js  c++  java
  • Java读写HDFS文件

    一、依赖包maven路径

    
    
    1. <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
    2. <dependency>
    3.     <groupId>org.apache.hadoop</groupId>
    4.     <artifactId>hadoop-client</artifactId>
    5.     <version>2.7.3</version>
    6.     <scope>runtime</scope>
    7. </dependency>

    二、针对HDFS文件的操作类HDFSOperate

    
    
    1. package com.hdfs.util;
    2.  
    3. import java.io.BufferedReader;
    4. import java.io.File;
    5. import java.io.FileOutputStream;
    6. import java.io.IOException;
    7. import java.io.InputStreamReader;
    8. import java.io.PrintStream;
    9. import java.net.URI;
    10.  
    11. import org.apache.hadoop.conf.Configuration;
    12. import org.apache.hadoop.fs.FSDataInputStream;
    13. import org.apache.hadoop.fs.FSDataOutputStream;
    14. import org.apache.hadoop.fs.FileSystem;
    15. import org.apache.hadoop.fs.Path;
    16. import org.apache.hadoop.io.IOUtils;
    17.  
    18. /**
    19.  * 针对HDFS文件的操作类
    20.  */
    21. public class HDFSOperate {
    22.  
    23. /**
    24.  * 新增(创建)HDFS文件
    25.  * @param hdfs
    26.  */
    27. public void createHDFS(String hdfs){
    28. try {
    29. Configuration conf = new Configuration();
    30. conf.setBoolean("dfs.support.append", true);  
    31. conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
    32.         conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
    33. FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
    34. Path path = new Path(hdfs); 
    35. //判断HDFS文件是否存在
    36. if(fs.exists(path)){
    37. //System.out.println(hdfs + "已经存在!!!");
    38. }else{
    39. FSDataOutputStream hdfsOutStream = fs.create(new Path(hdfs));
    40. hdfsOutStream.close();
    41. }
    42. fs.close();
    43. } catch (Exception e) {
    44. // TODO: handle exception
    45. e.printStackTrace();
    46. }
    47. }
    48. /**
    49.  * 在HDFS文件后面追加内容
    50.  * @param hdfs
    51.  * @param appendContent
    52.  */
    53. public void appendHDFS(String hdfs,String appendContent){
    54. try {
    55. Configuration conf = new Configuration();
    56. conf.setBoolean("dfs.support.append", true);  
    57. conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
    58.         conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
    59. FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
    60. Path path = new Path(hdfs);
    61. //判断HDFS文件是否存在
    62. if(fs.exists(path)){
    63. //System.out.println(hdfs + "已经存在!!!");
    64. }else{
    65. FSDataOutputStream hdfsOutStream = fs.create(new Path(hdfs));
    66. hdfsOutStream.close();
    67. }
    68. FSDataOutputStream hdfsOutStream = fs.append(new Path(hdfs));
    69. byte [] str = appendContent.getBytes("UTF-8");//防止中文乱码
    70. hdfsOutStream.write(str);
    71. hdfsOutStream.close();
    72. fs.close();
    73. } catch (Exception e) {
    74. // TODO: handle exception
    75. e.printStackTrace();
    76. }
    77. }
    78. /**
    79.  * 修改HDFS文件内容 /  删除就是替换为空
    80.  * @param hdfs : hdfs文件路径
    81.  * @param sourceContent :要修改的hdfs文件内容
    82.  * @param changeContent :需要修改成的文件内容
    83.  */
    84. public void change(String hdfs,String sourceContent,String changeContent){
    85. try {
    86. Configuration conf = new Configuration();
    87. conf.setBoolean("dfs.support.append", true);  
    88. conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
    89.         conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
    90. FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
    91. Path path = new Path(hdfs);
    92. //判断HDFS文件是否存在
    93. if(fs.exists(path)){
    94. //System.out.println(hdfs + "已经存在!!!");
    95. FSDataInputStream in = fs.open(path);
    96. BufferedReader bf=new BufferedReader(new InputStreamReader(in));//防止中文乱码
    97. String totalString = "";
    98. String line = null;
    99. while ((line = bf.readLine()) != null) {
    100. totalString += line;
    101. }
    102. String changeString = totalString.replace(sourceContent, changeContent);
    103. FSDataOutputStream hdfsOutStream = fs.create(new Path(hdfs));
    104. byte [] str = changeString.getBytes("UTF-8");
    105. hdfsOutStream.write(str);
    106. hdfsOutStream.close();
    107. }else{
    108. //System.out.println(hdfs + "不存在,无需操作!!!");
    109. }
    110. fs.close();
    111. } catch (Exception e) {
    112. // TODO: handle exception
    113. e.printStackTrace();
    114. }
    115. }
    116. /**
    117.  * 判断要追加的内容是否存在
    118.  * @param hdfs
    119.  * @param appendContent
    120.  * @return
    121.  */
    122. public Boolean isContentExist(String hdfs,String appendContent){
    123. try {
    124. Configuration conf = new Configuration();
    125. conf.setBoolean("dfs.support.append", true);  
    126. conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
    127.         conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
    128. FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
    129. Path path = new Path(hdfs);
    130. //判断HDFS文件是否存在
    131. if(fs.exists(path)){
    132. //System.out.println(hdfs + "已经存在!!!");
    133. FSDataInputStream in = fs.open(path);
    134. BufferedReader bf=new BufferedReader(new InputStreamReader(in));//防止中文乱码
    135. String totalString = "";
    136. String line = null;
    137. while ((line = bf.readLine()) != null) {
    138. totalString += line;
    139. }
    140. if(totalString.contains(appendContent)){
    141. return true;
    142. }
    143. }else{
    144. //System.out.println(hdfs + "不存在,无需操作!!!");
    145. }
    146. fs.close();
    147. } catch (Exception e) {
    148. // TODO: handle exception
    149. e.printStackTrace();
    150. }
    151. return false;
    152. }
    153. public static void main(String[] args) throws IOException {
    154. String hdfs = "hdfs://192.168.168.200:9000/test/tes.txt";
    155. HDFSOperate hdfsOperate = new HDFSOperate();
    156. hdfsOperate.createHDFS(hdfs);
    157. hdfsOperate.appendHDFS(hdfs,"测试新增内容");
    158. //hdfsOperate.change(hdfs, "测试新增内容", "测试修改成功");
    159. }
    160. }
  • 相关阅读:
    HDU5090模拟,hash
    在Android手机上获取其它应用的包名及版本
    ubunut在系统恢复模式下无法改动rootpassword的分析和解决
    Index column size too large. The maximum column size is 767 bytes.
    java实现定时任务
    APP账号密码传输安全分析
    用RSA加密实现Web登录密码加密传输
    svn is already locked解决方案
    ajax跨域请求
    redis+spring配置
  • 原文地址:https://www.cnblogs.com/yangcx666/p/8723866.html
Copyright © 2011-2022 走看看