zoukankan      html  css  js  c++  java
  • mapreduce的join

    一篇超级详细的文章:this one

    读完之后感触颇深,什么时候在map的时候join什么时候在reducer的时候join

    之前写两个输入的时候,写的多么可笑,效率极低。

    先按这篇文章的做法用了一遍分布式缓存,在reduce的时候读取;因为Hadoop版本太低,所以又做了修改,结合好几篇文章,最终结果如下:

    版本:Hadoop0.20.203.0

    package bjut.edu.ting;
    
    import java.io.BufferedReader;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Hashtable;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import com.opencsv.CSVParser;
    
    //有两个输入:GPS(数据量大),Dictionary(数据量小);
    //两者通过属性bus_line在reduce过程中连接,将dictonary放在内存,读取之时用hashtable,存储检索
    //其中mapper过程通过passtime计算date,并赋值给GPS数据
    public class DateLineJob{ public static class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{ //处理GPS数据 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ CSVParser parser = new CSVParser(); String[] gpsData = parser.parseLine(value.toString()); Integer date_label=null; try { date_label=getDateStamp(gpsData[2]); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } if(date_label==12&&date_label!=-1){//这儿的date_label需要修改 String outValue=date_label.toString()+","+gpsData[0]+","+gpsData[2]+","+gpsData[3]+","+gpsData[4]; //key:bus_line  value:0:date,1:vehicle,2:pass,3:lon,4:lat context.write(new Text(gpsData[1]),new Text(outValue)); } } } public static class JoinReducer extends Reducer<Text, Text, NullWritable, Text>{ //定义HashTable存放缓存数据 private Hashtable <String,String> table=new Hashtable<String,String>(); /** * 获取分布式缓存文件 */ private Path[] modelPath; private BufferedReader modelBR; protected void setup(Context context) throws IOException {
    //返回本地文件路径 Configuration conf = context.getConfiguration(); modelPath = DistributedCache.getLocalCacheFiles(conf); if(modelPath.length==0){ throw new FileNotFoundException("Distributed cache file not found"); } modelBR = new BufferedReader(new FileReader(modelPath[0].toString())); //按行读取并解析字典数据, String infoDic=null; while((infoDic=modelBR.readLine())!=null){ String[] records=infoDic.split(","); //key为bus_line value为line_code table.put(records[1],records[0]);//将相应的字段存入Hashtable里面 } modelBR.close(); } public void reduce(Text key,Iterable<Text> values, Context context) throws IOException, InterruptedException{ //字典数据根据bus_line获取line_code String line_code=table.get(key.toString());//从Hashtable中获取line_code if(line_code!=null){//有些线路在字典中没有 for(Text value:values){ String outValue=value.toString(); String[] valueData=outValue.split(","); //0:date,1:vehicle,2:pass,3:lon,4:lat String out=valueData[0]+","+line_code+","+valueData[1]+","+valueData[2]+","+valueData[3]+","+valueData[4]; context.write(null, new Text(out)); } } } } public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); conf.set("mapreduce.admin.reduce.child.java.opts", "-Xmx512m"); DistributedCache.addCacheFile(new Path("hdfs://172.18.49.17:8020/Anewday/line_route_dict_update08_nohead.csv").toUri(), conf); DistributedCache.createSymlink(conf); Job job = new Job(conf,"join"); job.setJarByClass(DateLineJob.class); //设置GPS作为输入 FileInputFormat.addInputPath(job,new Path(args[0])); //输出目录 FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(JoinMapper.class); job.setReducerClass(JoinReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); System.exit(job.waitForCompletion(true)?0:1); } private static int getDateStamp(String timeStr) throws ParseException{ if(timeStr.length()==19){//若不是这个形式,则不考虑 SimpleDateFormat formatter=new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date data=formatter.parse(timeStr); int dateStamp=-1; dateStamp=data.getDate(); //这个地方会出现日期不存在的情况错误提示,但不影响运行,而且这样的数据量特别少。 return dateStamp; }else{//返回-1 return -1; } } }
  • 相关阅读:
    struts2下,jsp视图页面中CSS和javascript引用相对路径和绝对路径问题。
    浏览器和web开发中的编码问题
    一台电脑同时配置2个tomcat
    hibernate中“deleted object would be re-saved by cascade”异常原理和解决方案
    hibernate Unknown entity异常解决方案
    rose2003安装后,启动时提示 计算机缺少suite objects.dll解决方案
    Eclipse下构建hibernate项目流程
    Hibernate不能自动建表解决办法【转载】
    Eclipse安装hibernate插件的问题
    MyEclipse添加tomcat7出现“Value must be an existing directory”解决方案
  • 原文地址:https://www.cnblogs.com/amelie-tingting/p/6746419.html
Copyright © 2011-2022 走看看