zoukankan      html  css  js  c++  java
  • Hadoop之HDFS原理及文件上传下载源码分析(上)

    HDFS原理

      首先说明下,hadoop的各种搭建方式不再介绍,相信各位玩hadoop的同学随便都能搭出来。

      楼主的环境:

    •   操作系统:Ubuntu 15.10
    •   hadoop版本:2.7.3
    •   HA:否(随便搭了个伪分布式)

    文件上传

    下图描述了Client向HDFS上传一个200M大小的日志文件的大致过程:

      首先,Client发起文件上传请求,即通过RPC与NameNode建立通讯。

      NameNode与各DataNode使用心跳机制来获取DataNode信息。NameNode收到Client请求后,获取DataNode信息,并将可存储文件的节点信息返回给Client。

      Client收到NameNode返回的信息,与对应的DataNode节点取得联系,并向该节点写文件。

      文件写入到DataNode后,以流水线的方式复制到其他DataNode(当然,这里面也有DataNode向NameNode申请block,这里不详细介绍),至于复制多少份,与所配置的hdfs-default.xml中的dfs.replication相关。

      元数据存储

      先明确几个概念:

      fsimage:元数据镜像文件。存储某一时段NameNode内存元数据信息。
      edits:操作日志文件。
      fstime:保存最近一次checkpoint的时间

      checkpoint可在hdfs-default.xml中具体配置,默认为3600秒:

    1 <property>
    2   <name>dfs.namenode.checkpoint.period</name>
    3   <value>3600</value>
    4   <description>The number of seconds between two periodic checkpoints.
    5   </description>
    6 </property>

      fsimage和edits文件在namenode目录可以看到:

    NameNode中的元数据信息:

      

      test.log文件上传后,Namenode始终在内存中保存metedata,用于处理“读请求”。metedata主要存储了文件名称(FileName),副本数量(replicas),分多少block存储(block-ids),分别存储在哪个节点上(id2host)等。

      到有“写请求”到来时,namenode会首先写editlog到磁盘,即向edits文件中写日志,成功返回后,才会修改内存,并且向客户端返回
      hadoop会维护一个fsimage文件,也就是namenode中metedata的镜像,但是fsimage不会随时与namenode内存中的metedata保持一致,而是每隔一段时间通过合并edits文件来更新内容。此时Secondary namenode就派上用场了,合并fsimage和edits文件并更新NameNode的metedata。
      Secondary namenode工作流程:

    1. secondary通知namenode切换edits文件
    2. secondary通过http请求从namenode获得fsimage和edits文件
    3. secondary将fsimage载入内存,然后开始合并edits
    4. secondary将新的fsimage发回给namenode
    5. namenode用新的fsimage替换旧的fsimage

      通过一张图可以表示为:

     文件下载

      文件下载相对来说就简单一些了,如图所示,Client要从DataNode上,读取test.log文件。而test.log由block1和block2组成。

      



      文件下载的主要流程为:

    • client向namenode发送请求。
    • namenode查看Metadata信息,返回test.log的block的位置。     

        Block1: h0,h1,h3
        Block2: h0,h2,h4

    • 开始从h0节点下载block1,block2。

    源码分析

      我们先简单使用hadoop提供的API来实现文件的上传下载(文件删除、改名等操作比较简单,这里不演示):

      

     1 package cn.jon.hadoop.hdfs;
     2 
     3 import java.io.FileInputStream;
     4 import java.io.FileOutputStream;
     5 import java.io.IOException;
     6 import java.io.InputStream;
     7 import java.io.OutputStream;
     8 import java.net.URI;
     9 import java.net.URISyntaxException;
    10 
    11 import org.apache.hadoop.conf.Configuration;
    12 import org.apache.hadoop.fs.FileSystem;
    13 import org.apache.hadoop.fs.Path;
    14 import org.apache.hadoop.io.IOUtils;
    15 import org.junit.Before;
    16 import org.junit.Test;
    17 
    18 public class HDFSDemo {
    19     FileSystem fs = null;    
    20     @Before
    21     public void init(){
    22         try {
    23             //初始化文件系统
    24             fs = FileSystem.get(new URI("hdfs://hadoopmaster:9000"), new Configuration(), "root");
    25         } catch (IOException e) {
    26             e.printStackTrace();
    27         } catch (InterruptedException e) {
    28             e.printStackTrace();
    29         } catch (URISyntaxException e) {
    30             e.printStackTrace();
    31         }
    32     }
    33     public static void main(String[] args) {
    34         
    35     }
    36     @Test
    37     /**
    38      * 文件上传
    39      */
    40     public void testFileUpload(){
    41         try {
    42             OutputStream os = fs.create(new Path("/test.log"));
    43             FileInputStream fis = new FileInputStream("I://test.log");
    44             IOUtils.copyBytes(fis, os, 2048,true);
    45             //可以使用hadoop提供的简单方式
    46             fs.copyFromLocalFile(new Path("I://test.log"), new Path("/test.log"));
    47         } catch (IllegalArgumentException | IOException e) {
    48             e.printStackTrace();
    49         }
    50     }
    51     @Test    
    52     /**
    53      * 文件下载
    54      */
    55     public void testFileDownload(){
    56         try {
    57             InputStream is = fs.open(new Path("/test.log"));
    58             FileOutputStream fos = new FileOutputStream("E://test.log");            
    59             IOUtils.copyBytes(is, fos, 2048);
    60             //可以使用hadoop提供的简单方式
    61             fs.copyToLocalFile(new Path("/test.log"), new Path("E://test.log"));
    62         } catch (IllegalArgumentException | IOException e) {
    63             e.printStackTrace();
    64         }
    65     }
    66 
    67 }

      显而易见,只要是对hdfs上的文件进行操作,必须对FileSystem进行初始化,我们先来分析FileSystem的初始化:

      

    1  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    2     return CACHE.get(uri, conf);//部分方法我只截取了部分代码,这里进入get()方法
    3   }
    1    FileSystem get(URI uri, Configuration conf) throws IOException{
    2       Key key = new Key(uri, conf);
    3       return getInternal(uri, conf, key);//调用getInternal()
    4     }
     1 private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
     2      //使用单例模式创建FileSystem,这是由于FS的初始化需要大量的时间,使用单例保证只是第一次加载慢一些,返回FileSystem的子类实现DistributedFileSystem
     3       FileSystem fs;
     4       synchronized (this) {
     5         fs = map.get(key);
     6       }
     7       if (fs != null) {
     8         return fs;
     9       }
    10 
    11       fs = createFileSystem(uri, conf);
    12       synchronized (this) { // refetch the lock again
    13         FileSystem oldfs = map.get(key);
    14         if (oldfs != null) { // a file system is created while lock is releasing
    15           fs.close(); // close the new file system
    16           return oldfs;  // return the old file system
    17         }
    18         
    19         // now insert the new file system into the map
    20         if (map.isEmpty()
    21                 && !ShutdownHookManager.get().isShutdownInProgress()) {
    22           ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
    23         }
    24         fs.key = key;
    25         map.put(key, fs);
    26         if (conf.getBoolean("fs.automatic.close", true)) {
    27           toAutoClose.add(key);
    28         }
    29         return fs;
    30       }
    31     }
     1 public void initialize(URI uri, Configuration conf) throws IOException {
     2     super.initialize(uri, conf);
     3     setConf(conf);
     4 
     5     String host = uri.getHost();
     6     if (host == null) {
     7       throw new IOException("Incomplete HDFS URI, no host: "+ uri);
     8     }
     9     homeDirPrefix = conf.get(
    10         DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
    11         DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);
    12     
    13     this.dfs = new DFSClient(uri, conf, statistics);//实例化DFSClient,并将它作为DistributedFileSystem的引用,下面我们跟进去
    14     this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
    15     this.workingDir = getHomeDirectory();
    16   }
     1 public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
     2       Configuration conf, FileSystem.Statistics stats)
     3     throws IOException {
     4     //该构造太长,楼主只截取了重要部分给大家展示,有感兴趣的同学可以亲手进源码瞧瞧     
     5     NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
     6     //这里声明了NameNode的代理对象,跟我们前面讨论的rpc就息息相关了
     7     if (proxyInfo != null) {
     8       this.dtService = proxyInfo.getDelegationTokenService();
     9       this.namenode = proxyInfo.getProxy();
    10     } else if (rpcNamenode != null) {   
    11       Preconditions.checkArgument(nameNodeUri == null);
    12       this.namenode = rpcNamenode;
    13       dtService = null;
    14     } else {
    15       Preconditions.checkArgument(nameNodeUri != null,
    16           "null URI");
    17       proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
    18           ClientProtocol.class, nnFallbackToSimpleAuth);
    19       this.dtService = proxyInfo.getDelegationTokenService();
    20       this.namenode = proxyInfo.getProxy();//获取NameNode代理对象引用并自己持有,this.namenode类型为ClientProtocol,它是一个接口,我们看下这个接口
    21     }
    22   }
    1 public interface ClientProtocol{
    2       public static final long versionID = 69L;
    3       //还有很多对NameNode操作的方法申明,包括对文件上传,下载,删除等
    4       //楼主特意把versionID贴出来了,这就跟我们写的RPCDemo中的MyBizable接口完全类似,所以说Client一旦拿到该接口实现类的代理对象(NameNodeRpcServer),Client就可以实现与NameNode的RPC通信,我们继续跟进
    5 }
     1  public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
     2       URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
     3       throws IOException {
     4     AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
     5         createFailoverProxyProvider(conf, nameNodeUri, xface, true,
     6           fallbackToSimpleAuth);  
     7     if (failoverProxyProvider == null) {
     8       // 如果不是HA的创建方式,楼主环境是伪分布式,所以走这里,我们跟进去
     9       return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
    10           UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
    11     } else {
    12       // 如果有HA的创建方式
    13       Conf config = new Conf(conf);
    14       T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
    15           RetryPolicies.failoverOnNetworkException(
    16               RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
    17               config.maxRetryAttempts, config.failoverSleepBaseMillis,
    18               config.failoverSleepMaxMillis));
    19       return new ProxyAndInfo<T>(proxy, dtService,
    20           NameNode.getAddress(nameNodeUri));
    21     }
    22   }

       最终返回的为ClientProtocol接口的子类代理对象,而NameNodeRpcServer类实现了ClientProtocol接口,因此返回的为NameNode的代理对象,当客户端拿到了NameNode的代理对象后,即与NameNode建立了RPC通信:

     1 private static ClientProtocol createNNProxyWithClientProtocol(
     2       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
     3       boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
     4       throws IOException {
     5     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);//是不是感觉越来越像我们前面说到的RPC
     6 
     7     final RetryPolicy defaultPolicy = 
     8         RetryUtils.getDefaultRetryPolicy(//加载默认策虐
     9             conf, 
    10             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, 
    11             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, 
    12             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
    13             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
    14             SafeModeException.class);
    15     
    16     final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
    17     //看到versionId了吗?这下明白了rpc的使用中目标接口必须要有这个字段了吧
    18     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
    19         ClientNamenodeProtocolPB.class, version, address, ugi, conf,
    20         NetUtils.getDefaultSocketFactory(conf),
    21         org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
    22         fallbackToSimpleAuth).getProxy();
    23     //看到没?这里使用 RPC.getProtocolProxy()来创建ClientNamenodeProtocolPB对象,调试时可以清楚的看见,该对象引用的是一个代理对象,值为$Proxy12,由JDK的动态代理来实现。
    24     //前面我们写RPCDemo程序时,用的是RPC.getProxy(),但是各位大家可以去看RPC源码,RPC.getProtocolProxy()最终还是调用的getProxy()
    25     if (withRetries) {
    26       Map<String, RetryPolicy> methodNameToPolicyMap 
    27                  = new HashMap<String, RetryPolicy>();    
    28       ClientProtocol translatorProxy =
    29         new ClientNamenodeProtocolTranslatorPB(proxy);
    30       return (ClientProtocol) RetryProxy.create(//这里再次使用代理模式对代理对象进行包装,也可以理解为装饰者模式
    31           ClientProtocol.class,
    32           new DefaultFailoverProxyProvider<ClientProtocol>(
    33               ClientProtocol.class, translatorProxy),
    34           methodNameToPolicyMap,
    35           defaultPolicy);
    36     } else {
    37       return new ClientNamenodeProtocolTranslatorPB(proxy);
    38     }
    39   }

      整个FileSystem的初始化用时序图表示为:

      到此,FileSystem的初始化就基本完成。由于文章篇幅过大的问题,所以楼主把HDFS原理及源码分析拆分成了两部分,上半部分主要是HDFS原理与FileSystem的初始化介绍,那在下半部分将会具体介绍HDFS文件上传、下载的源码解析。

      另外,文章用到的一些示例代码,将会在下半部分发布后,楼主一起上传到GitHub。

  • 相关阅读:
    13.App爬取相关库的安装(Charles,Mitmproxy,Appium)
    26.pymysql、pymongo、redis-py安装
    25.安装配置phantomjs
    2.博客随笔加密!!!
    17.scrapy-splash安装-2
    17.docker及scrapy-splash安装-1
    16.Mongodb安装
    scrapy--BeautifulSoup
    scrapy--selenium
    python--随笔一
  • 原文地址:https://www.cnblogs.com/qq503665965/p/6696675.html
Copyright © 2011-2022 走看看