zoukankan      html  css  js  c++  java
  • 13-hadoop-入门程序

    通过之前的操作, 

    http://www.cnblogs.com/wenbronk/p/6636926.html

    http://www.cnblogs.com/wenbronk/p/6659481.html

    hadoop-HA的集群已经搭建完成了, 需要写个小程序来认识下hadoop了

    统计文本文件中, 每个单词出现的次数

    1, Eclipse下新建Java-project

    2, 新建lib文件, 导入jar包, 并buildpath

    hadoop-2.5.1sharehadoopcommon  所有jar,
    hadoop-2.5.1sharehadoopcommonlib  所有jar,
    
    hadoop-2.5.1sharehadoophdfs  所有jar
    hadoop-2.5.1sharehadoopmapreduce  所有jar
    hadoop-2.5.1sharehadoopyarn  所有jar

    3, Mapper类: WordCountMapper.java

    package com.wenbronk.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    
    /**
     * 测试mapreduce, 计算单词出现的次数
     * @author wenbronk
     * KEYIN: split的键, 行坐在的下标
     * VALUEIN: split的值, 行值
     * KEYOUT: 需求, 输出给reduce
     * VALUEOUT: 需求, 输出给reduce
     */
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        /**
         * 重写map方法, 循环调用
         * 从split中读取一行调用一次, 以行所在下标为key, 行内容为value
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            
            // text 转string, toString(), 使用空格分隔为单词数组
            String[] words = StringUtils.split(value.toString(), ' ');
            for (String word : words) {
                // 键值对输出, 输出给reduce
                context.write(new Text(word), new IntWritable(1));
            }
            
        }
        
    }

    4, Reduce类, WordCountReduce.java

    package com.wenbronk.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * shuffling 后传给 reduce
     * @author wenbronk
     * KEYIN: mapper的输出
     * VALUEIN: mapper的输出
     */
    public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
    
        /**
         * 循环调用
         * 每组调用一次, key相同, value可能多个, 使用迭代器
         */
        @Override
        protected void reduce(Text arg0, Iterable<IntWritable> arg1,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // 对值进行累加
            int sum = 0;
            // 使用迭代器
            for (IntWritable value : arg1) {
                sum += value.get();
            }
            // 使用context输出
            context.write(arg0 , new IntWritable(sum));
        }
        
    }

    5, 然后是具体的执行类: RunMapReduce.java

    package com.wenbronk.mapreduce;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * 执行mapreduce
     * 统计单词出新的次数
     * @author wenbr
     *
     */
    public class RunMapReduce {
    
        public static void main(String[] args) throws Exception {
            // 初始化时加载src或classpath下所有的hadoop配置文件
            Configuration configuration = new Configuration();
            
            // 得到执行的任务
            Job job = Job.getInstance(config);
            
            // 入口类
            job.setJarByClass(RunMapReduce.class);
            
            // job名字
            job.setJobName("wordCount");
            
            // job执行是map执行的类
            job.setMapperClass(WordCountMapper.class);
            
            // job执行的reduce类
            job.setReducerClass(WordCountReduce.class);
            
            // job输出的键类型
            job.setMapOutputKeyClass(Text.class);
            
            // job输出的value类型
            job.setMapOutputValueClass(IntWritable.class);
            
            //**** 使用插件上传data.txt到hdfs/root/usr/data.txt
            
            // 使用文件
            FileInputFormat.addInputPath(job, new Path("/root/usr/"));
            
            // 使用一个不存在的目录进行
            Path path = new Path("/root/usr/output");
            // 如果存在删除
            FileSystem fs = FileSystem.get(configuration);
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            
            // 输出
            FileOutputFormat.setOutputPath(job, path);
            
            boolean forCompletion = job.waitForCompletion(true);
            
            if (forCompletion) {
                System.out.println("success");
            }
        }
        
    }

    所有的类编写好了, 接下来是上传文件

    6, 使用eclipse插件上传data.txt到hadoop目录 /usr/data.txt

    我是用的插件为: 

    7, 运行

    这儿使用直接发布到服务器运行的方式

    eclipse打包项目成jar包(只需要源码即可), 然后上传到服务器目录下, 使用hadoop命令执行
    格式: hadoop jar jar路径 类全限定名

    hadoop jar wc.jar com.wenbronk.mapreduce.RunMapReduce

     之后在hadoop的目录下就可以看到统计后输出的文件了

  • 相关阅读:
    258 第七篇:Django-组件-ContentType组件
    257 第七篇:Django-组件-Auth模块
    256 第七篇:Django-组件-中间件组件
    255 第七篇:Django-组件-cookie与session组件
    254 第七篇:Django-组件-forms组件
    253 第七篇:Django-组件-分页器组件
    第二节:2_委托入门笔记
    EF-调用sql进行操作
    1.1常见的错误---登录(菜鸟常遇错误)
    EF-Lamdba
  • 原文地址:https://www.cnblogs.com/wenbronk/p/6662119.html
Copyright © 2011-2022 走看看