zoukankan      html  css  js  c++  java
  • 小表 + 大表 完成用户和用户订单 内存+读取的方式

    map端join

        1.创建Mapper
            package com.mine.hdfs.mr.mapjoin;

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.fs.FSDataInputStream;
            import org.apache.hadoop.fs.FileSystem;
            import org.apache.hadoop.fs.Path;
            import org.apache.hadoop.io.LongWritable;
            import org.apache.hadoop.io.NullWritable;
            import org.apache.hadoop.io.Text;
            import org.apache.hadoop.mapreduce.Mapper;

            import java.io.BufferedReader;
            import java.io.IOException;
            import java.io.InputStreamReader;
            import java.util.HashMap;
            import java.util.Map;

            /**
             * join操作,map端连接。
             */
            public class MapJoinMapper extends Mapper<LongWritable,Text,Text,NullWritable> {

                private Map<String,String> allCustomers = new HashMap<String,String>();

                //启动,初始化客户信息
                protected void setup(Context context) throws IOException, InterruptedException {
                    try {
                        Configuration conf = context.getConfiguration();
                        FileSystem fs = FileSystem.get(conf);
                        FSDataInputStream fis = fs.open(new Path("file:///d:/mr/mapjoin/customers.txt"));
                        //得到缓冲区阅读器
                        BufferedReader br = new BufferedReader(new InputStreamReader(fis));
                        String line = null ;
                        while((line = br.readLine()) != null){
                            //得到cid
                            String cid = line.substring(0,line.indexOf(","));
                            allCustomers.put(cid,line);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

                protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                    //订单信息
                    String line = value.toString();
                    //提取customer id
                    String cid = line.substring(line.lastIndexOf(",") + 1);
                    //订单信息
                    String orderInfo = line.substring(0,line.lastIndexOf(","));

                    //连接customer + "," + order
                    String customerInfo = allCustomers.get(cid);
                    context.write(new Text(customerInfo + "," + orderInfo),NullWritable.get());
                }

            }

        2.创建App
            package com.mine.hdfs.mr.mapjoin;

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.fs.Path;
            import org.apache.hadoop.io.NullWritable;
            import org.apache.hadoop.io.Text;
            import org.apache.hadoop.mapreduce.Job;
            import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
            import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

            /**
             *
             */
            public class MapJoinApp {
                public static void main(String[] args) throws Exception {

                    Configuration conf = new Configuration();
                    conf.set("fs.defaultFS","file:///");
                    Job job = Job.getInstance(conf);

                    //设置job的各种属性
                    job.setJobName("MapJoinApp");                        //作业名称
                    job.setJarByClass(MapJoinApp.class);                 //搜索类

                    //添加输入路径
                    FileInputFormat.addInputPath(job,new Path(args[0]));
                    //设置输出路径
                    FileOutputFormat.setOutputPath(job,new Path(args[1]));

                    //没有reduce

                    job.setNumReduceTasks(0);

                    job.setMapperClass(MapJoinMapper.class);             //mapper类

                    job.setMapOutputKeyClass(Text.class);           //
                    job.setMapOutputValueClass(NullWritable.class);  //

                    job.waitForCompletion(true);
                }
            }


  • 相关阅读:
    Cmd Markdown 公式指导手册
    ubuntu 21.10 上的一些必备库的安装 opengl, opencv
    MarkDown 数学公式
    免费代理IP地址列表
    pyexecl的使用
    磁盘IO满负荷性能分析
    DRM 简介
    Oracle修改字符集ORA02374,ORA12899,ORA02372
    oracle grid修改ip
    Oracle性能问题一般排查方法
  • 原文地址:https://www.cnblogs.com/zyanrong/p/10753760.html
Copyright © 2011-2022 走看看