背景
公司的物流业务系统目前实现了使用storm集群进行过门事件的实时计算处理,但是还有一个需求,我们需要存储每个标签上传的每条明细数据,然后进行定期的标签报表统计,这个是目前的实时计算框架无法满足的,需要考虑离线存储和计算引擎。
标签的数据量是巨大的,此时存储在mysql中是不合适的,所以我们考虑了分布式存储系统HDFS。目前考虑的架构是,把每条明细数据存储到HDFS中,利用Hive或者其他类SQL的解析引擎,定期进行离线统计计算。
查找相关资料后,我下载了深入理解Haddoop这本书,从大数据的一些基础原理开始调研,这一系列的笔记就是调研笔记。
系列文章:
深入理解Hadoop-类比SQL的MapReduce开发模式
书中以一个航空公司的数据集来解释相关概念,原本我是想把数据下载下来,然后对照着书中的例子实际编写代码测试,但是下载链接中无法下载数据,只能作罢。
我会自己写一些数据,然后仿照书中的例子实际编写代码测试,代码也会放在下面。
下面按照书中的逻辑,通过类比SQL来学习MapReduce不同类型业务需求的开发。
Select
关于select的查询数据的需求,一般不需要使用reduce任务,只需要使用map任务即可。在map任务中,我们需要对读入的每行数据做一些数据处理,然后输出。只包含Map阶段的任务和同时包含map阶段和reduce阶段的任务相比,因为不包含sort/shuffle阶段(这个阶段负责把Map阶段输出的结果通过网络输入到reduce任务中),所以执行速度会快一些。
要点如下:
-
如果输入和输出都是文件,并且逐行读入和逐行写出,那么使用TextInputFormat.class和TextOutputFormat.class即可
-
如果不需要键,只需要值,可以把输出值的类型设置为NullWritable
-
需要人为指定 map(reduce,如果有的话)的输入和输出的键值类型 和 最终结果的输出键值类型,因为Java代码的泛型在实际执行时会擦除
- 如果使用TextInputFormat,那么map的输入键值对类型就为 LongWritable和Text类型,默认情况下map的输出类型和输入类型一致,如果修改,需要单独配置
- 如果使用TextInputFormat,默认把键值输出为字符串,中间用制表符分割,如果键类型为NullWritable,则只输出值。
-
如果只有map阶段,需要编写下面这行代码,如果不写,默认会建立一个reduce,会导致额外的网络传输降低性能。
job.setNumReduceTasks(0)
-
有几个输入文件,就对应有几个输出文件,可以使用tail 查看文件的后几行。
聚合
类似Group by sum max等业务需求,如何使用MapReduce来开发呢?
比如,统计某个时间段内的某种业务数据的总次数和某些比例数据(某业务的次数/总次数),这种需求是很普遍的。
要点如下:
- 统计不同的业务的比例,最好先在map中实现一部分计算,把需要统计的不同业务转换为字段较短的标识,然后传递给reduce,可以大大减少网络传输
- 在reduce中,使用上面定义的标识来分别编写代码计算处理总数和比例
下面举一个具体的例子
业务原始数据,所有数据按照#分割
1573fe8955e64a3c8f97af674d0ad409#2020-07-30 11:34:59#1111#1#1#39
17ad961f25d340f18ebd60d6d0de7b57#2020-08-04 17:32:25#1111#1#3#39
1a6da9e15d2a4abba1fbe9cdf936508f#2020-07-30 15:07:07#1111#1#1#39
26d846ed92034b0fbecd271c390d9a5c#2020-08-04 16:48:13#1111#1#3#39
2b8a6a9d2845462f9bbf3cff06cb9ff7#2020-08-04 17:24:45#1111#1#3#39
38bd8f1756fe4181bc0606ac1871329d#2020-08-04 16:49:56#1111#2#1#39
3cba6ffa8922481e828c7005210fb68a#2020-07-30 11:36:19#1111#2#1#39
4e30b5863bed442f9d0f4a29d95967f2#2020-07-30 15:07:26#1111#2#1#39
570f98bef1c84b11a1868376318f9509#2020-08-04 17:24:25#1111#1#1#39
59c9d98e5e244078aa4974bb1423ce76#2020-08-04 17:29:05#1111#1#3#39
63c997ff330040119ac3fe773814134d#2020-07-30 15:06:27#1111#1#3#39
6d77b76646314c419efd164dd1d477c6#2020-08-04 16:48:13#1111#1#1#39
81954b10c702480392d5d17cbd05fc97#2020-07-30 14:15:27#1111#1#2#39
9781f0e163ec4e8a85f800f63ddd7127#2020-08-04 16:50:25#1111#1#1#39
b1696070c5214bc19c81c51b8a38838c#2020-07-30 11:34:20#1111#2#1#39
b3ef5de5432a4aefb58c885cd844d45b#2020-07-30 11:30:54#1111#1#1#39
c1ff40e93aa04c1285104a53a5066be5#2020-08-04 17:29:36#1111#1#3#39
dada5853f0e44dc1bf195f04766fa3b4#2020-07-30 14:47:47#1111#2#1#39
f617de93fa8345ccac1a5f99a836f8d5#2020-08-04 17:24:15#1111#2#1#39
f96831bbe1b24b3f927519b915d1f72a#2020-07-30 11:37:19#1111#1#1#40
字段的解释如下:
我们现在需要按照统计每个月的报警总次数,非法上架次数,非法下架的次数,非法上架比例,非法下架比例这些数据
代码如下:
package org.example.sql;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.text.DecimalFormat;
import java.util.Iterator;
/**
* only have map !
*/
public class GroupBySample {
private static final IntWritable RECORD = new IntWritable(0);
private static final IntWritable UP = new IntWritable(1);
private static final IntWritable DOWN = new IntWritable(2);
private static final IntWritable OTHER = new IntWritable(3);
private static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] strings = value.toString().split("#");
String[] dates = strings[1].split("-");
String month = dates[1];
context.write(new Text(month), RECORD);
IntWritable content = new IntWritable(Integer.parseInt(strings[3]));
if (UP.equals(content)) {
context.write(new Text(month), UP);
} else if (DOWN.equals(content)) {
context.write(new Text(month), DOWN);
} else {
context.write(new Text(month), OTHER);
}
}
}
private static class MyReducer extends Reducer<Text, IntWritable, NullWritable, Text> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
int up = 0;
int down = 0;
int other = 0;
Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()) {
IntWritable next = iterator.next();
if (RECORD.equals(next)) {
sum++;
}
if (UP.equals(next)) {
up++;
}
if (DOWN.equals(next)) {
down++;
}
if (OTHER.equals(next)) {
other++;
}
}
//calculate
DecimalFormat decimalFormat = new DecimalFormat("0.0000");
StringBuilder sb = new StringBuilder(key.toString());
sb.append(",").append(sum);
sb.append(",").append(up);
sb.append(",").append(down);
sb.append(",").append(decimalFormat.format((double) up / sum));
sb.append(",").append(decimalFormat.format((double)down / sum));
sb.append(",").append(decimalFormat.format((double)other / sum));
context.write(NullWritable.get(), new Text(sb.toString()));
}
}
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "ging");
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://192.168.202.129:9000");
Job job = Job.getInstance(configuration);
job.setJarByClass(GroupBySample.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
// //增加combiner操作
// job.setCombinerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.202.129:9000"), configuration, "ging");
Path output = new Path("/hdfs/test/output");
if (fileSystem.exists(output)) {
fileSystem.delete(output, true);
}
FileInputFormat.setInputPaths(job, new Path("/hdfs/test/input/groupby"));
FileOutputFormat.setOutputPath(job, output);
boolean result = job.waitForCompletion(true);
System.out.println(result);
}
}
输出结果如下:
ging@ubuntu:~/hadoop/hadoop-2.10.0$ bin/hdfs dfs -text /hdfs/test/output/part-r-00000
07,10,6,4,0.6000,0.4000,0.0000
08,10,8,2,0.8000,0.2000,0.0000
可以看到,有两行数据,分别代码7月和8月的相关数据。
分区
分区的作用是,当数据通过mapper经过业务处理,传递出来一个个键值对后,Hadoop框架会调用默认的partitioner把键分配到reducer中进行处理,默认的分区器是hash分区器,可以保证同样的键,一定被同一个reducer处理。下面使用前一部分讲解的业务例子来帮助大家进一步理解分区的概念。
上面的例子中只使用了一个reducer,所有的键值对最终都会分配到同一个reducer处理。如果现在想加快处理速度(会使用2个reducer),并且要一个reducer只处理奇数月份或者偶数月份的数据,那么就需要实现自定义的partitioner来进行分区。如果不自定义实现,就无法保证一个reducer只处理一个key的业务需求,同一个reducer可能处理多个key的数据。
增加代码如下:
main方法
job.setPartitionerClass(MyPartioner.class);
job.setNumReduceTasks(2);
自定义Partioner
/**
* 2 reducer
*/
private static class MyPartioner extends Partitioner<Text, MapWritable> {
@Override
public int getPartition(Text text, MapWritable mapWritable, int i) {
String month = text.toString();
return Integer.parseInt(month) % 2;
}
}
跑完结果如下,在output文件夹下生成了两个结果文件,分别查看内容,发现7月和8月的数据已经成功分离出来。
ging@ubuntu:~/hadoop/hadoop-2.10.0$ bin/hdfs dfs -ls /hdfs/test/output
Found 3 items
-rw-r--r-- 3 ging supergroup 0 2020-09-21 19:21 /hdfs/test/output/_SUCCESS
-rw-r--r-- 3 ging supergroup 31 2020-09-21 19:21 /hdfs/test/output/part-r-00000
-rw-r--r-- 3 ging supergroup 31 2020-09-21 19:21 /hdfs/test/output/part-r-00001
ging@ubuntu:~/hadoop/hadoop-2.10.0$ bin/hdfs dfs -text /hdfs/test/output/part-r-00000
08,10,8,2,0.8000,0.2000,0.0000
ging@ubuntu:~/hadoop/hadoop-2.10.0$ bin/hdfs dfs -text /hdfs/test/output/part-r-00001
07,10,6,4,0.6000,0.4000,0.0000
总结
这一部分通过结合实际的业务场景,编写了MR代码,实现了基本的功能,这部分的目的就达到了,了解完这部分,我们可以基本写出一个不错误的MR程序。