前面我们介绍了MapReduce中的Join算法,我们提到了可以通过map端连接或reduce端连接实现join算法,在文章中,我们只给出了reduce端连接的例子,下面我们说说使用map端连接结合分布式缓存机制实现Join算法
1、介绍
我们使用频道类型数据集和机顶盒用户数据集,进行连接,统计出每天、每个频道、每分钟的收视人数
2、数据集
频道类型数据集就是channelType.csv文件,如下示例
机顶盒用户数据集来源于“08.统计电视机顶盒中无效用户数据,并以压缩格式输出有效数据”这个实战项目处理后的结果,数据集如下所示
3、分析
基于项目的需求,我们通过以下几步完成:
1、编写Mapper类,连接用户数据和频道类型数据,按需求将数据解析为key=频道类别+日期+每分钟,value=机顶盒号,然后将结果输出。
2、编写Combiner类,先将Mapper输出结果合并一次,然后输出给Reducer。
3、编写Reducer类,统计出收视率,然后使用MultipleOutputs类将每分钟的收视率,按天输出到不同文件路径下
4、编写驱动方法 run,运行MapReduce程序
4、实现
1、编写Mapper、Reducer
package com.buaa; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.Hashtable; import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * @ProjectName CountViewers * @PackageName com.buaa * @ClassName CountViews * @Description 通过map端连接,最后统计出 每天 每个类别 每分钟的收视人数 并按天分别输出不同的文件下 * @Author 刘吉超 * @Date 2016-06-01 16:12:08 */ @SuppressWarnings("deprecation") public class CountViews extends Configured implements Tool { /* * 解析tv用户数据 */ public static class ViewsMapper extends Mapper<LongWritable, Text, Text, MapWritable> { // 定义全局 Hashtable 对象 private Hashtable<String, String> table = new Hashtable<String, String>(); @Override protected void setup(Context context) throws IOException, InterruptedException { // 返回本地文件路径 Path[] localPaths = (Path[]) context.getLocalCacheFiles(); if (localPaths.length == 0) { throw new FileNotFoundException("Distributed cache file not found."); } // 获取本地 FileSystem实例 FileSystem fs = FileSystem.getLocal(context.getConfiguration()); // 打开输入流 FSDataInputStream in = fs.open(new Path(localPaths[0].toString())); // 创建BufferedReader读取器 BufferedReader br = new BufferedReader(new InputStreamReader(in)); String infoAddr = null; // 按行读取文件 while (null != (infoAddr = br.readLine())) { // 将每行数据解析成数组 records String[] records = infoAddr.split(" "); /* * records[0]为频道名称,records[1]为频道类别 * 世界地理 4 */ table.put(records[0], records[1]); } } public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { /* * 数据格式:机顶盒 + "@" + 日期 + "@" + 频道名称 + "@" + 开始时间+ "@" + 结束时间 * 01050908200002327@2012-09-17@CCTV-1 综合@02:21:03@02:21:06 */ String[] records = value.toString().split("@"); // 机顶盒 String stbNum = records[0]; // 日期 String date = records[1]; // 频道名称 String sn = records[2]; // 开始时间 String s = records[3]; // 结束时间 String e = records[4]; // 如果开始时间或结束时间为空,直接返回 if(StringUtils.isEmpty(s) || StringUtils.isEmpty(e)){ return ; } // 按每条记录的起始时间、结束时间 计算出分钟列表List List<String> list = ParseTime.getTimeSplit(s, e); if(list == null){ return; } // 频道类别 String channelType = StringUtils.defaultString(table.get(sn),"0"); // 循环所有分钟,拆分数据记录并输出 for (String min : list) { MapWritable avgnumMap = new MapWritable(); avgnumMap.put(new Text(stbNum), new Text()); /* * 0@2012-09-17@02:59 */ context.write(new Text(channelType + "@" + date+ "@" + min), avgnumMap); } } } /* * 定义Combiner,合并 Mapper 输出结果 */ public static class ViewsCombiner extends Reducer<Text, MapWritable, Text, MapWritable> { protected void reduce(Text key, Iterable<MapWritable> values,Context context) throws IOException, InterruptedException { MapWritable avgnumMap = new MapWritable(); for (MapWritable val : values) { // 合并相同的机顶盒号 avgnumMap.putAll(val); } context.write(key, avgnumMap); } } /* * 统计每个频道类别,每分钟的收视人数,然后按日期输出到不同文件路径下 */ public static class ViewsReduce extends Reducer<Text, MapWritable, Text, Text> { // 声明多路径输出对象 private MultipleOutputs<Text, Text> mos; protected void setup(Context context) throws IOException,InterruptedException { mos = new MultipleOutputs<Text, Text>(context); } protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException { // 数据格式:key=channelType+date+min value=map(stbNum) String[] kv = key.toString().split("@"); // 频道类别 String channelType = kv[0]; // 日期 String date = kv[1]; // 分钟 String min = kv[2]; MapWritable avgnumMap = new MapWritable(); for (MapWritable m : values) { avgnumMap.putAll(m); } // 按日期将数据输出到不同文件路径下 mos.write(new Text(channelType), new Text(min + " " + avgnumMap.size()), date.replaceAll("-", "")); } protected void cleanup(Context context) throws IOException, InterruptedException { mos.close(); } } @Override public int run(String[] arg) throws Exception { // 读取配置文件 Configuration conf = new Configuration(); // 判断路径是否存在,如果存在,则删除 Path mypath = new Path(arg[1]); FileSystem hdfs = mypath.getFileSystem(conf); if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } Job job = Job.getInstance(conf,"CountViews"); // 设置主类 job.setJarByClass(CountViews.class); // 输入路径 FileInputFormat.addInputPaths(job, arg[0]+"20120917,"+arg[0]+"20120918,"+arg[0]+ "20120919,"+arg[0]+"20120920,"+arg[0]+"20120921,"+arg[0]+"20120922,"+arg[0]+"20120923"); // 输出路径 FileOutputFormat.setOutputPath(job, new Path(arg[1])); // 去part-r-00000空文件 LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); // Mapper job.setMapperClass(ViewsMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(MapWritable.class); // 设置Combiner job.setCombinerClass(ViewsCombiner.class); // Reducer job.setReducerClass(ViewsReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 指定分布式缓存文件 job.addCacheFile(new URI(arg[2])); //提交任务 return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { String[] arg = { "hdfs://hadoop1:9000/buaa/tv/out/", "hdfs://hadoop1:9000/buaa/ctype/", "hdfs://hadoop1:9000/buaa/channel/channelType.csv" }; int ec = ToolRunner.run(new Configuration(), new CountViews(), arg); System.exit(ec); } }
2、提取开始时间~结束时间之间的分钟数
package com.buaa; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; import java.util.List; /** * @ProjectName CountViewers * @PackageName com.buaa * @ClassName ParseTime * @Description TODO * @Author 刘吉超 * @Date 2016-06-01 16:11:10 */ public class ParseTime { /** * 提取start~end之间的分钟数 * * @param start * @param end * @return List */ public static List<String> getTimeSplit(String start, String end) { List<String> list = new ArrayList<String>(); // SimpleDateFormat SimpleDateFormat formatDate = new SimpleDateFormat("HH:mm"); SimpleDateFormat parseDate = new SimpleDateFormat("HH:mm:ss"); /* * 开始时间格式:02:21:03 */ Calendar startCalendar = Calendar.getInstance(); /* * 结束时间格式:02:21:06 */ Calendar endCalendar = Calendar.getInstance(); try { startCalendar.setTime(parseDate.parse(start)); endCalendar.setTime(parseDate.parse(end)); } catch (ParseException e1) { return null; } while (startCalendar.compareTo(endCalendar) <= 0) { list.add(formatDate.format(startCalendar.getTime())); startCalendar.add(Calendar.MINUTE, 1); } return list; } public static void main(String[] args) { String start = "12:59:24"; String end = "13:03:45"; List<String> list1 = getTimeSplit(start, end); for(String st1 : list1){ System.out.println(st1); } } }
5、运行结果
如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【刘超★ljc】。
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。