zoukankan      html  css  js  c++  java
  • Datax写入parquet类型的hive表时处理timestamp类型字段的方法

    一、概述

      1、  hive中的Timestamp

        Hive在0.8的版本后开始支持Timestamp的格式。Hive在储存时间戳的时候会先把时间转成UTC的时间,然后再把转换后的时间存储到Parquet文件中。在读取Parquet文件的时候Hive会把时间从UTC时间再转化回成本地的时间。这样的话,如果存和读取都是用Hive的话,时间不会有任何的问题。上述说的是用Parquet文件来存取时间格式流程,如果是存成普通的文本文件的话,存取都不会进行任何时间的转换。

      2、Parquet中的Timestamp

        Parquet文件格式是当前Hadoop生态中最流行的列式存储格式。Parquet支持的类型有BOOLEAN、INT32、INT64、INT96、FLOAT、DOUBLE、BYTE_ARRAY,所以Timestamp其实是一种逻辑类型。由于Impala存储的时间精度达到纳秒的级别,所以在Parquet文件中用INT96来存储时间。其他的数据处理引擎也跟进该精度,所以也用INT96来存储,但是在时区兼容性方面做得并不好。

    二、parquet格式的hive中timestamp字段问题

       在datax中如果我们直接用group.append(columns.get(i).getString(Key.NAME),dataFormat.format(column.asDate()),在使用xshell读取时会报如下错误:

       对于parquet类型文件的时间戳逻辑类型(注释为int96),这种时间戳编码(int96)似乎很少见,而且不受支持。再了解Parquet的timestamp存储原理后,这个问题就好解决了,保存为Int96的时间戳由一天中的纳秒组成。

      明确地:messagetype模式中的列使用哪种 Parquet类型?我们应该使用基元类型primitivetypename.int96。

    Types.MessageTypeBuilder mtb = Types.buildMessage();
    for(Configuration eachColumnConf : columns) {
      SupportHiveDataType columnType = SupportHiveDataType.valueOf(eachColumnConf.getString(Key.Type).toUpperCase());
      switch(columnType) {
    
       ...
       case TIMESTAMP:
       case DATE:
          mtb.optional(PrimitiveType.PrimitiveTypeName.INT96).named(eachColumnConf.getString(Key.NAME))
       break;
    ...}

     在存入时,需要将值进行转换

    group.append(columns.get(i).getString(Key.NAME),dataFormat.format(column.asDate())

     用spark sql中的这段代码作为参考,终于找到了答案

    String value = "2019-02-13 13:35:05";
     
    final long NANOS_PER_HOUR = TimeUnit.HOURS.toNanos(1);
    final long NANOS_PER_MINUTE = TimeUnit.MINUTES.toNanos(1);
    final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
    // Parse date
    SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
    cal.setTime(parser.parse(value));
    // Calculate Julian days and nanoseconds in the day
    LocalDate dt = LocalDate.of(cal.get(Calendar.YEAR), cal.get(Calendar.MONTH)+1, cal.get(Calendar.DAY_OF_MONTH));
    int julianDays = (int) JulianFields.JULIAN_DAY.getFrom(dt);
    long nanos = (cal.get(Calendar.HOUR_OF_DAY) * NANOS_PER_HOUR)
            + (cal.get(Calendar.MINUTE) * NANOS_PER_MINUTE)
            + (cal.get(Calendar.SECOND) * NANOS_PER_SECOND);
    // Write INT96 timestamp
    byte[] timestampBuffer = new byte[12];
    ByteBuffer buf = ByteBuffer.wrap(timestampBuffer);
    buf.order(ByteOrder.LITTLE_ENDIAN).putLong(nanos).putInt(julianDays);
    // This is the properly encoded INT96 timestamp
    Binary tsValue = Binary.fromReusedByteArray(timestampBuffer);

       即将需要存储的时间先转换为Binary类型再存储到parquet,hive读取时再自动转换为Timestamp,问题解决。

  • 相关阅读:
    mysql表的查询(连接查询)练习
    mysql基础语法
    Linux 常用命令整理
    1.django 环境搭建
    2.django 操作笔记
    mysql基础笔记(1)
    VMware复制Linux虚拟机后网络配置
    uC/OS-III 软件定时器(三)
    uC/OS-III 时间管理(二)
    uC/OS-III 时钟节拍(一)
  • 原文地址:https://www.cnblogs.com/chhyan-dream/p/13268994.html
Copyright © 2011-2022 走看看