前面我们所写的MapReduce程序的输入都是文本文件,但在实际工作中,我们难免会碰到需要处理其它格式文件的情况。下面以处理Excel数据为例进行说明。
1、项目需求
有一份刘超与家庭成员之间的通话记录,存储在Excel文件中,如下面的数据集所示。我们需要基于这份数据,统计每个月每位家庭成员给刘超打电话的次数,并按月份输出到不同的文件中。
下面是部分数据,数据格式:编号 联系人 电话 时间
2、分析
统计每个月每位家庭成员给刘超打电话的次数这一点很简单,我们之前已经写过几个类似的程序。实现需求的麻烦点在于:输入是Excel文件,而输出要按月份写到不同的文件中。这就需要我们实现自定义的InputFormat和OutputFormat。
3、实现
首先,输入文件是Excel格式,我们可以借助Apache POI来解析Excel文件。我们先来实现一个Excel的解析类(ExcelParser):
package com.buaa;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;

/**
 * Converts the first sheet of an HSSF (.xls) workbook into plain text rows.
 *
 * @ProjectName HandleExcelPhone
 * @PackageName com.buaa
 * @ClassName ExcelParser
 * @Description Parses Excel data
 * @Author 刘吉超
 * @Date 2016-04-24 16:59:28
 */
public class ExcelParser {
    private static final Log logger = LogFactory.getLog(ExcelParser.class);

    /**
     * Reads the workbook from {@code is} and renders every row of sheet 0 as one string.
     *
     * <p>Within a row, each boolean, numeric or string cell contributes its value followed by a
     * single space; cells of any other type contribute nothing. The stream is not closed here;
     * that remains the caller's responsibility.
     *
     * @param is stream positioned at the start of the workbook data
     * @return one entry per row, in sheet order; an empty array when the workbook cannot be read
     *         (the failure is logged rather than propagated)
     */
    public static String[] parseExcelData(InputStream is) {
        List<String> resultList = new ArrayList<String>();
        try {
            HSSFWorkbook workbook = new HSSFWorkbook(is);
            HSSFSheet sheet = workbook.getSheetAt(0);

            Iterator<Row> rowIterator = sheet.iterator();
            while (rowIterator.hasNext()) {
                resultList.add(rowToString(rowIterator.next()));
            }
        } catch (IOException e) {
            // Pass the exception itself so the stack trace is kept; the old message claimed
            // "File not found" for every I/O failure, which was misleading.
            logger.error("IO Exception : failed to read Excel workbook", e);
        }
        return resultList.toArray(new String[0]);
    }

