一、需求前提
小张同学从别处整来了一批日志,日志内容主要记录了每个用户在运营商中所使用过的手机号,如果没有则为空,数据大概是用户账号、电信手机号、联通手机号、移动手机号、铁通手机号、小灵通等等。当然还有一个关联关系静态文件,主要记录用户所在的省和地市,数据是用户账号,所在省,所在市。日志和静态文件都存在hdfs中。现在小张同学想要分析每个省每个地市有电信、联通、移动、铁通、小灵通手机号的用户各有多少?
二、需求分析
首先我们要在mr中load进来这个关联关系静态文件,然后日志数据可能会有重复,我们需要对用户层面上进行去重,最后进行分类统计。
三、伪代码实现
public class TestMapReduce extends Configured implements Tool {
public static void main(String[] args) {
Configuration conf = new Configuration();
int result = ToolRunner.run(conf, new TestMapReduce(), args)
System.exit(result);
}
@Override
public int run(String[] args) throws Exception{
Configuration conf = getConf();
conf.set("map","静态文件路径");
//FileSystem hdfs = FileSystem.get(conf);
Job job = Job.getInstance(conf,"testJob");
job.setJarByClass(TestMapReduce.class);
//指定输入目录
FileInputFormat.setInputPaths(job,new Path(filePath));
//MultipleInputs.addInputPath(job,filePath,SequenceFileInputFormat.class,MyMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);//最好自己实现一个自定义数据类型,实现方式后面会提
job.setCombinerClass(MyCombiner.class);
job.setNumReduceTasks(1);//因为我们要统计用户总量,设置reduce任务数量来限制输出结果,也不会出现用户统计的混乱
job.setReducerClass(MyReducer.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(job,outputPath);
if(!job.waitForCompletion(true)){
return 1;
}
return 0;
}
public static class MyMapper extends Mapper<LongWritable,Text,Text,Text> {
private Map<String,String> map = new HashMap<>();
//此方法被MapReduce框架仅且执行一次,此前提是只有一个reduce任务下,执行次数根据job.setNumReduceTasks(1)所设置的reduce任务数决定
@Override
protected void setup(Context context) throws IOException, InterruptedException{
super.setup(context);
Configuration conf = context.getConfiguration();
map.clear();
String mapInput = conf.get("map","");
if(!mapInput.isEmpty()){
Path mapInputPath = new Path(mapInput);
if(存在该路径){
for(取出文件内容){
map.put(用户账号,省+市);
}
}
}
}
@Override
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
取到日志的每一行来判断是否具有各运营商的手机号,如果有设为1,没有设为0
context.write(省+市+用户账号,[1,0,1,1,0]);
}
public static class MyCombiner extends Reducer<Text,Text,Text,Text> {
@Override
protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
现在每个省每个市每个用户的数据分在一组,遍历values;
对值进行加和(大于等于1代表有该运营商的手机号)
context.write(省+市+用户账号,[2,4,8,0,4]);
}
}
public static class RadiusCountReducer extends Reducer<Text,Text,Text,Text> {
private Map<String,String> countsMap = new HashMap<>();
@Override
protected void setup(Context context) throws IOException, InterruptedException{
counts.clear();
}
@Override
protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
long[] counts = new long[5];
for(Text value : values) {
for(int i=0; i<5; i++){
counts[i] += 对应的值;
}
}
从key中取出省+市
countsMap.put(省+市,counts[0]+...counts[5]);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException{
for(Map.Entry<String,String> entry: countsMap.entrySet()){
context.write(Text.set(entry.getKey()),Text.set(countsMap.getValue()));
}
}
}
}
}
至此,大功告成,上面的代码只是写了大概的思路,具体的东西可以自己尝试写一下。
至于上面提到的自定义类型,只要实现WritableComparable接口,重写一些方法,可能需要实现的有write、readFields、compareTo、equals、hashCode、toString等,其他的好像没啥必要,看你自己的需求。
除了自定义类型,你也可以自定义shuffle阶段的排序和分组,以后有机会再说。