  • A custom Hive SerDe with encoding support

    An introduction to Hive SerDe

    https://www.jianshu.com/p/afee9acba686

    The problem

    The data files are plain text with one fixed-format record per line; every column is either fixed-width or bounded in length, so Hive's built-in RegexSerDe looked like a natural fit for parsing the records. After adopting it, though, the Chinese fields in Hive query results came back garbled. (A made-up record and matching pattern are sketched below.)
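    For illustration only, here is a hypothetical three-column fixed-width record (an 8-char institution id, a 12-char amount, a 6-char date; these names and widths are invented, not from the real file) and the input.regex that would split it, one capture group per column:

    01055800 000000012345 180710
    "input.regex" = "(.{8}) (.{12}) (.{6})"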

    Working through it

    serialization.encoding=GBK

    Files in Hadoop are UTF-8 by default, and Hive SerDes likewise decode input as UTF-8 unless told otherwise, so a GBK-encoded file is bound to come out garbled. The fix suggested all over the web is to set "serialization.encoding"="GBK" in the SerDe properties when creating the table; however, that did not solve my problem.
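    For reference, this is how that property is normally attached; the table and columns here are placeholders:

    CREATE TABLE gbk_demo (id STRING, name STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    WITH SERDEPROPERTIES ("field.delim" = ",", "serialization.encoding" = "GBK")
    STORED AS TEXTFILE;

    -- or, on an existing table:
    ALTER TABLE gbk_demo SET SERDEPROPERTIES ("serialization.encoding" = "GBK");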

    Into the source

    When a table is declared with ROW FORMAT but no explicit SerDe, Hive defaults to org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, which extends org.apache.hadoop.hive.serde2.AbstractEncodingAwareSerDe. That base class really does honor "serialization.encoding"="GBK" and fixes garbled reads of GBK files. Its code:

    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by Fernflower decompiler)
    //
    
    package org.apache.hadoop.hive.serde2;
    
    import com.google.common.base.Charsets;
    import java.nio.charset.Charset;
    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.io.Writable;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public abstract class AbstractEncodingAwareSerDe extends AbstractSerDe {
        private static final Logger LOG = LoggerFactory.getLogger(AbstractEncodingAwareSerDe.class);
        protected Charset charset;
    
        public AbstractEncodingAwareSerDe() {
        }
    
        /** @deprecated */
        @Deprecated
        public void initialize(Configuration conf, Properties tbl) throws SerDeException {
            this.charset = Charset.forName(tbl.getProperty("serialization.encoding", "UTF-8"));
            if (this.charset.equals(Charsets.ISO_8859_1) || this.charset.equals(Charsets.US_ASCII)) {
                LOG.warn("The data may not be properly converted to target charset " + this.charset);
            }
    
        }
    
        public final Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
            Writable result = this.doSerialize(obj, objInspector);
            if (!this.charset.equals(Charsets.UTF_8)) {
                result = this.transformFromUTF8(result);
            }
    
            return result;
        }
    
        protected abstract Writable transformFromUTF8(Writable var1);
    
        protected abstract Writable doSerialize(Object var1, ObjectInspector var2) throws SerDeException;
    
        public final Object deserialize(Writable blob) throws SerDeException {
            if (!this.charset.equals(Charsets.UTF_8)) {
                blob = this.transformToUTF8(blob);
            }
    
            return this.doDeserialize(blob);
        }
    
        protected abstract Writable transformToUTF8(Writable var1);
    
        protected abstract Object doDeserialize(Writable var1) throws SerDeException;
    }
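    To make the two abstract hooks concrete: a subclass's transformToUTF8 boils down to re-decoding the Text bytes with the configured charset. This is a simplified sketch only, not LazySimpleSerDe's exact code:

    // Simplified sketch; LazySimpleSerDe's real override differs in detail.
    protected Writable transformToUTF8(Writable blob) {
        Text text = (Text) blob;
        // Interpret the raw bytes in the table's charset (e.g. GBK)...
        String decoded = new String(text.getBytes(), 0, text.getLength(), charset);
        // ...then re-encode as UTF-8, which is how Text stores strings internally.
        return new Text(decoded);
    }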
    
    

    Reading org.apache.hadoop.hive.serde2.RegexSerDe next, it turns out the class never touches serialization.encoding, which is exactly why setting the property had no effect. I won't paste its source here.

    The solution

    The fix is straightforward: write a class EncodingAwareRegexSerDe that extends RegexSerDe and converts each row to UTF-8 before handing it to the parent. The code:

    // Package must match the class name referenced in ROW FORMAT SERDE below.
    package com.unionpay.bigdataTest.hive.serdes;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde2.RegexSerDe;
    import org.apache.hadoop.hive.serde2.SerDeException;
    import org.apache.hadoop.hive.serde2.SerDeSpec;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.UnsupportedEncodingException;
    import java.util.Properties;
    
    
    @SerDeSpec(
            schemaProps = {"columns", "columns.types", "input.regex", "input.regex.case.insensitive","serialization.encoding"}
    )
    public class EncodingAwareRegexSerDe extends RegexSerDe {
        public static final Logger LOG = LoggerFactory.getLogger(EncodingAwareRegexSerDe.class.getName());
        protected String charsetName;
        public EncodingAwareRegexSerDe(){
            super();
        }
    
        @Override
        public void initialize(Configuration conf, Properties tbl) throws SerDeException {
            super.initialize(conf, tbl);
            // Charset of the underlying data files (e.g. "GBK"); defaults to UTF-8.
            this.charsetName = tbl.getProperty("serialization.encoding", "UTF-8").trim();
        }
    
        @Override
        public Object deserialize(Writable blob) throws SerDeException {
            // Convert the raw row to UTF-8 first, then let RegexSerDe do the parsing.
            Text rowText = (Text) blob;
            Text utf8Text = transformTextToUTF8(rowText, this.charsetName);
            return super.deserialize(utf8Text);
        }
    
        private Text transformTextToUTF8(Text text, String encoding) throws SerDeException {
            try {
                // Decode the bytes with the source charset; Text re-encodes the result as UTF-8.
                return new Text(new String(text.getBytes(), 0, text.getLength(), encoding));
            } catch (UnsupportedEncodingException e) {
                // Fail loudly rather than silently returning an empty row.
                throw new SerDeException("Unsupported serialization.encoding: " + encoding, e);
            }
        }
    }
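    Only deserialize is overridden here: as far as the built-in serde2 RegexSerDe goes, it is a read-only SerDe (its serialize() is unsupported), so there is no write path that would need the reverse conversion.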
    

    Using the custom SerDe

    Package the class above into a jar and it is ready to use.
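    If the class lives in a Maven project (as the SNAPSHOT jar name below suggests), building the jar is the usual:

    mvn clean package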

    In the Hive shell:
    hive> add jar /home/dw_hbkal/przhang/hive-custom-serdes-1.0-SNAPSHOT.jar;
    CREATE EXTERNAL TABLE IF NOT EXISTS test_tooldb.ind01acoma_tmp(
    acq_ins_id_cd          STRING,
    fwd_settle_at          DECIMAL(12, 0),
    repl_at                DECIMAL(12, 0),
    ......
    card_accptr_nm_addr    STRING,
    resv5                  STRING
    ) PARTITIONED BY (ins_id_cd STRING, hp_settle_dt STRING)
    ROW FORMAT SERDE 'com.unionpay.bigdataTest.hive.serdes.EncodingAwareRegexSerDe'
    WITH SERDEPROPERTIES (
        "input.regex"="(.{11}) (.{11}) (.{6}) (.{10}) (.{19}) (.{12}) (.{12}) (.{12}) (.{4}) (.{6}) (.{4}) (.{8}) (.{15}) (.{12}) (.{2}) (.{6}) (.{11}) (.{6}) (.{2}) (.{3}) (.{12}) (.{12}) (.{12}) (.{1}) (.{3}) (.{1}) (.{1}) (.{10}) (.{11}) (.{1}) (.{2}) (.{2}) (.{12}) (.{1})(.{2})(.{1})(.{1})(.{2})(.{1})(.{1})(.{2})(.{1})(.{2}) (.{11}) (.{11}) (.{1}) (.{1}) (.{4}) (.{2}) (.{1,40}) (.{3}) (.{9}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{9}) (.{9}) (.{9}) (.{19}) (.{2}) (.{40}) (.{4}) (.{1}) (.{2}) (.{10}) (.{6}) (.{1}) (.{12}) (.{193})",
        "serialization.encoding"="GBK"
    )
    STORED AS TEXTFILE
    LOCATION '/user/dw_hbkal/db/test_tooldb/ind01acoma_tmp';
    load data local inpath '/home/dw_hbkal/przhang/IND18071032ACOMA' overwrite into table test_tooldb.ind01acoma_tmp partition(ins_id_cd='01055800',hp_settle_dt='20180710');
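    A quick query afterwards (column and partition values taken from the DDL and load above) should now show the Chinese fields intact:

    hive> SELECT card_accptr_nm_addr FROM test_tooldb.ind01acoma_tmp
        > WHERE ins_id_cd = '01055800' AND hp_settle_dt = '20180710' LIMIT 10;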
    
    
  • Original article: https://www.cnblogs.com/darange/p/13690919.html