zoukankan      html  css  js  c++  java
  • [MapReduce_add_1] Windows 下开发 MapReduce 程序部署到集群


    0. 说明

       Windows 下开发 MapReduce 程序部署到集群


    1. 前提

      在本地开发的时候保证 resource 中包含以下配置文件,从集群的配置文件中拷贝

      

      在 resource 中新建 mapred-site.xml(仅做测试使用,打包的时候删掉)

    <?xml version="1.0"?>
    <configuration>
        <property>
            <name>mapreduce.framework.name</name>
            <value>yarn</value>
        </property>
    </configuration>

      在 resource 中新建 yarn-site.xml(仅做测试使用,打包的时候删掉)

    <?xml version="1.0"?>
    <configuration>
    
    <!-- Site specific YARN configuration properties -->
    
        <property>
            <name>yarn.resourcemanager.hostname</name>
            <value>s101</value>
        </property>
    
        <property>
            <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle</value>
        </property>
    
    </configuration>

    2. 代码编写 

      [2.1 WCMapper.java]

    package hadoop.mr.wc;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Mapper 程序
     */
    public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        /**
         * map 函数,被调用过程是通过 while 循环每行调用一次
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 将 value 变为 String 格式
            String line = value.toString();
            // 将一行文本进行截串
            String[] arr = line.split(" ");
    
            for (String word : arr) {
                context.write(new Text(word), new IntWritable(1));
            }
    
        }
    }

      [2.2 WCReducer.class]

    package hadoop.mr.wc;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Reducer 类
     */
    public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        /**
         * 通过迭代所有的 key 进行聚合
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
    
            for (IntWritable value : values) {
                sum += value.get();
            }
    
            context.write(key,new IntWritable(sum));
        }
    }

      [2.3 WCApp.class]

    package hadoop.mr.wc;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    /**
     * Word Count APP
     */
    public class WCApp {
        public static void main(String[] args) throws Exception {
            // 初始化配置文件
            Configuration conf = new Configuration();
    
            // 仅在本地开发时使用
    //        conf.set("fs.defaultFS", "file:///");
    
            // 通过配置文件初始化 job
            Job job = Job.getInstance(conf);
    
            // 设置 job 名称
            job.setJobName("Word Count");
    
            // job 入口函数类
            job.setJarByClass(WCApp.class);
    
            // 设置 mapper 类
            job.setMapperClass(WCMapper.class);
    
            // 设置 reducer 类
            job.setReducerClass(WCReducer.class);
    
            // 设置 map 的输出 K-V 类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // 设置 reduce 的输出 K-V 类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            // 设置输入路径和输出路径
    //        Path pin = new Path("E:/test/wc/1.txt");
    //        Path pout = new Path("E:/test/wc/out");
            Path pin = new Path(args[0]);
            Path pout = new Path(args[1]);
            FileInputFormat.addInputPath(job, pin);
            FileOutputFormat.setOutputPath(job, pout);
    
            // 执行 job
            job.waitForCompletion(true);
        }
    }

    3. 打包项目 & 测试

      使用 Maven 打包程序如下图所示:

      

      

      将打包好的 jar 包和测试文件通过 Xftp 上传到服务器

      将测试文件上传到 HDFS ,命令略

      开启 Hadoop 集群,命令略

      执行以下命令

    hadoop jar myhadoop-1.0-SNAPSHOT.jar hadoop.mr.wc.WCApp /testdata/1.txt /testdata/out

      命令行下可以看到结果,Web UI 查看 http://s101:8088

      


     4. 总结

      Mapreduce 作业放在集群上运行分为以下步骤:

        1. 编写测试代码,测试其单机模式的运行
        2. 改造代码,使其能运行在集群上,改变参数的设置方式
        3. 打成 jar 包,发送到服务器中
        4. 在服务器中进行测试


  • 相关阅读:
    在C语言中,double、long、unsigned、int、char类型数据所占字节数
    SIFT算法的应用--目标识别之Bag-of-words模型
    公司笔试客观题
    程序的内存分配 CC++
    C++编程练习(14)-------“单例模式”的实现
    SSH框架:同一个工程之前可以正常运行,现在不能
    严重: Exception starting filter struts2 Unable to load configuration.
    Oracle SQL Developer出现错误 【ora-28002:the password will expire within 7 days】的解决办法
    jQuery 属性操作
    前端模块化开发应用——日历组件开发
  • 原文地址:https://www.cnblogs.com/share23/p/9905068.html
Copyright © 2011-2022 走看看