Preface:
Recently I have been working on a big data project for a large bank. When it came to their unstructured data, I found it does not meet Hive's or Pig's loading requirements: every line would need several different delimiters before it could be parsed cleanly. I spent a whole afternoon without finding a good solution, and after going over the whole process again today it is clear the Hive command line alone cannot handle this, so the only way out is to solve it in code.
1. Re-implement Hive's InputFormat. Don't worry, the code is right here:
package hiveStream;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class MyHiveInputFormat extends TextInputFormat implements JobConfigurable {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit genericSplit, JobConf job, Reporter reporter)
            throws IOException {
        reporter.setStatus(genericSplit.toString());
        return new MyRecordReader((FileSplit) genericSplit, job);
    }
}
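The subclass is deliberately thin: it only overrides getRecordReader and hands each split to the custom MyRecordReader shown in the next step, where the inconsistent whitespace is collapsed into Hive's default field delimiter \001 (Ctrl-A). As a quick preview of that transformation, here is a standalone sketch; the class name DelimiterPreview and the sample line are only for illustration, not part of the original code:

public class DelimiterPreview {
    public static void main(String[] args) {
        // A raw line with inconsistent whitespace between fields (made-up sample)
        String raw = "12   afd  fewf     fewfe  we";
        // Collapse every run of whitespace into Hive's default field delimiter \001 (Ctrl-A)
        String normalized = raw.replaceAll("\\s+", "\001");
        // Print with a visible stand-in so the effect is obvious
        System.out.println(normalized.replace("\001", "<^A>"));
        // Prints: 12<^A>afd<^A>fewf<^A>fewfe<^A>we
    }
}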
2. Now take a careful look at the RecordReader below. I won't walk through it line by line; the key is what next() does to each record.
package hiveStream;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;

public class MyRecordReader implements RecordReader<LongWritable, Text> {

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;

    public MyRecordReader(InputStream in, long offset, long endOffset,
            int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.start = offset;
        this.lineReader = new LineReader(in);
        this.pos = offset;
        this.end = endOffset;
    }

    public MyRecordReader(InputStream in, long offset, long endOffset,
            Configuration job) throws IOException {
        this.maxLineLength = job.getInt(
                "mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.end = endOffset;
    }

    // Constructor used by MyHiveInputFormat
    public MyRecordReader(FileSplit inputSplit, Configuration job)
            throws IOException {
        maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        // Look up a compression codec for the file (null if uncompressed)
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        // Open the file
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;

        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }
        // If this split starts mid-line, skip the partial first line;
        // it belongs to the previous split
        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    @Override
    public void close() throws IOException {
        if (lineReader != null)
            lineReader.close();
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));
            // Collapse every run of whitespace into Hive's default field
            // delimiter \001 (Ctrl-A) so the line matches the table schema
            String strReplace = value.toString().replaceAll("\\s+", "\001");
            Text txtReplace = new Text();
            txtReplace.set(strReplace);
            value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
            if (newSize == 0)
                return false;
            pos += newSize;
            if (newSize < maxLineLength)
                return true;
        }
        return false;
    }
}
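Before wiring this into Hive, the two classes can be sanity-checked locally by driving them with the old mapred API against a copy of the raw file. This is only a sketch: the TestDriver class and the local path file:///tmp/test are my own assumptions, and it needs the Hadoop client jars on the classpath.

package hiveStream;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class TestDriver {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        // Point the input format at a local copy of the raw data (assumed path)
        FileInputFormat.setInputPaths(conf, new Path("file:///tmp/test"));

        MyHiveInputFormat format = new MyHiveInputFormat();
        format.configure(conf);

        // A single split is enough for a small local file
        for (InputSplit split : format.getSplits(conf, 1)) {
            RecordReader<LongWritable, Text> reader =
                    format.getRecordReader(split, conf, Reporter.NULL);
            LongWritable key = reader.createKey();
            Text value = reader.createValue();
            while (reader.next(key, value)) {
                // Make the \001 delimiters visible for inspection
                System.out.println(key.get() + "\t"
                        + value.toString().replace("\001", " | "));
            }
            reader.close();
        }
    }
}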
3. A worked example:
The data to process:

12 afd fewf fewfe we
76 vee ppt wfew wefw
83 tyutr ppt wfew wefw
45 vbe ppt wfew wefw
565 wee ppt wfew wefw
12 sde ppt wfew wefw

Note: the amount of whitespace between fields is inconsistent.

1. Create the table:

create table micmiu_blog(author int, category string, url string, town string, oop string)
stored as
inputformat 'hiveStream.MyHiveInputFormat'
outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';

Note: only the input side is customized; the output format is Hive's stock HiveIgnoreKeyTextOutputFormat.

2. Load the data:

LOAD DATA LOCAL INPATH '/mnt/test' OVERWRITE INTO TABLE micmiu_blog;

3. Look at the result:

select * from micmiu_blog;

Go try it yourself.
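One practical note: the jar containing hiveStream.MyHiveInputFormat and hiveStream.MyRecordReader has to be visible to the Hive session (for example via add jar, or Hive's auxiliary-jar path) before the CREATE TABLE above will resolve the class. And if you want to predict what select * should return before loading anything, the sketch below applies the same normalization to the sample rows and maps them onto the five declared columns; the SchemaCheck class and the hard-coded rows are purely illustrative.

public class SchemaCheck {
    public static void main(String[] args) {
        // The sample rows from above (whitespace between fields actually varies)
        String[] rows = {
                "12 afd fewf fewfe we",
                "76 vee ppt wfew wefw",
                "83 tyutr ppt wfew wefw",
                "45 vbe ppt wfew wefw",
                "565 wee ppt wfew wefw",
                "12 sde ppt wfew wefw"
        };
        // Columns of micmiu_blog, in declaration order
        String[] columns = {"author", "category", "url", "town", "oop"};

        for (String row : rows) {
            // Same normalization as MyRecordReader.next(), then split on \001
            String[] fields = row.replaceAll("\\s+", "\001").split("\001");
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < columns.length && i < fields.length; i++) {
                sb.append(columns[i]).append('=').append(fields[i]).append("  ");
            }
            System.out.println(sb.toString().trim());
        }
    }
}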