  • A workaround for Hive's lack of support for multi-character field delimiters

    Preface:

      I have recently been working on a big-data project for a large bank. While processing their unstructured data I found that the files they provided do not meet the input requirements of Hive or Pig: each line needs more than one kind of separator before it can be parsed cleanly. I spent a whole afternoon without finding a good solution, so today I went back over the whole process. The Hive command line clearly cannot handle this by itself, which leaves only one option: solve it in code.

    1. Re-implement Hive's InputFormat. Don't worry, the code follows:

    package hiveStream;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobConfigurable;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextInputFormat;
    
    // Custom InputFormat: hands every split to the multi-delimiter-aware MyRecordReader
    public class MyHiveInputFormat extends TextInputFormat implements JobConfigurable {
    
        @Override
        public RecordReader<LongWritable, Text> getRecordReader(
                InputSplit genericSplit, JobConf job, Reporter reporter)
                throws IOException {
            reporter.setStatus(genericSplit.toString());
            return new MyRecordReader((FileSplit) genericSplit, job);
        }
    
    }
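
    Before this format can be referenced from Hive in step 3, both classes have to be compiled against the Hadoop client libraries, packaged into a jar, and registered in the Hive session. A minimal sketch; the source layout and the jar path /tmp/hiveStream.jar are placeholders of my own:

        # compile and package (the exact classpath depends on your Hadoop/Hive install)
        javac -cp $(hadoop classpath) hiveStream/*.java
        jar cf /tmp/hiveStream.jar hiveStream/*.class

        # inside the Hive CLI, before running the DDL in step 3
        hive> add jar /tmp/hiveStream.jar;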
    

    2. Take a careful look at the RecordReader below; the interesting work happens in the constructor that takes a FileSplit and in the next() method.

    package hiveStream;
    
    import java.io.IOException;
    import java.io.InputStream;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.util.LineReader;
    
    
    // RecordReader that reads text lines and rewrites their field delimiters on the fly
    public class MyRecordReader implements RecordReader<LongWritable, Text> {
    
        private CompressionCodecFactory compressionCodecs = null;
        private long start;
        private long pos;
        private long end;
        private LineReader lineReader;
        int maxLineLength;
    	
        public MyRecordReader(InputStream in, long offset, long endOffset,
                int maxLineLength) {
            this.maxLineLength = maxLineLength;
            this.start = offset;
            this.lineReader = new LineReader(in);
            this.pos = offset;
            this.end = endOffset;
        }
        
        public MyRecordReader(InputStream in, long offset, long endOffset,
                Configuration job) throws IOException {
            this.maxLineLength = job.getInt(
                    "mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
            this.lineReader = new LineReader(in, job);
            this.start = offset;
            this.pos = offset;
            this.end = endOffset;
        }
        
        // Constructor used by MyHiveInputFormat: opens the split, handles compression, and positions the reader at the first full line
        public MyRecordReader(FileSplit inputSplit, Configuration job)
                throws IOException {
            maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                    Integer.MAX_VALUE);
            start = inputSplit.getStart();
            end = start + inputSplit.getLength();
            final Path file = inputSplit.getPath();
            // Look up the compression codec for this file, if any
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);
            // Open the file system and the raw input stream for this split
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(file);
            boolean skipFirstLine = false;
     
            if (codec != null) {
                lineReader = new LineReader(codec.createInputStream(fileIn), job);
                end = Long.MAX_VALUE;
            } else {
                if (start != 0) {
                    skipFirstLine = true;
                    --start;
                    fileIn.seek(start);
                }
                lineReader = new LineReader(fileIn, job);
            }
     
            if (skipFirstLine) {
                start += lineReader.readLine(new Text(), 0,
                        (int) Math.min((long) Integer.MAX_VALUE, end - start));
            }
            this.pos = start;
        }
    	
        @Override
        public void close() throws IOException {
            if (lineReader != null)
                lineReader.close();
        }
    
        @Override
        public LongWritable createKey() {
            return new LongWritable();
        }
    
        @Override
        public Text createValue() {
            return new Text();
        }
    
        @Override
        public long getPos() throws IOException {
            return pos;
        }
    
        @Override
        public float getProgress() throws IOException {
            if (start == end) {
                return 0.0f;
            } else {
                return Math.min(1.0f, (pos - start) / (float) (end - start));
            }
        }
    
        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            while (pos < end) {
                key.set(pos);
                int newSize = lineReader.readLine(value, maxLineLength,
                        Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                                maxLineLength));
                if (newSize == 0)
                    return false;
                // Normalize the delimiters: collapse every run of whitespace (spaces, tabs)
                // into a single \001 byte, the default field delimiter of Hive's SerDe
                String strReplace = value.toString().replaceAll("\\s+", "\001");
                Text txtReplace = new Text();
                txtReplace.set(strReplace);
                value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
                pos += newSize;
                if (newSize < maxLineLength)
                    return true;
            }
            return false;
        }
    }
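
    The heart of the trick is the single replaceAll call inside next(): every run of whitespace is rewritten to \001, the delimiter Hive's default SerDe expects, so the downstream table needs no ROW FORMAT clause at all. A quick standalone sketch of just that transformation, applied to the first sample row from step 3 (this demo class is illustration only, not part of the plugin):

        package hiveStream;
        
        public class DelimiterNormalizeDemo {
            public static void main(String[] args) {
                // First row of the sample data: single spaces, runs of spaces and a tab
                String raw = "12 afd   fewf\tfewfe  we";
                // The same rewrite next() performs: any whitespace run becomes one \001 byte
                String normalized = raw.replaceAll("\\s+", "\001");
                // Prints 5 -- one value per column of the micmiu_blog table
                System.out.println(normalized.split("\001").length);
            }
        }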
    

    3. A worked example:

         

    The raw data to be processed:
        
        12 afd   fewf	fewfe  we
        76 vee   ppt	wfew  wefw
        83 tyutr   ppt	wfew  wefw
        45 vbe   ppt	wfew  wefw
        565 wee   ppt	wfew  wefw
        12 sde   ppt	wfew  wefw
    Note: the whitespace between fields is inconsistent (single spaces, runs of spaces, and tabs).
    
    1. Create the table:
        create table micmiu_blog(author int, category string, url string, town string, oop string)
            stored as
                inputformat 'hiveStream.MyHiveInputFormat'
                outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
    Note: only the input format is overridden; the output side keeps Hive's stock HiveIgnoreKeyTextOutputFormat. An explicit equivalent of this CREATE TABLE is sketched just below.
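
    Because no ROW FORMAT clause is given, the table falls back to Hive's default SerDe, whose field delimiter is \001 (Ctrl-A); that is exactly the byte the RecordReader writes between fields. Spelling the default out would look like this; it is an equivalent form, not an extra step:

        create table micmiu_blog(author int, category string, url string, town string, oop string)
            row format delimited fields terminated by '\001'
            stored as
                inputformat 'hiveStream.MyHiveInputFormat'
                outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';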
    
    2. Load the data:
        LOAD DATA LOCAL INPATH '/mnt/test' OVERWRITE INTO TABLE micmiu_blog;
    
    3. Check the result:
        select * from micmiu_blog;
    
    Try it yourself.
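
    If everything is wired up correctly, each raw line should come back split into the table's five columns. Assuming the whitespace normalization behaves as described above, the first few rows of the SELECT should look roughly like this (illustrative, not captured output):

        12      afd     fewf    fewfe   we
        76      vee     ppt     wfew    wefw
        83      tyutr   ppt     wfew    wefw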
    

      

  • Original post: https://www.cnblogs.com/hiter-java/p/4820773.html