zoukankan      html  css  js  c++  java
  • 【大数据】hive 删除临时文件 .hive-staging_hive

    一、.hive-staging_hive 产生的原因:

    通过spark-sql、hive-sql、hue等提交select或者insert overwrite等sql到hive时,会产生该目录,用于临时存放执行结果,比如insert overwrite会将结果暂存到该目录下,待任务结束,将结果复制到hive表中。hql任务执行失败时,这些临时文件和目录不会被自动删除掉,直到有相关的hql执行成功时,才会自动删掉。

    二、.hive-staging_hive 会导致的一些问题:

    1、如果目标路径存在临时文件的话,datax 读取hdfs数据的时候会报错,因为 .hive-staging_hive 临时文件的数据也会被 datax 读到;

    2、有的 .hive-staging_hive 可能会包含很多的临时文件,占用空间,同时也会造成 namenode 的压力;

    三、解决办法(CDH):

    进入hive配置页面,搜索 hive-site.xml ,找到服务高级配置代码段(安全阀)和 客户端高级配置代码段(安全阀),增加配置:

    <property>
        <name>hive.insert.into.multilevel.dirs</name>
        <value>true</value>
        <description>允许生成多级目录</description>
    </property>
    <property>
        <name>hive.exec.stagingdir</name>
        <value>/tmp/hive/staging/.hive-staging</value>
        <description>临时文件暂放目录</description>
    </property>

    1、hive-site.xml 的 Hive 服务高级配置代码段(安全阀)

    2、hive-site.xml 的 Hive 客户端高级配置代码段(安全阀)

     

    四、历史遗留的 hive-staging 文件处理和定期维护

    这边提供一个 python3.6 脚本,需要注意的是删除的时候,一般不要删除今天的临时文件,因为可能有作业正在使用。

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # @Time : 2020/5/7 09:59
    # @Author : way
    # @Site : 
    # @Describe: 清除hive昨日残留的staging垃圾文件
    
    import time
    import datetime
    from hdfs import Client
    
    class staging:
    
        def __init__(self, HDFS_ClIENT):
            """
            :param HDFS_ClIENT: hdfs客户端连接
            """
            self.client = Client(HDFS_ClIENT)
            self.today_timestamp = int(time.mktime(time.strptime(str(datetime.date.today()), '%Y-%m-%d')))*1000
    
    
        # 移动 hive 垃圾文件 foo >> staging_tmp_path
        def move(self, foo, staging_tmp_path):
            filemsg = self.client.status(foo, strict=False)
            if filemsg.get('type') == 'DIRECTORY':
                dirs = self.client.list(foo, status=False)
                for son in dirs:
                    if '.hive-staging_hive' in son:
                        src = f'{foo}/{son}'
                        dst = f'{staging_tmp_path}/{son}'
                        # print(src, dst)
                        self.client.rename(src, dst)
                        print(f"{son} has moved to {dst} success")
                    else:
                        self.move(foo + '/' + son, staging_tmp_path)
    
    
        # 移除 昨天的垃圾文件
        def delete(self, staging_tmp_path):
            dirs = self.client.list(staging_tmp_path, status=False)
            for dir in dirs:
                target = staging_tmp_path + '/' + dir
                modificationTime = self.client.status(target, strict=False).get('modificationTime')
                if '.hive-staging_hive' in dir and modificationTime < self.today_timestamp:
                    # print(target)
                    self.client.delete(target, recursive=True)
                    print(f"{target} has been deleted success")
    
    
    if __name__ == "__main__":
        HDFS_ClIENT = "http://172.16.122.21:50070;http://172.16.122.24:50070"
        staging_tmp_path = '/tmp/hive/staging'
        stag = staging(HDFS_ClIENT)
        # 移动 hive 垃圾文件 foo >> staging_tmp_path
        # foo = '/user/hive/warehouse'
        # stag.move(foo, staging_tmp_path)
        # 移除 昨天的垃圾文件
        stag.delete(staging_tmp_path)
  • 相关阅读:
    OSPF综合实验一
    OSPF—开放最短路径优先协议详解---附:OSPF LSA 详解
    OSPF--LSA详解
    smfony设置量表之间的关系
    smyfony2 增删改查
    js中substring和substr的用法
    jQuery id模糊 选择器 批量处理
    《内存数据库和mysql的同步机制》
    linux 基本。。
    Django之模板引擎(母版)
  • 原文地址:https://www.cnblogs.com/TurboWay/p/12838557.html
Copyright © 2011-2022 走看看