zoukankan      html  css  js  c++  java
  • hive多分隔符的解决方案

    题记:

      近期在做某个大型银行的大数据项目,当在处理非结构化数据时,却发现他们给的数据并不符合hive和pig的处理要求,数据每行必须需要多个分割符才能完美处理,一下午也没有想到完美的办法解决,今天重新审视了一下整个过程。看来hive的命令行没法搞定了。于是乎,只能通过代码来搞定。

    1、重新实现hive的InputFormat了,别急放码过来

     1 package hiveStream;
     2 
     3 import java.io.IOException;
     4 
     5 import org.apache.hadoop.io.LongWritable;
     6 import org.apache.hadoop.io.Text;
     7 import org.apache.hadoop.mapred.FileSplit;
     8 import org.apache.hadoop.mapred.InputSplit;
     9 import org.apache.hadoop.mapred.JobConf;
    10 import org.apache.hadoop.mapred.JobConfigurable;
    11 import org.apache.hadoop.mapred.RecordReader;
    12 import org.apache.hadoop.mapred.Reporter;
    13 import org.apache.hadoop.mapred.TextInputFormat;
    14 
    15 public class MyHiveInputFormat  extends TextInputFormat implements JobConfigurable{
    16 
    17     @Override
    18     public RecordReader<LongWritable, Text> getRecordReader(
    19             InputSplit genericSplit, JobConf job, Reporter reporter)
    20             throws IOException {
    21          reporter.setStatus(genericSplit.toString());
    22            return new MyRecordReader((FileSplit) genericSplit, job);
    23     }
    24     
    25 }
    View Code

    2、仔细看看下面的方法,不解释,自己领悟。

      1 package hiveStream;
      2 
      3 import java.io.IOException;
      4 import java.io.InputStream;
      5 
      6 import org.apache.hadoop.conf.Configuration;
      7 import org.apache.hadoop.fs.FSDataInputStream;
      8 import org.apache.hadoop.fs.FileSystem;
      9 import org.apache.hadoop.fs.Path;
     10 import org.apache.hadoop.io.LongWritable;
     11 import org.apache.hadoop.io.Text;
     12 import org.apache.hadoop.io.compress.CompressionCodec;
     13 import org.apache.hadoop.io.compress.CompressionCodecFactory;
     14 import org.apache.hadoop.mapred.FileSplit;
     15 import org.apache.hadoop.mapred.RecordReader;
     16 import org.apache.hadoop.util.LineReader;
     17 
     18 
     19 public class MyRecordReader implements RecordReader<LongWritable, Text>{
     20     
     21     private CompressionCodecFactory compressionCodecs = null;
     22     private long start;
     23     private long pos;
     24     private long end;
     25     private LineReader lineReader;
     26     int maxLineLength;
     27     
     28     public MyRecordReader(InputStream in, long offset, long endOffset,
     29             int maxLineLength) {
     30         this.maxLineLength = maxLineLength;
     31         this.start = offset;
     32         this.lineReader = new LineReader(in);
     33         this.pos = offset;
     34         this.end = endOffset;
     35     }
     36     
     37     public MyRecordReader(InputStream in, long offset, long endOffset,
     38             Configuration job) throws IOException {
     39         this.maxLineLength = job.getInt(
     40                 "mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
     41         this.lineReader = new LineReader(in, job);
     42         this.start = offset;
     43         this.end = endOffset;
     44     }
     45     
     46     // 构造方法
     47     public MyRecordReader(FileSplit inputSplit, Configuration job)
     48             throws IOException {
     49         maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
     50                 Integer.MAX_VALUE);
     51         start = inputSplit.getStart();
     52         end = start + inputSplit.getLength();
     53         final Path file = inputSplit.getPath();
     54         // 创建压缩器
     55         compressionCodecs = new CompressionCodecFactory(job);
     56         final CompressionCodec codec = compressionCodecs.getCodec(file);
     57         // 打开文件系统
     58         FileSystem fs = file.getFileSystem(job);
     59         FSDataInputStream fileIn = fs.open(file);
     60         boolean skipFirstLine = false;
     61  
     62         if (codec != null) {
     63             lineReader = new LineReader(codec.createInputStream(fileIn), job);
     64             end = Long.MAX_VALUE;
     65         } else {
     66             if (start != 0) {
     67                 skipFirstLine = true;
     68                 --start;
     69                 fileIn.seek(start);
     70             }
     71             lineReader = new LineReader(fileIn, job);
     72         }
     73  
     74         if (skipFirstLine) {
     75             start += lineReader.readLine(new Text(), 0,
     76                     (int) Math.min((long) Integer.MAX_VALUE, end - start));
     77         }
     78         this.pos = start;
     79     }
     80     
     81     @Override
     82     public void close() throws IOException {
     83         if (lineReader != null)
     84             lineReader.close();
     85     }
     86 
     87     @Override
     88     public LongWritable createKey() {
     89         return new LongWritable();
     90     }
     91 
     92     @Override
     93     public Text createValue() {
     94          return new Text();
     95     }
     96 
     97     @Override
     98     public long getPos() throws IOException {
     99           return pos;
    100     }
    101 
    102     @Override
    103     public float getProgress() throws IOException {
    104        if (start == end) {
    105             return 0.0f;
    106         } else {
    107             return Math.min(1.0f, (pos - start) / (float) (end - start));
    108         }
    109     }
    110 
    111     @Override
    112     public boolean next(LongWritable key, Text value) throws IOException {
    113          while (pos < end) {
    114                 key.set(pos);
    115                 int newSize = lineReader.readLine(value, maxLineLength,
    116                         Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
    117                                 maxLineLength));
    118                 // 把字符串中的"##"转变为"#"
    119                 String strReplace = value.toString().replaceAll("\s+", "01");
    120                 Text txtReplace = new Text();
    121                 txtReplace.set(strReplace);
    122                 value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
    123                 if (newSize == 0)
    124                     return false;
    125                 pos += newSize;
    126                 if (newSize < maxLineLength)
    127                     return true;
    128             }
    129             return false;
    130         }
    131     }
    View Code

    3、处理实例:如下

     1 数据处理要求:
     2      
     3     12 afd   fewf   fewfe  we
     4     76 vee   ppt    wfew  wefw
     5     83 tyutr   ppt  wfew  wefw
     6     45 vbe   ppt    wfew  wefw
     7     565 wee   ppt   wfew  wefw
     8     12 sde   ppt    wfew  wefw
     9 注意:字段之间的空格不一致
    10  
    11 1、建表:
    12     create table micmiu_blog(author int, category string, url string,town string,oop string) stored as inputformat 'hiveStream.MyHiveInputFormat' outputformat  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
    13 注意:输出咱可没有重写哦
    14  
    15 2、加载数据:
    16     LOAD DATA LOCAL INPATH'/mnt/test' OVERWRITE INTO TABLE micmiu_blog;
    17  
    18 3、看看的成果:
    19     select * from micmiu_blog;
    20  
    21 自己去试试,不解释
    View Code
  • 相关阅读:
    数据库常用操作命令以及explain执行计划
    spring中父子容器
    为什么SpringCloud引导类不加@EnableDiscoveryClient也可以注册到eureka中
    使用dubbo的注解,AOP配置xml的方式无法开启事务
    Excel导出打印失败报错 (eg HSSF instead of XSSF)
    0317 ajax
    0316 事务
    0316 DBUtils
    0315 el技术和jstl技术 javaEE开发模式
    0313 jsp
  • 原文地址:https://www.cnblogs.com/hiter-java/p/4820776.html
Copyright © 2011-2022 走看看