zoukankan      html  css  js  c++  java
  • hadoop02

    1.默认文件操作系统修改

    • 在bin目录下:有个dfsadmin (hdfg dfsadmin xxx)是系统有关命令,集群管理命令。

      如安全模式的控制:比如集群中一半的存储副本挂掉,将启动安全模式,客户端无法启动。好比如集群刚刚启动时候,需要启动事件,此时控制客户端无法写入也是安全模式控制。可以通过dfsadmin手动解除安全模式。
      
    • 配置本机的客户端操作默认文件操作系统:etc/hadoop/core-site.xml

      <configuration>
      <!-- 本机的客户端操作默认文件操作系统 -->
      <property>
        <name>fs.defaultFS</name>
        <value>hdfs://linux01:9000/</value>
        <description></description>
      </property>
      </configuration>
      
      <!--其他配置:-->
      etc/hadoop/hdfs-site.xml 中dfs.replication 配置存储副本节点个数。dfs.blocksize 配置副本存储存储size
      
      <property>
        <name>fs.replication</name>
        <value>2</value>
        <description></description>
      </property>
      
      <property>
        <name>dfs.blocksize</name>
        <value>134217728</value>
        <description></description>
      </property>
      
    • 此时执行put命令:

      ./hdfs dfs -put /mnt/1.txt /
      # 意思是将/mnt/1.txt 文件上传hffs://linux01:9000/目录下
      

    2.HDFS一件启停配置

    • sbin下 有一个hadoop-daemon.sh 的shell命令

      可以通过
      	./hadoop-daemon.sh stop datanode/namenode 停止datenode或namenode
      	
      
    • 一键启动,关闭配置

      • etc/hadoop 有slaves配置启停主机名字(如在linux01配置如下信息)
      vim /opt/hdp/hadoop-2.8.5/etc/hadoop/slaves
      # 配置如下:
      linux01
      linux02
      linux03
      
      • 则在linux01下sbin目录下:
      # 启动:
      ./start-dfs.sh
      # 停止
      ./stop-dfs.sh 
      

    3.配置hadoop的环境变量

    • 显然上面操作过于繁琐,通过配置hadoop环境变量更加方便使用。

    • 配置hdfs启停环境变量和客户端环境变量

      vim /etc/profile
      export HADOOP_HOME=/opt/hdp/hadoop-2.8.5
      export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin
      source /etc/profile
      
    • 测试:

      # 一键启动
      start-dfs.sh
      # 查询linux01目录:
      [root@linux01 hadoop-2.8.5]# hdfs dfs -ls /
      Found 5 items
      -rw-r--r--   3 root supergroup         17 2020-11-03 00:31 /1.txt
      drwxr-xr-x   - root supergroup          0 2020-11-02 17:23 /field_test
      drwxr-xr-x   - root supergroup          0 2020-11-02 16:52 /myfield
      -rw-r--r--   3 root supergroup       2397 2020-11-01 17:26 /nginx.conf
      -rw-r--r--   3 root supergroup         21 2020-11-01 17:29 /test_hdp.txt
      
    • 常见shell命令:

      hadoop常见命令

    • 我们在HDFS进行原始数据存储,不会对它进行修改

    4.Java客户端操作

    • eclipse 配置使用:首先需要导入hadoop相关jar包。将lib/(lib里面包含hadoop相关jar包)下jar包放到工程目录下(src平级)全选jar包,右键点击Build Path即可。然后创建package 和.java,.java书写代码
    package hadoop.client;
    
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    /*
     * 使用java HDFS的系统
     * 1.URI 指定namenode位置。
     * 2.用户信息。
     * 3.用户自定义参数设置。
     * 4.导入hadoop架包
     * */
    
    public class ClientDemo1 {
    	
    	public static void main(String[] args) throws Exception {
    		URI uri = new URI("hdfs://linux01:9000/");
    		// 用户自定义配置对象,默认加载配置文件hdfs-default.xml和core-site.xml文件
    		// 配置了core-site.xml和hdfs-site.xml 会覆盖默认
    		// 还会加载本项目xml文件
    		Configuration conf = new Configuration();//作用修改配置信息:文件个数,切块大小等
    		// 获取操作HDFS文件系统客户端对象
    		FileSystem fs = FileSystem.get(uri, conf, "root");
    		// 本地路径
    		Path src = new Path("D:\result.png");
    		// hdfs系统路径
    		Path dst = new Path("/");
    		// 本地文件上传hdfs路径
    		fs.copyFromLocalFile(src, dst);
    		fs.close();
    	}
    }
    
    • 通过自定义配置文件上传副本
    // 方式1:添加配置文件
    // eclipse  中在项目目录下右键 new->Source Folder 创建config文件,并创建hdfs-site.xml 填写如下内容 意思是存储四个文件副本。
    
    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
    <property>
      <name>dfs.replication</name>
      <value>4</value>
      <description></description>
    </property>
    </configuration>
    
    // 方式2:代码中添加
    Configuration conf = new Configuration();
    // 每个存储副本块设置64M
    conf.set("dfs.blocksize", "64M");
    // 保存副本设置为2个
    conf.set("dfs.replication", "2");
    

    5.Java客户端测试

    • 这里记录一个关于书写测试代码,带来initializationError错误
    1.单元测试方法应该实例方法,不应该出现静态方法,将static关键字去掉。
    2.单元测试方法返回void.  将返回值修改为void
    3.如果还有报错下载hamcrest-core.jar包。并引入。可进入下面地址下载
    

    hamcrest-core.jar

    • 示例:
    package _51doit.client;
    import java.io.BufferedReader;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    
    public class TestClient {
    	URI uri = null;
    	FileSystem  fs = null;
    	
    	@Before
    	public void init() throws Exception {
    		uri = new URI("hdfs://linux01:9000/");
    		Configuration conf = new Configuration();
    		fs = FileSystem.get(uri, conf, "root");
    	}
    	// 上传
    	@Test
    	public void upload( ) throws IllegalArgumentException, IOException {
    		// 参数1:是否删除原文件
    		// 参数2:是否覆盖
    		// 参数3:源文件
    		// 参数4:目标路径
    		fs.copyFromLocalFile(true, true, new Path("G:\tuniu_test.zip"), new Path("/"));
    	}
    	// 从hdfs下载:
    	@Test
    	public void downLoad() throws IllegalArgumentException, IOException {
    		// 参数1: hdfs文件
    		// 参数2:本地的路径
    		fs.copyToLocalFile(new Path("/tuniu_test.zip"), new Path("D:/"));
    		// 参数1:是否删除hdfs文件
    		// 参数4: 是否生成校验文件
    		fs.copyToLocalFile(true, new Path("/tuniu_test.zip"), new Path("e:/"), true);
    	}
    	// 删除
    	@Test
    	public void delete() throws IllegalArgumentException, IOException {
    		// 删除文件
    		// fs.delete(new Path("/tuniu_test.zip"));
    		// 删除文件夹
    		fs.delete(new Path("/test"), true);
    	}
    	// 重命名
    	@Test
    	public void rename() throws IllegalArgumentException, IOException {
    		fs.rename(new Path("/test_hdp.txt"), new Path("/test_hdp2.txt"));
    	}
    	// 检测一个文件或文件夹是否存在
    	@Test
    	public void isexists() throws IllegalArgumentException, IOException {
    		// 检测文件是否存在
    		// boolean b = fs.exists(new Path("/test_hdp.txt"));
    		// 检测文件夹是否存在
    		boolean b = fs.exists(new Path("/myfield"));
    		System.out.println(b);
    	}
    	
    	@Test
    	public void filelist() throws IllegalArgumentException, IOException {
    		// 判断hdfs根目录下所有文件
    		RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), false);
    		while (listFiles.hasNext()) {
    			LocatedFileStatus ls = listFiles.next();
    			Path path = ls.getPath();
    			// 获取文件路经
    			System.out.println(path);
    			// 获取文件名字
    			String filename = path.getName();
    			// 获取文件块信息
    			BlockLocation[] blockLocations = ls.getBlockLocations();
    			for (BlockLocation blockLocation:blockLocations) {
    				// 打印信息: 起始位置、长度、副本存储在主机名
    				System.out.println(blockLocation);
    				blockLocation.getLength();// 获取长度
    				String[] hosts = blockLocation.getHosts();// 获取主机
    			}
    		}
    	}
    	// 列出文件和文件名
    	@Test
    	public void list() throws FileNotFoundException, IllegalArgumentException, IOException {
    		FileStatus[] listStatus = fs.listStatus(new Path("/"));
    		for (FileStatus fileStatus: listStatus) {
    			// 判断是否是文件夹
    			boolean directory = fileStatus.isDirectory();
    			if (directory) {
    				Path dir = fileStatus.getPath();
    				System.out.println("文件夹名字:" + dir);
    			} else {
    				Path file = fileStatus.getPath();
    				System.out.println("文件名字:" + file);
    				file.getName(); // 获取文件名
    				fileStatus.getAccessTime();// 最后访问时间
    				fileStatus.getModificationTime();// 最新修改时间
    				fileStatus.getGroup();//所属组
    				fileStatus.getPermission();// 权限
    				fileStatus.getReplication();// 文件副本个数
    				fileStatus.getBlockSize();//块的大小
    			}
    		}
    	}
    	// 读取文件
    	@Test
    	public void readData() throws IllegalArgumentException, IOException {
    		FSDataInputStream fis = fs.open(new Path("/a.txt"));
    		BufferedReader br = new BufferedReader(new InputStreamReader(fis));
    		String line = br.readLine();
    		System.out.println(line);
    //		byte[] b = new byte[1024];
    //		int len = fis.read(b);
    //		System.out.println(new String(b,0,len));
    		fis.close();
    	}
    	// 写入追加
    	@Test
    	public void write() throws IllegalArgumentException, IOException {
    		FSDataOutputStream fout = fs.append(new Path("/a.txt"));
    		fout.write("good bye".getBytes());
    		fout.flush();
    		fout.close();
    	}
    	// 写入覆盖
    	@Test
    	public void write2() throws IllegalArgumentException, IOException {
    		FSDataOutputStream fout = fs.create(new Path("/a.txt"));
    		fout.write("good bye".getBytes());
    		fout.flush();
    		fout.close();
    	}
    	
    	
    	@After
    	public void close() throws IOException {
    		fs.close();
    	}
    }
    
    

    6.datanode和namenode交互

    • 工作机制

      datanode启动过程中,在自己配置中找namenode去索要一个版本(request.Version),在datanode会有一个Version.datanode然后回向namenode注册。而namenode会检查这个版本(检查不通过不会注册成功),注册完后datanode每小时会向namenode汇报数据,而namenode会构建映射(比如:存储节点id,起始位置,结束位置等),与此同时datanode会向namenode发送心跳检测(sendBeatHeart,目的是为了证明当前datanode工作正常,并且将一些datanode的curd操作放入队列,每次发送心跳时候namenode会返回操作指令(dataCmd指令),此时datanode做出相应的curd操作),当30秒都没有心跳检测,namenode会向datanode进行检查(5分钟一次),当10分钟之后没有响应,namenode会判定为宕机,并剔除datanode  ,然后namenode对该副本节点存储数据进行快数据复制,并存储到另一台datanode节点上。
      

    7.文件上传流程

    • 本地有一个a.txt文件(大小200M),现在请求上传hdfs,此时首先请求发送namenode,namenode进行检验并响应允许上传。然后请求上传第一块数据,然后namenode构建源数据:分配一个唯一的block的id并分配储存大小,存储副本节点,并进行响应源数据信息(blk_01, 128M linux01,linux02,linux03)。然后本地客户端请求建立连接通道(请求建立通道:linux01,linux02,linux03),会逐个向linux01,linux02,linux03建立,然后逐级响应建立成功。然后形成本地流,交给分布式流封装成packet传输到linux01,linux接收到分布式流,以本地流形式写入,边写的过程中 交给分布式流封装成packet,并发送给linux02,linux02再以本地流写入,然后以相同方式发送linux03.待上传文件完成。汇报给namenode上传完毕,第二块数据...以相同方式上传。待所有数据上传完毕。

    8.文件下载流程

    • namenode元数据对外提供虚拟目录,客户端请求下载a.txt,根据源数据请求linux01下载block01数据,linux01以分布式流封装packet发送到客户端,本地流将数据写到磁盘。
  • 相关阅读:
    SPOJ NDIV
    SPOJ ETF
    SPOJ DIVSUM
    头文件--持续更新
    SPOJ FRQPRIME
    SPOJ FUNPROB
    SPOJ HAMSTER1
    观光
    最短路计数
    拯救大兵瑞恩
  • 原文地址:https://www.cnblogs.com/xujunkai/p/13934669.html
Copyright © 2011-2022 走看看