  • MapReduce Study Notes 1: WordCount

    MapReduce follows the idea of "divide and conquer": an operation on a large data set is split up and handed to worker nodes managed by a master node, and the intermediate results produced by the workers are then merged into the final result. In short, MapReduce means "decompose the task, then merge the results".

    1. How MapReduce works

    In distributed computing, the MapReduce framework handles the hard parts of parallel programming for you: distributed storage, job scheduling, load balancing, fault tolerance, and network communication. The processing is abstracted into two phases, Map and Reduce: the Map phase decomposes the job into subtasks, and the Reduce phase merges the results of those subtasks. The design works as follows.

    (1) The Map phase extends the Mapper class in the org.apache.hadoop.mapreduce package and overrides its map method. If you add two lines inside map that print the key and value to the console, you can see that the value passed to map holds one line of the text file (terminated by a newline), while the key holds the byte offset of that line from the start of the file. Each line is then split into fields (with StringTokenizer or String.split), the field we need (the buyer id field in this experiment) is set as the key, and that key is written out as the map output.
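
    A minimal sketch of the Mapper just described (imports as in the full listing below). The console printing is only for inspecting key and value, and the tab delimiter and field index are assumptions for illustration; the full listing below splits on spaces instead.

    public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private static final Text newKey = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // key: byte offset of this line from the start of the file; value: the line itself
            System.out.println("offset=" + key + ", line=" + value);
            String[] fields = value.toString().split("\t"); // assumed tab-separated record
            newKey.set(fields[0]);                          // take the field we need (e.g. buyer id) as the key
            context.write(newKey, one);
        }
    }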

    (2) The Reduce phase extends the Reducer class in the org.apache.hadoop.mapreduce package and overrides its reduce method. The <key, value> pairs emitted by Map first go through the shuffle phase, which gathers all values with the same key into a values list; values is therefore the list of counts emitted for that key. Each <key, values> pair is then passed to reduce, which only has to iterate over values and sum them to obtain the total count for that word.
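
    For example (illustrative data only), if the mappers emit the word "hadoop" three times and "spark" once, shuffle groups the pairs by key before reduce runs:

    // map output:    <"hadoop", 1>  <"spark", 1>  <"hadoop", 1>  <"hadoop", 1>
    // reduce input:  <"hadoop", [1, 1, 1]>   <"spark", [1]>
    // reduce output: <"hadoop", 3>           <"spark", 1>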

    In the main() function, a Job object is created; the Job manages and runs one MapReduce computation, and its methods are used to set the job's parameters. In this experiment the job is configured to use the doMapper class (which extends Mapper) for the Map phase and the doReducer class for the Reduce phase. The output types of the Map and Reduce phases are also set: Text for the key and IntWritable for the value. The input and output paths are given as strings and registered via FileInputFormat and FileOutputFormat respectively. Once the parameters are set, job.waitForCompletion() is called to run the job, and the MapReduce framework takes care of everything else. The complete listing below shows this configuration.

    2. Source code using the Java API

    First you need to upload the input file to HDFS yourself (see the file read/write API in the previous post); a sketch follows below.
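
    A minimal sketch of uploading the input file with the HDFS Java API. The class name UploadInput, the local path, and the NameNode address are placeholders; adjust them to your own cluster.

    package mapreduce;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class UploadInput {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://*your-hdfs-address*:9000"); // placeholder NameNode address
            FileSystem fs = FileSystem.get(conf);
            // copy a local file into the HDFS input directory used by the job below
            fs.copyFromLocalFile(new Path("/home/hadoop/b.txt"), new Path("/user/hadoop/input/b.txt"));
            fs.close();
        }
    }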

    1. (This version counts occurrences of values in a single column.)

    package mapreduce;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            // resolve DataNodes by hostname; needed when the client is not on the cluster host
            conf.set("dfs.client.use.datanode.hostname", "true");
            @SuppressWarnings("deprecation")
            Job job = new Job(conf, "filter");
            job.setJobName("WordCount");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(doMapper.class);
            job.setReducerClass(doReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            Path in = new Path("hdfs://*your-hdfs-address*:9000/user/hadoop/input/b.txt");
            Path out = new Path("hdfs://*your-hdfs-address*:9000/user/hadoop/output");
            Path path = new Path("hdfs://*your-hdfs-address*:9000/user/hadoop/output");
            FileSystem fileSystem = path.getFileSystem(conf); // get the FileSystem this path belongs to
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true); // true: delete recursively even if the output dir is not empty
            }
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
            private static Text newKey = new Text();
            public static final IntWritable one = new IntWritable(1);

            @Override
            protected void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] arr = line.split("   "); // split on the delimiter used by the input file (three spaces here)
                newKey.set(arr[0]);               // count only the first column
                context.write(newKey, one);
            }
        }

        public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    }

    2. This version counts every word:

    package mapreduce;
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class WordCount {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            // resolve DataNodes by hostname; needed when the client is not on the cluster host
            conf.set("dfs.client.use.datanode.hostname", "true");
            @SuppressWarnings("deprecation")
            Job job = new Job(conf, "filter");
            job.setJobName("WordCount");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(doMapper.class);
            job.setReducerClass(doReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            Path in = new Path("hdfs://**/mymapreduce1/in/buyer_favorite1");
            Path out = new Path("hdfs://**/mymapreduce1/out");
            // delete the output directory if it already exists, otherwise the job fails
            FileSystem fileSystem = out.getFileSystem(conf);
            if (fileSystem.exists(out)) {
                fileSystem.delete(out, true); // true: delete recursively even if the output dir is not empty
            }
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
        public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
            public static final IntWritable one = new IntWritable(1);
            public static Text word = new Text();
            @Override
            protected void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");
                // iterate over every token so that all words on the line are counted
                while (tokenizer.hasMoreTokens()) {
                    word.set(tokenizer.nextToken());
                    context.write(word, one);
                }
            }
        }
        public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    }

    Some errors I ran into, and how to fix them:

    (1)

    Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://*河蟹*/user/hadoop/output already exists
        at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
        at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:266)
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:139)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
        at mapreduce.WordCount.main(WordCount.java:36)



    Cause: the output directory already exists.

    Fix:

    Delete the directory and re-run. Alternatively, add a check that removes it automatically:

     Path path = new Path("hdfs://*your-hdfs-address*:9000/user/hadoop/output"); // the job's output directory
            FileSystem fileSystem = path.getFileSystem(conf); // get the FileSystem this path belongs to
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true); // true: delete recursively even if the output dir is not empty
            }
    

     (2)

    java.net.ConnectException: Connection timed out: no further information
    	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
    	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
    	at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
    	at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
    	at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
    	at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
    	at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:656)
    	at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
    	at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
    	at java.io.DataInputStream.read(DataInputStream.java:149)
    	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
    	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
    	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
    	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
    	at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
    	at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    	at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
    	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    	at java.lang.Thread.run(Thread.java:745)
    

     Cause: the machine running the code is not the master that hosts HDFS, so the hostname-to-address mapping for the NameNode/DataNodes fails (my setup is pseudo-distributed).

    Fix (for a pseudo-distributed setup): add the virtual machine's IP and hostname to the local Windows hosts file (C:\Windows\System32\drivers\etc\hosts), for example as shown below.
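
    A hypothetical hosts entry; the IP and hostname are placeholders for your own VM and its hostname:

    # C:\Windows\System32\drivers\etc\hosts  (placeholder values)
    192.168.56.101   hadoop-master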

    Then add the client-side configuration so the HDFS client connects to DataNodes by hostname (which now resolves through the hosts file), and pass the conf when creating the Job:

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    conf.set("dfs.client.use.datanode.hostname", "true");
    @SuppressWarnings("deprecation")
    Job job = new Job(conf, "filter");

    package mapreduce;
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class WordCount {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance();
            job.setJobName("WordCount");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(doMapper.class);
            job.setReducerClass(doReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            Path in = new Path("hdfs://localhost:9000/mymapreduce1/in/buyer_favorite1");
            Path out = new Path("hdfs://localhost:9000/mymapreduce1/out");
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
        public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
            public static final IntWritable one = new IntWritable(1);
            public static Text word = new Text();
            @Override
            protected void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
        public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    }

  • Original post: https://www.cnblogs.com/miria-486/p/9961945.html