    /** Concatenates the supported cells of one row, each followed by a single space. */
    private static String rowToString(Row row) {
        StringBuilder rowString = new StringBuilder();
        Iterator<Cell> colIterator = row.cellIterator();
        while (colIterator.hasNext()) {
            Cell cell = colIterator.next();
            switch (cell.getCellType()) {
            case Cell.CELL_TYPE_BOOLEAN:
                rowString.append(cell.getBooleanCellValue()).append(" ");
                break;
            case Cell.CELL_TYPE_NUMERIC:
                rowString.append(cell.getNumericCellValue()).append(" ");
                break;
            case Cell.CELL_TYPE_STRING:
                rowString.append(cell.getStringCellValue()).append(" ");
                break;
            default:
                // Other cell types (formula, blank, error, ...) are intentionally skipped.
                break;
            }
        }
        return rowString.toString();
    }
}
然后,我们需要定义一个从Excel读取数据的InputFormat类,命名为ExcelInputFormat,实现代码如下
package com.buaa;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Input format that feeds the rows of an Excel workbook to the mapper, one row per record.
 * The key is the zero-based row index and the value is the row text produced by
 * {@link ExcelParser#parseExcelData(InputStream)}.
 *
 * @ProjectName HandleExcelPhone
 * @PackageName com.buaa
 * @ClassName ExcelInputFormat
 * @Description Reads records from an Excel file
 * @Author 刘吉超
 * @Date 2016-04-28 17:31:54
 */
public class ExcelInputFormat extends FileInputFormat<LongWritable,Text>{

    /**
     * Excel workbooks must be read as a whole. The record reader below always parses the
     * complete file behind a split, so allowing several splits per file would emit every
     * row once per split.
     *
     * @return always {@code false}
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    /**
     * @return a new, uninitialised {@link ExcelRecordReader}
     */
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new ExcelRecordReader();
    }

    /**
     * Record reader that parses the whole workbook during {@link #initialize} and then hands
     * out the buffered rows one at a time.
     */
    public class ExcelRecordReader extends RecordReader<LongWritable, Text> {
        // Index of the current row; -1 until the first successful nextKeyValue().
        private LongWritable key = new LongWritable(-1);
        private Text value = new Text();
        private InputStream inputStream;
        // All rows of the workbook, filled in by initialize().
        private String[] strArrayofLines;

        /**
         * Opens the file behind the split and parses it completely.
         *
         * @param genericSplit the split to read; must be a {@link FileSplit}
         * @param context      supplies the job configuration used to resolve the file system
         */
        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();

            Path filePath = split.getPath();
            FileSystem fileSystem = filePath.getFileSystem(job);
            inputStream = fileSystem.open(filePath);

            this.strArrayofLines = ExcelParser.parseExcelData(inputStream);
        }

        /**
         * Advances to the next buffered row.
         *
         * @return {@code true} if a row was loaded into the current key/value, {@code false}
         *         when the rows are exhausted or the reader was never initialised
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (strArrayofLines == null) {
                return false;
            }
            int pos = (int) key.get() + 1;
            if (pos < strArrayofLines.length && strArrayofLines[pos] != null) {
                key.set(pos);
                value.set(strArrayofLines[pos]);
                return true;
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        /**
         * @return fraction of buffered rows already handed out, in the range 0..1
         */
        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (strArrayofLines == null) {
                return 0;
            }
            if (strArrayofLines.length == 0) {
                return 1;
            }
            return (key.get() + 1) / (float) strArrayofLines.length;
        }

        @Override
        public void close() throws IOException {
            if (inputStream != null) {
                inputStream.close();
            }
        }
    }
}
接下来,我们要定义一个ExcelOutputFormat类,用于实现按月份输出到不同文件中
package com.buaa;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Output format that routes each record to a file whose name is derived from the record key,
 * so that one task can write several output files.
 *
 * @ProjectName HandleExcelPhone
 * @PackageName com.buaa
 * @ClassName ExcelOutputFormat
 * @Description Writes records to per-key output files
 * @Author 刘吉超
 * @Date 2016-04-28 17:24:23
 */
public class ExcelOutputFormat extends FileOutputFormat<Text,Text> {
    // Created on the first getRecordWriter() call and then reused.
    private MultiRecordWriter writer = null;

