今天发布一个以前的想法,以前在做hadoop的文件上传的时候,当时老师说的是使用hadoop当作存储的类似MySQL的数据库,老师也说了这样做没必要,但也是让我们这样用,也就是将数据存到HDFS中实现信息管理系统,我这里做的是当时的想法,就是在HDFS中和MySQL一样创建几个文本文档当作数据表,然后每一行存信息,以此实现增删改查类似于MySQL,我当时也查找了一些文章,于是我当时按照我的理解将其封装成了方法使用,虽然最后老师又换了题目,但还是记录下:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.awt.*;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class HDFSMethod {
private static FileSystem fs;
private static void init() throws URISyntaxException, IOException, InterruptedException{
Configuration configuration = new Configuration();
configuration.set("dfs.replication","1");
// FileSystem fs = FileSystem.get(new
URI uri=new URI("hdfs://hadoop102:8020");
String user="wang";
fs = FileSystem.get(uri, configuration,user);
}
private static void close() throws IOException {
// 3 关闭资源
fs.close();
}
public void HDFSUpFile(String MyPath,String HdfsPath)throws IOException, URISyntaxException, InterruptedException{
init();
//参数1:是否删除本地的 参数二:是否覆盖已有同名的 参数三:原文件路径 参数四:目的路径
fs.copyFromLocalFile(false,true,new Path(MyPath),new Path(HdfsPath));
close();
}
public void HDFSInsert(String FPath,String oneitem)throws IOException, URISyntaxException, InterruptedException{
init();
Path path = new Path(FPath);
FSDataOutputStream fos = fs.append(path);
fos.write((oneitem+"\n").getBytes());
fos.close();
close();
}
//删除的方法在参数中在加入一个List的Bean类,然后循环执行写入的语句,遇到要删除的索引就不写入完成删除,把每一个Bean类都写一个对应的删除方法
//在参数中在加入一个Bean类用来存储数据减少读取次数
//在参数中在加入一个Bean类用来存储数据减少读取次数
//在参数中在加入一个Bean类用来存储数据减少读取次数
public void HDFSDelete(String FPath, int num)throws IOException, URISyntaxException, InterruptedException{
init();
Path path = new Path(FPath);
FSDataOutputStream fos = fs.create(path,true);
fos.write("待输入\n".getBytes());
fos.close();
close();
}
//修改的方法要在参数中再加入一个Bean类,然后循环执行写入的语句,遇到要修改的索引就是用第三个参数代表修改后的语句的,把每一个Bean类都写一个对应的修改方法
//在参数中在加入一个Bean类用来存储数据减少读取次数
//在参数中在加入一个Bean类用来存储数据减少读取次数
//在参数中在加入一个Bean类用来存储数据减少读取次数
public void HDFSChange(String FPath,int num,String ChangeStr)throws IOException, URISyntaxException, InterruptedException{
init();
Path path = new Path(FPath);
FSDataOutputStream fos = fs.create(path,true);
fos.write("待输入\n".getBytes());
fos.close();
close();
}
//查找所有的方法方法类型转换成Bean类,用来返回不同的读取,把每一个Bean类都写一个与之对应的查找所有的方法
public void HDFSFindAll(String FPath)throws IOException, URISyntaxException, InterruptedException{
init();
Path path = new Path(FPath);
FSDataInputStream fis = fs.open(path);
int len = 0;
byte[] buf = new byte[4096];
while ((len = fis.read(buf)) != -1){
System.out.println(new String(buf, 0, len));
}
fis.close();
close();
}
//查找某一条的方法根据需要在读取到每一行的时候进行判断,如果符合要求就将其加入到Bean的list中,把每一个精确查询的要求都写一个方法
public void HDFSSelect(String FPath,String need)throws IOException, URISyntaxException, InterruptedException{
init();
Path path = new Path(FPath);
FSDataInputStream fis = fs.open(path);
int len = 0;
byte[] buf = new byte[4096];
while ((len = fis.read(buf)) != -1){
System.out.println(new String(buf, 0, len));
}
fis.close();
close();
}
public void con()throws URISyntaxException, IOException, InterruptedException {
init();
close();
}
/*
init();
close();
*/
public List<PeopleBean> ShowAllPeople() throws URISyntaxException, IOException, InterruptedException {
List<PeopleBean> list=new ArrayList<>();
init();
Path path = new Path("/manage1/people.txt");
FSDataInputStream fis = fs.open(path);
BufferedReader d = new BufferedReader(new InputStreamReader(fis));
String line;
while ((line = d.readLine()) != null) {
PeopleBean bean=null;
String[] AS=line.split(" ");
bean=new PeopleBean(AS[0],AS[1],Integer.parseInt(AS[2]));
list.add(bean);
}
d.close();
fis.close();
close();
return list;
}
public void DelOnePeople(int num)throws URISyntaxException, IOException, InterruptedException {
List<PeopleBean> list=new ArrayList<>();
init();
Path path = new Path("/manage1/people.txt");
FSDataInputStream fis = fs.open(path);
BufferedReader d = new BufferedReader(new InputStreamReader(fis));
String line;
while ((line = d.readLine()) != null) {
PeopleBean bean=null;
String[] AS=line.split(" ");
bean=new PeopleBean(AS[0],AS[1],Integer.parseInt(AS[2]));
list.add(bean);
}
d.close();
fis.close();
list.remove(num);
FSDataOutputStream fos = fs.create(path,true);
for(int i=0;i<list.size();i++) {
fos.write((list.get(i).getName()+" "+list.get(i).getSex()+" "+list.get(i).getAge()+"\n").getBytes());
}
fos.close();
close();
}
}