zoukankan      html  css  js  c++  java
  • hadoop中hdfs写入流程

    hdfs写入数据流程总结:
    ================================
    1、通过配置文件获取DistributedFileSystem实例
    2、初始化校验和类型和大小 ===> 类型CRC32C,大小4byte //对每个chunk进行校验,chunk大小512字节
    3、创建namenode元数据:
    在DFSOutputStream中dfsClient.namenode.create
    4、使用computePacketChunkSize方法对packet和chunk进行计算 //计算每个packet中的chunk数量(126)
    5、使用DFSPacket初始化包对象
    6、writeChecksumChunks写入数据:方法,最终使用System.arrayCopy方法:
    先写入4 x 9字节的checksum
    再写入512 x 9字节的chunk

    7、waitAndQueueCurrentPacket:将数据放入dataQueue中。接着notifyAll,唤醒DataStreamer线程


    8、DataStreamer:设置管线,然后打开datanode的传输流,
    底层传输使用的是nio的非阻塞技术
    protobuf串行化技术

    9、数据写入成功的时候:
    dataQueue.removeFirst(); //将数据队列中的第一个数据删除
    ackQueue.addLast(one); //将此数据移动到确认队列的末尾
    dataQueue.notifyAll(); //通知DataStreamer继续传输包

    10、将数据实例化到磁盘的过程:
    先把checksum和data之间的鸿沟去掉:
    移动checksum数据到data数据之前
    移动header数据到checksum之前


    HDFS 上传流程
    过程解析:详解
    这里描述的 是一个256M的文件上传过程
    ① 由客户端 向 NameNode节点节点 发出请求
    ②NameNode 向Client返回可以可以存数据的 DataNode 这里遵循 机架感应 原则

    ③客户端 首先 根据返回的信息 先将 文件分块(Hadoop2.X版本 每一个block为 128M 而之前的版本为 64M)
    ④然后通过那么Node返回的DataNode信息 直接发送给DataNode 并且是 流式写入 同时 会复制到其他两台机器
    ⑤dataNode 向 Client通信 表示已经传完 数据块 同时向NameNode报告
    ⑥依照上面(④到⑤)的原理将 所有的数据块都上传结束 向 NameNode 报告 表明 已经传完所有的数据块

    这样 整个HDFS上传流程就 走完了



    DFSPacket:
    ========================================
    /**
    * buf is pointed into like follows:
    * (C is checksum data, D is payload data)
    *
    * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
    * ^ ^ ^ ^
    * | checksumPos dataStart dataPos
    * checksumStart

    *
    * Right before sending, we move the checksum data to immediately precede
    * the actual data, and then insert the header into the buffer immediately
    * preceding the checksum data, so we make sure to keep enough space in
    * front of the checksum data to support the largest conceivable header.
    */

    DataStreamer:
    =========================================
    DataStreamer类是对从管线发送数据包到datanode是可响应的,
    从namenode获取新的块id和块位置,然后开始流式传输包到datanodes
    每个包都有一个序列号相关联,当block中所有的packet被发出且收到所有的包回执
    DataStreamer就关闭当前的块

    DFSOutputStream:
    =======================================================
    客户端程序将数据通过这个流写入到内部缓存。
    数据被分割成packet,每个包64k

    一个packet由chunk组成。每个chunk是512字节,相应的关联一个校验和

    当客户端程序填满当前的packet,会填充到dataQueue(数据队列)。
    DataStreamer 线程从dataQueue(数据队列)抓取数据,并将其通过管线发送到第一个datanode
    接着将数据从dataQueue(数据队列)移动到ackQueue(确认队列)。当收到所有数据节点的确认回执
    ResponseProcessor(响应处理器)会将数据从ackQueue(确认队列)中移除

    如果出现错误,所有未完成的包将从ackQueue(确认队列)移出
    通过清除错误数据节点的管线,生成一个新的管线
    DataStreamer开始重新传输数据

    Datanode和Namenode的VERSION文件:
    namenode:
    =================================
    #Sun Mar 18 09:36:21 CST 2018
    namespaceID=133742883
    clusterID=CID-126a68dc-a8c1-4517-8f28-60fb6af6c269
    cTime=0
    storageType=NAME_NODE
    blockpoolID=BP-1464761855-192.168.23.101-1520907981134
    layoutVersion=-63
    
    datanode:
    ==================================
    #Sun Mar 18 09:36:47 CST 2018
    storageID=DS-6068e606-1d2d-4865-aa62-1cd326ee3e64
    clusterID=CID-126a68dc-a8c1-4517-8f28-60fb6af6c269
    cTime=0
    datanodeUuid=705f0e4e-a50b-4448-84ed-fc6e2f8d2923
    storageType=DATA_NODE
    layoutVersion=-56


    HDFS特性:
    适用于存储超大文件
    适用于流式数据访问,不具有随机定位文件的功能
    支持构建于商业硬件
    不适用于低时间延迟的数据访问
    不适用于存储海量小文件 //har、压缩、sequenceFile
    不适用于多用户写入和任意位置修改文件


    MapReduce:
    编程模型,适用于分布式处理海量文件

    App //入口函数(main)
    Mapper //map
    Reducer //reduce


    在集群运行MR程序:
    1、修改代码
    2、将代码打包成jar 并发送到Linux操作系统
    3、创建源文件(1.txt)
    4、使用命令hadoop jar myhadoop.jar com.oldboy.mr.App /1.txt /out

    1、输入路径可以写文件名和文件夹名:
    文件夹名称会将文件夹下所有文件读取,且忽略子目录文件

    2、Map数和文件数量有关:
    需要将小文件进行归档或压缩
    hadoop archive -archiveName temp.har -p / Temp /


    partition:分区,指派某种类型的key发送到某个reduce进行计算
    hash分区: (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

    key.hashCode() & Integer.MAX_VALUE //保证数字为正值
    % numReduceTasks //取余数,保证范围在0~n-1之间

    自定义分区:

    public class WCPartitioner extends Partitioner<Text,IntWritable> {
        /**
         * 数字到分区0,字符到分区1
         */
        @Override
        public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        String key = text.toString();
        try {
            Integer.parseInt(key);
            return 0;
        } catch (Exception e) {
            return 1;
        }
        }
    }

    combiner:
    map端的reduce:
    在map端预处理时候的聚合(预聚合)

  • 相关阅读:
    CF763C Timofey and Remoduling
    CF762E Radio Stations
    CF762D Maximum Path
    CF763B Timofey and Rectangles
    URAL1696 Salary for Robots
    uva10884 Persephone
    LA4273 Post Offices
    SCU3037 Painting the Balls
    poj3375 Network Connection
    Golang zip压缩文件读写操作
  • 原文地址:https://www.cnblogs.com/zyde/p/8900234.html
Copyright © 2011-2022 走看看