    /**
     * @return the single {@link MultiRecordWriter} of this instance, created on first use
     */
    @Override
    public RecordWriter<Text,Text> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer != null) {
            return writer;
        }
        Path taskOutputPath = getTaskOutputPath(job);
        writer = new MultiRecordWriter(job, taskOutputPath);
        return writer;
    }

    /**
     * Resolves the directory the task writes into: the committer's work path when the committer
     * is a {@link FileOutputCommitter}, otherwise the configured job output path.
     *
     * @throws IOException if no output path is configured in the fallback case
     */
    private Path getTaskOutputPath(TaskAttemptContext context) throws IOException {
        OutputCommitter committer = super.getOutputCommitter(context);
        if (committer instanceof FileOutputCommitter) {
            return ((FileOutputCommitter) committer).getWorkPath();
        }

        Path configuredOutput = FileOutputFormat.getOutputPath(context);
        if (configuredOutput == null) {
            throw new IOException("没有定义输出目录");
        }
        return configuredOutput;
    }

    /**
     * Derives the output file name (including extension) for a record. The key is split on a
     * single space and its second token becomes the base name.
     *
     * @param key   record key
     * @param value record value (unused here)
     * @param conf  job configuration (unused here)
     * @return second token of the key followed by {@code .txt}
     */
    protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf){
        String[] keyParts = key.toString().split(" ");
        String secondToken = keyParts[1];
        return secondToken + ".txt";
    }

    /**
     * Record writer that lazily opens one delegate writer per output file name and forwards
     * each record to the delegate chosen by {@link #generateFileNameForKeyValue}.
     */
    public class MultiRecordWriter extends RecordWriter<Text,Text> {
        // Open delegate writers, keyed by output file name.
        private HashMap<String, RecordWriter<Text,Text>> recordWriters = null;
        private TaskAttemptContext job = null;
        // Directory in which the output files are created.
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            this.job = job;
            this.workPath = workPath;
            this.recordWriters = new HashMap<String, RecordWriter<Text,Text>>();
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            String fileName = generateFileNameForKeyValue(key, value, job.getConfiguration());

            RecordWriter<Text,Text> delegate = recordWriters.get(fileName);
            if (delegate == null) {
                delegate = getBaseRecordWriter(job, fileName);
                recordWriters.put(fileName, delegate);
            }
            delegate.write(key, value);
        }

        /**
         * Creates the writer for one output file, wrapping the stream in the configured
         * compression codec (Gzip when none is configured) if output compression is enabled.
         */
        private RecordWriter<Text,Text> getBaseRecordWriter(TaskAttemptContext context,
                String fileName) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            boolean compress = getCompressOutput(context);
            // Separator written between key and value.
            String separator = " ";

            if (!compress) {
                Path target = new Path(workPath, fileName);
                FSDataOutputStream rawOut = target.getFileSystem(conf).create(target, false);
                return new MailRecordWriter<Text,Text>(rawOut, separator);
            }

            Class<? extends CompressionCodec> codecClass =
                    getOutputCompressorClass(context, GzipCodec.class);
            CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
            Path target = new Path(workPath, fileName + codec.getDefaultExtension());
            FSDataOutputStream rawOut = target.getFileSystem(conf).create(target, false);
            DataOutputStream compressedOut = new DataOutputStream(codec.createOutputStream(rawOut));
            return new MailRecordWriter<Text,Text>(compressedOut, separator);
        }

        /** Closes every delegate writer and forgets them. */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (RecordWriter<Text,Text> delegate : recordWriters.values()) {
                delegate.close(context);
            }
            recordWriters.clear();
        }
    }
}
package com.buaa;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Writes records as text lines of the form {@code key<separator>value}, each terminated by a
 * line feed, to a {@link DataOutputStream}.
 *
 * @ProjectName HandleExcelPhone
 * @PackageName com.buaa
 * @ClassName MailRecordWriter
 * @Description Line-oriented text record writer
 * @Author 刘吉超
 * @Date 2016-04-24 16:59:23
 */
public class MailRecordWriter< K, V > extends RecordWriter< K, V > {
    // Charset used for every byte conversion in this class.
    private static final String utf8 = "UTF-8";
    // Record terminator. This must be a real line feed ("\n"); a space or the literal "/n"
    // would leave all records on a single line.
    private static final byte[] newline;
    static {
        try {
            newline = "\n".getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding", uee);
        }
    }

    // Destination stream; closed by close().
    protected DataOutputStream out;
    // Encoded key/value separator.
    private final byte[] keyValueSeparator;

    /**
     * @param out               stream the records are written to
     * @param keyValueSeparator text placed between key and value when both are present
     */
    public MailRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding", uee);
        }
    }

    /**
     * Creates a writer that separates key and value with a tab character.
     *
     * @param out stream the records are written to
     */
    public MailRecordWriter(DataOutputStream out) {
        // A real tab; the former "/t" was the two literal characters '/' and 't'.
        this(out, "\t");
    }

    /** Writes a {@link Text} from its backing bytes, anything else via {@code toString()}. */
    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            // Text's backing array may be longer than its content, hence the explicit length.
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    /**
     * Writes one record. A {@code null} or {@link NullWritable} key/value is treated as absent:
     * if both are absent nothing is written; the separator is written only when both are
     * present; the line terminator is written whenever anything is written.
     */
    @Override
    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    /** Closes the underlying stream. */
    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
