  • hadoop_wordcount_1023

    package s26;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;
    import java.util.StringTokenizer;

    public class MyMap extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Reusable output objects, so the map() call does not allocate
        // new Writables for every input record.
        private final Text result = new Text();
        private final IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the input line into whitespace-delimited tokens and
            // emit a (word, 1) pair for each token.
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                result.set(st.nextToken());
                context.write(result, one);
            }
        }
    }
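
    To check the mapper in isolation before submitting a full job, a small unit test can replay one input line and assert the emitted pairs. This is a sketch of my own, not part of the original post; it assumes the (now retired) Apache MRUnit library and JUnit 4 are on the classpath:

    package s26;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mrunit.mapreduce.MapDriver;
    import org.junit.Test;

    public class MyMapTest {

        @Test
        public void emitsOnePerToken() throws Exception {
            // Feed one line; expect (word, 1) per token, in emit order.
            MapDriver.newMapDriver(new MyMap())
                    .withInput(new LongWritable(0), new Text("hello world hello"))
                    .withOutput(new Text("hello"), new IntWritable(1))
                    .withOutput(new Text("world"), new IntWritable(1))
                    .withOutput(new Text("hello"), new IntWritable(1))
                    .runTest();
        }
    }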




    package s26;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class MyRed extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum all counts emitted for this word and write (word, total).
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
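
    A matching test sketch for the reducer, under the same MRUnit/JUnit 4 assumptions (again my addition, not the original author's):

    package s26;

    import java.util.Arrays;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
    import org.junit.Test;

    public class MyRedTest {

        @Test
        public void sumsCountsPerWord() throws Exception {
            // Two (hello, 1) inputs should collapse to one (hello, 2) output.
            ReduceDriver.newReduceDriver(new MyRed())
                    .withInput(new Text("hello"),
                               Arrays.asList(new IntWritable(1), new IntWritable(1)))
                    .withOutput(new Text("hello"), new IntWritable(2))
                    .runTest();
        }
    }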






    -----------
    package s26;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    import java.io.IOException;

    public class MyJob {

        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException {

            /* Default input/output paths for the experiment, used only when
               none are passed on the command line */
            if (args.length == 0) {
                args = new String[]{"hdfs://master:9000/root/experiment/datas",
                                    "hdfs://master:9000/root/experiment/output"};
            }
            if (args.length != 2) {
                System.err.println("Usage: MyJob <in> <out>");
                System.exit(2);
            }
            /* Create the Configuration object */
            Configuration conf = new Configuration();
            /* Set the fs.defaultFS property */
            conf.set("fs.defaultFS", "hdfs://master:9000");
            /* Create and initialize the Job */
            Job job = Job.getInstance(conf);
            /* Set the class used to locate the job JAR */
            job.setJarByClass(MyJob.class);
            /* Set the job name */
            job.setJobName("MyJob");
            /* Set the job's output key class */
            job.setOutputKeyClass(Text.class);
            /* Set the job's output value class */
            job.setOutputValueClass(IntWritable.class);
            /* Set the Mapper class */
            job.setMapperClass(MyMap.class);
            /* Set the Reducer class */
            job.setReducerClass(MyRed.class);
            /* Set the input format class */
            job.setInputFormatClass(TextInputFormat.class);
            /* Set the output format class */
            job.setOutputFormatClass(TextOutputFormat.class);
            /* Output path */
            Path output = new Path(args[1]);
            /* Get the file system */
            FileSystem fs = FileSystem.get(conf);
            /* Delete the output path (a directory) if it already exists */
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            /* Specify the input and output paths */
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, output);
            /* Submit the job to the cluster, wait for it to finish, and
               exit with a status reflecting success or failure */
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
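
    One optional refinement, my addition rather than the original author's: because MyRed's input and output types are both (Text, IntWritable) and summing is associative, the same class can also serve as a combiner, pre-aggregating counts on the map side to reduce shuffle traffic. The line would sit alongside the other job.set* calls in MyJob:

        /* Optional: pre-aggregate (word, 1) pairs on the map side */
        job.setCombinerClass(MyRed.class);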



    Study notes from 小石小石摩西摩西; questions and corrections are welcome!
  • Original post: https://www.cnblogs.com/shijingwen/p/13864342.html