zoukankan      html  css  js  c++  java
  • MR-join连接

    package com.bw.mr;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * Map-only MapReduce job that performs a map-side join.
     *
     * <p>The user file is loaded completely into each mapper's memory during
     * {@code setup}; every order record read by {@code map} is then joined with it
     * on the shared user id. No reducers are used.
     *
     * <p>Input formats (fields separated by a single space):
     * <ul>
     *   <li>user file: {@code uid f1 f2 f3} (fields after the fourth are ignored)</li>
     *   <li>order file: {@code orderId uid ...} (fields after the second are ignored)</li>
     * </ul>
     * Output lines have the form {@code orderId uid f1 f2 f3}.
     */
    public class MapJoin {

        /** Configuration key naming the user file that is loaded into memory. */
        public static final String USER_FILE_KEY = "mapjoin.user.file";

        /** User file used when {@link #USER_FILE_KEY} is not set. */
        private static final String DEFAULT_USER_FILE = "hdfs://linux04:9000/user.txt";

        /** Charset of the user file; given by name so no extra import is needed. */
        private static final String USER_FILE_CHARSET = "UTF-8";

        /** Counter group under which skipped records are reported. */
        private static final String COUNTER_GROUP = "MapJoin";

        /**
         * Mapper that joins each order line with the in-memory user table.
         */
        public static class JMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
            /** In-memory user table: uid -> "f1 f2 f3". */
            private final Map<String, String> userInfoById = new HashMap<String, String>();

            /** Reused output key so that no new Text is allocated per record. */
            private final Text outKey = new Text();

            /**
             * Loads the user file into {@link #userInfoById} before any call to {@code map}.
             *
             * <p>Lines with fewer than four space-separated fields are skipped and
             * counted under {@code MALFORMED_USER_LINES} instead of failing the task.
             *
             * @param context task context; supplies the job configuration and counters
             * @throws IOException if the user file cannot be opened or read
             */
            @Override
            protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                    throws IOException, InterruptedException {
                Configuration conf = context.getConfiguration();
                Path userFile = new Path(conf.get(USER_FILE_KEY, DEFAULT_USER_FILE));
                // Resolve the file system from the path itself, so a fully qualified URI
                // still works when it differs from the configured default file system.
                FileSystem fs = userFile.getFileSystem(conf);
                // try-with-resources closes both streams even when reading or parsing fails.
                try (FSDataInputStream in = fs.open(userFile);
                        BufferedReader reader =
                                new BufferedReader(new InputStreamReader(in, USER_FILE_CHARSET))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        String[] fields = line.split(" ");
                        if (fields.length < 4) {
                            context.getCounter(COUNTER_GROUP, "MALFORMED_USER_LINES").increment(1);
                            continue;
                        }
                        userInfoById.put(fields[0], fields[1] + " " + fields[2] + " " + fields[3]);
                    }
                }
            }

            /**
             * Joins one order line with the user table on the user id and emits
             * {@code "orderId uid f1 f2 f3"} as the key with a null value.
             *
             * <p>Lines with fewer than two fields are skipped and counted under
             * {@code MALFORMED_ORDER_LINES}. Orders whose uid is not in the user table
             * are skipped and counted under {@code UNMATCHED_ORDERS}, rather than
             * being written with the literal text "null".
             *
             * @param key     input key supplied by the input format (unused)
             * @param value   one line of the order file
             * @param context task context used to emit output and update counters
             */
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                    throws IOException, InterruptedException {
                String[] fields = value.toString().split(" ");
                if (fields.length < 2) {
                    context.getCounter(COUNTER_GROUP, "MALFORMED_ORDER_LINES").increment(1);
                    return;
                }
                String orderId = fields[0];
                String uid = fields[1];
                // Join on uid against the table loaded in setup.
                String userInfo = userInfoById.get(uid);
                if (userInfo == null) {
                    context.getCounter(COUNTER_GROUP, "UNMATCHED_ORDERS").increment(1);
                    return;
                }
                outKey.set(orderId + " " + uid + " " + userInfo);
                context.write(outKey, NullWritable.get());
            }
        }

        /**
         * Configures and runs the map-only join job, then exits with status 0 on
         * success or 1 on failure (2 when the arguments are missing).
         *
         * @param args {@code args[0]} order input path, {@code args[1]} output path,
         *             optional {@code args[2]} user file path overriding the default
         * @throws Exception if the job cannot be configured or submitted
         */
        public static void main(String[] args) throws Exception {
            if (args.length < 2) {
                System.err.println("Usage: MapJoin <orders input path> <output path> [user file path]");
                System.exit(2);
            }
            Configuration conf = new Configuration();
            if (args.length > 2) {
                conf.set(USER_FILE_KEY, args[2]);
            }
            Job job = Job.getInstance(conf);
            job.setJarByClass(MapJoin.class);
            job.setMapperClass(JMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            // Map-side join: all work happens in the mappers, so no reduce phase.
            job.setNumReduceTasks(0);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Propagate the job result so scripts can detect a failed run.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
  • 相关阅读:
    jquery水印插件:placeholder
    EF POWER TOOLS由数据库逆向CODE FIRST
    .NET重构(类型码的设计、重构方法)
    CodeUI Test:创建第一个CodeUI Test
    Windows 8 Store Apps
    ASP.NET MVC 使用MSBuild部署的几个注意事项
    c#中如何跨线程调用windows窗体控件
    Restful?
    Javascript的一种代码结构方式——插件式
    AOP编程
  • 原文地址:https://www.cnblogs.com/JBLi/p/10765134.html
Copyright © 2011-2022 走看看