最后,我们来编写Mapper类,实现map()函数;编写Reducer类,实现reduce()函数;以及用于提交作业的运行代码。
package com.buaa;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce job that counts, per contact and month, the number of call records found in an
 * Excel workbook.
 *
 * @ProjectName HandleExcelPhone
 * @PackageName com.buaa
 * @ClassName ExcelContactCount
 * @Description Counts calls per contact and month
 * @Author 刘吉超
 * @Date 2016-04-24 16:34:24
 */
public class ExcelContactCount extends Configured implements Tool {

    /**
     * Turns one row of text (whitespace-separated: id, contact, phone, date) into
     * {@code ("<contact> <month>", "<phone>")}.
     */
    public static class PhoneMapper extends Mapper<LongWritable, Text, Text, Text> {
        // Reused across map() calls to avoid allocating two objects per record.
        private final Text pkey = new Text();
        private final Text pvalue = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws InterruptedException, IOException {
            // Example row: 1.0, 老爸, 13999123786, 2014-12-20
            String line = value.toString();
            // "\\s+" is the regex for a run of whitespace; a single backslash ("\s") is not
            // a valid way to spell that in a Java string literal.
            String[] records = line.split("\\s+");
            if (records.length < 4) {
                // Row without all four columns (e.g. empty row): nothing to count.
                return;
            }

            // The date is split on '-' and the second part is used as the month.
            String[] months = records[3].split("-");
            if (months.length < 2) {
                // No month part in the fourth column (e.g. a header row).
                return;
            }

            pkey.set(records[1] + " " + months[1]);
            pvalue.set(records[2]);
            context.write(pkey, pvalue);
        }
    }

    /**
     * Emits {@code ("<contact> <month>", "<phone> <count>")} where count is the number of
     * values received for the key and phone is the first of those values.
     */
    public static class PhoneReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text Key, Iterable<Text> Values, Context context)
                throws IOException, InterruptedException {
            // The values must be traversed exactly once: Hadoop hands back the same
            // underlying iterator on every iterator() call, so taking the first element
            // separately and then "restarting" would skip it in the count. The Text objects
            // are also reused by the framework, hence the copy via toString().
            String phone = null;
            int phoneTotal = 0;
            for (Text current : Values) {
                if (phone == null) {
                    phone = current.toString();
                }
                phoneTotal++;
            }
            if (phone == null) {
                return;
            }

            context.write(Key, new Text(phone + " " + phoneTotal));
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output directory
     *             (deleted first if it already exists as a directory)
     * @return 0 on success, 1 if the job failed, 2 if the arguments are missing
     */
    @Override
    @SuppressWarnings("deprecation")
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: ExcelContactCount <input path> <output path>");
            return 2;
        }

        // Use the configuration injected by ToolRunner so generic options (-D, -conf, ...)
        // take effect; fall back to a default one when run() is invoked directly.
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }

        // Remove a pre-existing output directory.
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }

        Job job = new Job(conf, "Call Log");
        job.setJarByClass(ExcelContactCount.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(PhoneMapper.class);
        job.setReducerClass(PhoneReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Custom input/output formats: Excel in, one file per month out.
        job.setInputFormatClass(ExcelInputFormat.class);
        job.setOutputFormatClass(ExcelOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Runs the job against fixed HDFS paths; command-line arguments are not used.
     */
    public static void main(String[] args) throws Exception {
        String[] args0 = {
                "hdfs://ljc:9000/buaa/phone/phone.xls",
                "hdfs://ljc:9000/buaa/phone/out/"
            };
        int ec = ToolRunner.run(new Configuration(), new ExcelContactCount(), args0);
        System.exit(ec);
    }
}
4、结果
通过这份数据很容易看出,刘超1月份与姐姐通话次数最多,19次