1.默认文件操作系统修改
-
在bin目录下:有个dfsadmin (hdfg dfsadmin xxx)是系统有关命令,集群管理命令。
如安全模式的控制:比如集群中一半的存储副本挂掉,将启动安全模式,客户端无法启动。好比如集群刚刚启动时候,需要启动事件,此时控制客户端无法写入也是安全模式控制。可以通过dfsadmin手动解除安全模式。
-
配置本机的客户端操作默认文件操作系统:etc/hadoop/core-site.xml
<configuration> <!-- 本机的客户端操作默认文件操作系统 --> <property> <name>fs.defaultFS</name> <value>hdfs://linux01:9000/</value> <description></description> </property> </configuration> <!--其他配置:--> etc/hadoop/hdfs-site.xml 中dfs.replication 配置存储副本节点个数。dfs.blocksize 配置副本存储存储size <property> <name>fs.replication</name> <value>2</value> <description></description> </property> <property> <name>dfs.blocksize</name> <value>134217728</value> <description></description> </property>
-
此时执行put命令:
./hdfs dfs -put /mnt/1.txt / # 意思是将/mnt/1.txt 文件上传hffs://linux01:9000/目录下
2.HDFS一件启停配置
-
sbin下 有一个hadoop-daemon.sh 的shell命令
可以通过 ./hadoop-daemon.sh stop datanode/namenode 停止datenode或namenode
-
一键启动,关闭配置
- 在
etc/hadoop
有slaves配置启停主机名字(如在linux01配置如下信息)
vim /opt/hdp/hadoop-2.8.5/etc/hadoop/slaves # 配置如下: linux01 linux02 linux03
- 则在linux01下sbin目录下:
# 启动: ./start-dfs.sh # 停止 ./stop-dfs.sh
- 在
3.配置hadoop的环境变量
-
显然上面操作过于繁琐,通过配置hadoop环境变量更加方便使用。
-
配置hdfs启停环境变量和客户端环境变量
vim /etc/profile export HADOOP_HOME=/opt/hdp/hadoop-2.8.5 export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin source /etc/profile
-
测试:
# 一键启动 start-dfs.sh # 查询linux01目录: [root@linux01 hadoop-2.8.5]# hdfs dfs -ls / Found 5 items -rw-r--r-- 3 root supergroup 17 2020-11-03 00:31 /1.txt drwxr-xr-x - root supergroup 0 2020-11-02 17:23 /field_test drwxr-xr-x - root supergroup 0 2020-11-02 16:52 /myfield -rw-r--r-- 3 root supergroup 2397 2020-11-01 17:26 /nginx.conf -rw-r--r-- 3 root supergroup 21 2020-11-01 17:29 /test_hdp.txt
-
常见shell命令:
-
我们在HDFS进行原始数据存储,不会对它进行修改
4.Java客户端操作
- eclipse 配置使用:首先需要导入hadoop相关jar包。将lib/(lib里面包含hadoop相关jar包)下jar包放到工程目录下(src平级)全选jar包,右键点击
Build Path
即可。然后创建package 和.java,.java书写代码
package hadoop.client;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/*
* 使用java HDFS的系统
* 1.URI 指定namenode位置。
* 2.用户信息。
* 3.用户自定义参数设置。
* 4.导入hadoop架包
* */
public class ClientDemo1 {
public static void main(String[] args) throws Exception {
URI uri = new URI("hdfs://linux01:9000/");
// 用户自定义配置对象,默认加载配置文件hdfs-default.xml和core-site.xml文件
// 配置了core-site.xml和hdfs-site.xml 会覆盖默认
// 还会加载本项目xml文件
Configuration conf = new Configuration();//作用修改配置信息:文件个数,切块大小等
// 获取操作HDFS文件系统客户端对象
FileSystem fs = FileSystem.get(uri, conf, "root");
// 本地路径
Path src = new Path("D:\result.png");
// hdfs系统路径
Path dst = new Path("/");
// 本地文件上传hdfs路径
fs.copyFromLocalFile(src, dst);
fs.close();
}
}
- 通过自定义配置文件上传副本
// 方式1:添加配置文件
// eclipse 中在项目目录下右键 new->Source Folder 创建config文件,并创建hdfs-site.xml 填写如下内容 意思是存储四个文件副本。
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>4</value>
<description></description>
</property>
</configuration>
// 方式2:代码中添加
Configuration conf = new Configuration();
// 每个存储副本块设置64M
conf.set("dfs.blocksize", "64M");
// 保存副本设置为2个
conf.set("dfs.replication", "2");
5.Java客户端测试
- 这里记录一个关于书写测试代码,带来initializationError错误
1.单元测试方法应该实例方法,不应该出现静态方法,将static关键字去掉。
2.单元测试方法返回void. 将返回值修改为void
3.如果还有报错下载hamcrest-core.jar包。并引入。可进入下面地址下载
- 示例:
package _51doit.client;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestClient {
URI uri = null;
FileSystem fs = null;
@Before
public void init() throws Exception {
uri = new URI("hdfs://linux01:9000/");
Configuration conf = new Configuration();
fs = FileSystem.get(uri, conf, "root");
}
// 上传
@Test
public void upload( ) throws IllegalArgumentException, IOException {
// 参数1:是否删除原文件
// 参数2:是否覆盖
// 参数3:源文件
// 参数4:目标路径
fs.copyFromLocalFile(true, true, new Path("G:\tuniu_test.zip"), new Path("/"));
}
// 从hdfs下载:
@Test
public void downLoad() throws IllegalArgumentException, IOException {
// 参数1: hdfs文件
// 参数2:本地的路径
fs.copyToLocalFile(new Path("/tuniu_test.zip"), new Path("D:/"));
// 参数1:是否删除hdfs文件
// 参数4: 是否生成校验文件
fs.copyToLocalFile(true, new Path("/tuniu_test.zip"), new Path("e:/"), true);
}
// 删除
@Test
public void delete() throws IllegalArgumentException, IOException {
// 删除文件
// fs.delete(new Path("/tuniu_test.zip"));
// 删除文件夹
fs.delete(new Path("/test"), true);
}
// 重命名
@Test
public void rename() throws IllegalArgumentException, IOException {
fs.rename(new Path("/test_hdp.txt"), new Path("/test_hdp2.txt"));
}
// 检测一个文件或文件夹是否存在
@Test
public void isexists() throws IllegalArgumentException, IOException {
// 检测文件是否存在
// boolean b = fs.exists(new Path("/test_hdp.txt"));
// 检测文件夹是否存在
boolean b = fs.exists(new Path("/myfield"));
System.out.println(b);
}
@Test
public void filelist() throws IllegalArgumentException, IOException {
// 判断hdfs根目录下所有文件
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), false);
while (listFiles.hasNext()) {
LocatedFileStatus ls = listFiles.next();
Path path = ls.getPath();
// 获取文件路经
System.out.println(path);
// 获取文件名字
String filename = path.getName();
// 获取文件块信息
BlockLocation[] blockLocations = ls.getBlockLocations();
for (BlockLocation blockLocation:blockLocations) {
// 打印信息: 起始位置、长度、副本存储在主机名
System.out.println(blockLocation);
blockLocation.getLength();// 获取长度
String[] hosts = blockLocation.getHosts();// 获取主机
}
}
}
// 列出文件和文件名
@Test
public void list() throws FileNotFoundException, IllegalArgumentException, IOException {
FileStatus[] listStatus = fs.listStatus(new Path("/"));
for (FileStatus fileStatus: listStatus) {
// 判断是否是文件夹
boolean directory = fileStatus.isDirectory();
if (directory) {
Path dir = fileStatus.getPath();
System.out.println("文件夹名字:" + dir);
} else {
Path file = fileStatus.getPath();
System.out.println("文件名字:" + file);
file.getName(); // 获取文件名
fileStatus.getAccessTime();// 最后访问时间
fileStatus.getModificationTime();// 最新修改时间
fileStatus.getGroup();//所属组
fileStatus.getPermission();// 权限
fileStatus.getReplication();// 文件副本个数
fileStatus.getBlockSize();//块的大小
}
}
}
// 读取文件
@Test
public void readData() throws IllegalArgumentException, IOException {
FSDataInputStream fis = fs.open(new Path("/a.txt"));
BufferedReader br = new BufferedReader(new InputStreamReader(fis));
String line = br.readLine();
System.out.println(line);
// byte[] b = new byte[1024];
// int len = fis.read(b);
// System.out.println(new String(b,0,len));
fis.close();
}
// 写入追加
@Test
public void write() throws IllegalArgumentException, IOException {
FSDataOutputStream fout = fs.append(new Path("/a.txt"));
fout.write("good bye".getBytes());
fout.flush();
fout.close();
}
// 写入覆盖
@Test
public void write2() throws IllegalArgumentException, IOException {
FSDataOutputStream fout = fs.create(new Path("/a.txt"));
fout.write("good bye".getBytes());
fout.flush();
fout.close();
}
@After
public void close() throws IOException {
fs.close();
}
}
6.datanode和namenode交互
-
工作机制
datanode启动过程中,在自己配置中找namenode去索要一个版本(request.Version),在datanode会有一个Version.datanode然后回向namenode注册。而namenode会检查这个版本(检查不通过不会注册成功),注册完后datanode每小时会向namenode汇报数据,而namenode会构建映射(比如:存储节点id,起始位置,结束位置等),与此同时datanode会向namenode发送心跳检测(sendBeatHeart,目的是为了证明当前datanode工作正常,并且将一些datanode的curd操作放入队列,每次发送心跳时候namenode会返回操作指令(dataCmd指令),此时datanode做出相应的curd操作),当30秒都没有心跳检测,namenode会向datanode进行检查(5分钟一次),当10分钟之后没有响应,namenode会判定为宕机,并剔除datanode ,然后namenode对该副本节点存储数据进行快数据复制,并存储到另一台datanode节点上。
7.文件上传流程
- 本地有一个a.txt文件(大小200M),现在请求上传hdfs,此时首先请求发送namenode,namenode进行检验并响应允许上传。然后请求上传第一块数据,然后namenode构建源数据:分配一个唯一的block的id并分配储存大小,存储副本节点,并进行响应源数据信息(blk_01, 128M linux01,linux02,linux03)。然后本地客户端请求建立连接通道(请求建立通道:linux01,linux02,linux03),会逐个向linux01,linux02,linux03建立,然后逐级响应建立成功。然后形成本地流,交给分布式流封装成packet传输到linux01,linux接收到分布式流,以本地流形式写入,边写的过程中 交给分布式流封装成packet,并发送给linux02,linux02再以本地流写入,然后以相同方式发送linux03.待上传文件完成。汇报给namenode上传完毕,第二块数据...以相同方式上传。待所有数据上传完毕。
8.文件下载流程
- namenode元数据对外提供虚拟目录,客户端请求下载a.txt,根据源数据请求linux01下载block01数据,linux01以分布式流封装packet发送到客户端,本地流将数据写到磁盘。