  • MapReduce Study Notes 1: WordCount

    MapReduce follows the idea of "divide and conquer": operations on a large data set are handed out to slave nodes working under a master node, and the intermediate results produced by each node are then combined into the final result. In short, MapReduce is "decomposition of tasks plus aggregation of results".

    1. How MapReduce works

    In distributed computing, the MapReduce framework takes care of the hard parts of parallel programming such as distributed storage, job scheduling, load balancing, fault tolerance and network communication, so the whole process can be abstracted into just two parts, Map and Reduce: the Map part decomposes the task into many sub-tasks, and the Reduce part aggregates the results of those sub-tasks. The concrete design is as follows.

    (1) The Map step extends the Mapper class from the org.apache.hadoop.mapreduce package and overrides its map method. If you add two lines to the map method that print the key and value to the console, you can see that the value passed in holds one line of the text file (a line break marks the end of a line), while the key holds the byte offset of the first character of that line from the start of the file. Each line is then split into fields with the StringTokenizer class, the field we need (the buyer id field in this experiment) is taken as the key, and that key is written out as the result of the map method.
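    As a quick illustration of the paragraph above, here is a minimal Mapper sketch (not one of the full listings below, though it assumes the same imports) that prints the key and value to the console and then emits the first field as the key. Exactly how a line is split depends on your data file; this sketch simply takes the first whitespace-separated token.

    public static class DebugMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text outKey = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("key   = " + key);   // byte offset of this line from the start of the file
            System.out.println("value = " + value); // the text of the line itself
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            if (tokenizer.hasMoreTokens()) {
                outKey.set(tokenizer.nextToken());   // the first field, e.g. the buyer id
                context.write(outKey, ONE);
            }
        }
    }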

    (2) The Reduce step extends the Reducer class from the org.apache.hadoop.mapreduce package and overrides its reduce method. The <key, value> pairs produced by the Map step first go through the shuffle, which gathers all values with the same key into a list called values; at that point values is the list of counts for that key. The <key, values> pair is then passed to the reduce method, which only needs to iterate over values and sum them to get the total count for that key. For example, if buyer id 10001 appears on three lines, the shuffle delivers <10001, [1, 1, 1]> and reduce emits <10001, 3>.

    In the main() function we create a Job object, which manages and runs a single MapReduce computation, and we configure the task through the Job's methods. In this experiment the doMapper class (which extends Mapper) handles the Map step and the doReducer class handles the Reduce step. We also set the output types of the Map and Reduce steps: Text for the key and IntWritable for the value. The input and output paths of the task are given as strings and set via FileInputFormat and FileOutputFormat respectively. Once the parameters are set, job.waitForCompletion() is called to run the task, and the MapReduce framework handles everything else.
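    For reference, here is a minimal sketch of that driver setup written with the non-deprecated Job.getInstance(conf, ...) factory method (the listings below instead use the older new Job(conf, ...) constructor with the deprecation warning suppressed); the two paths are placeholders for your own input and output directories.

    Configuration conf = new Configuration();
    conf.set("dfs.client.use.datanode.hostname", "true");      // needed for the remote-client setup in error (2) below
    Job job = Job.getInstance(conf, "WordCount");              // non-deprecated way to create the Job
    job.setJarByClass(WordCount.class);
    job.setMapperClass(doMapper.class);
    job.setReducerClass(doReducer.class);
    job.setOutputKeyClass(Text.class);                         // output key type for Map and Reduce
    job.setOutputValueClass(IntWritable.class);                // output value type for Map and Reduce
    FileInputFormat.addInputPath(job, new Path("/user/hadoop/input/b.txt"));  // placeholder input path
    FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/output"));     // placeholder output path
    System.exit(job.waitForCompletion(true) ? 0 : 1);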

    2. Java API source code:

    First you need to upload the input file to HDFS yourself (see the file read/write API in the previous post).
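    If you would rather upload it from Java than from the command line, a minimal sketch using the FileSystem API from that previous post could look like the following (the HDFS address and both paths are placeholders for your own; it needs org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.FileSystem and org.apache.hadoop.fs.Path):

    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://*your-hdfs-address*:9000");     // placeholder HDFS address
    FileSystem fs = FileSystem.get(conf);
    fs.copyFromLocalFile(new Path("D:/data/b.txt"),                  // placeholder local file
                         new Path("/user/hadoop/input/b.txt"));      // placeholder HDFS destination
    fs.close();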

    1. (This code counts how many times each value of a single column appears.)

    package mapreduce;

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            conf.set("dfs.client.use.datanode.hostname", "true");
            @SuppressWarnings("deprecation")
            Job job = new Job(conf, "filter");
            job.setJobName("WordCount");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(doMapper.class);
            job.setReducerClass(doReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            Path in = new Path("hdfs://*your-hdfs-address*:9000/user/hadoop/input/b.txt");
            Path out = new Path("hdfs://*your-hdfs-address*:9000/user/hadoop/output");
            Path path = new Path("hdfs://*your-hdfs-address*:9000/user/hadoop/output");
            FileSystem fileSystem = path.getFileSystem(conf); // get the FileSystem this path lives on
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true); // true: delete recursively even if the output dir is not empty
            }
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
            @SuppressWarnings("unused")
            private static Text newKey = new Text();
            public static final IntWritable one = new IntWritable(1);
            public static Text word = new Text();

            @Override
            protected void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String arr[] = line.split("   "); // split on the column delimiter of the input file (three literal spaces here)
                newKey.set(arr[0]);               // the first column (the buyer id) becomes the output key
                context.write(newKey, one);
            }
        }

        public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    }
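    To make the behaviour concrete: with a made-up input file like the one below, where the buyer id is the first column, every line contributes a 1 for its buyer id and the job writes the per-id totals to part-r-00000 in the output directory.

    b.txt (made-up sample)          part-r-00000 (result)
    10001   itemA                   10001   2
    10002   itemB                   10002   1
    10001   itemC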

    2. This one counts all the words:

    package mapreduce;

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            conf.set("dfs.client.use.datanode.hostname", "true");
            @SuppressWarnings("deprecation")
            Job job = new Job(conf, "filter");
            job.setJobName("WordCount");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(doMapper.class);
            job.setReducerClass(doReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            Path in = new Path("hdfs://**/mymapreduce1/in/buyer_favorite1");
            Path out = new Path("hdfs://**/mymapreduce1/out");
            FileSystem fileSystem = out.getFileSystem(conf); // get the FileSystem the output path lives on
            if (fileSystem.exists(out)) {
                fileSystem.delete(out, true); // true: delete recursively even if the output dir is not empty
            }
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
            public static final IntWritable one = new IntWritable(1);
            public static Text word = new Text();

            @Override
            protected void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");
                while (tokenizer.hasMoreTokens()) { // emit every word on the line, not just the first one
                    word.set(tokenizer.nextToken());
                    context.write(word, one);
                }
            }
        }

        public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    }

    Some errors I ran into and how to fix them:

    (1)

    Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://*河蟹*/user/hadoop/output already exists
        at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
        at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:266)
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:139)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
        at mapreduce.WordCount.main(WordCount.java:36)



    Cause: the output directory already exists.

    Fix:

    Just delete the directory. Alternatively, add a check that removes it before the job is submitted:

     Path path = new Path("hdfs://*your-hdfs-address*:9000/user/hadoop/output"); // the job's output directory
            FileSystem fileSystem = path.getFileSystem(conf); // get the FileSystem this path lives on
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true); // true: delete recursively even if the output dir is not empty
            }
    

     (2)

    java.net.ConnectException: Connection timed out: no further information
    	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
    	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
    	at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
    	at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
    	at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
    	at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
    	at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:656)
    	at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
    	at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
    	at java.io.DataInputStream.read(DataInputStream.java:149)
    	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
    	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
    	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
    	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
    	at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
    	at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    	at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
    	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    	at java.lang.Thread.run(Thread.java:745)
    

     Cause: the machine running the code and the master hosting HDFS are not the same host, so the address mapping for the NameNode goes wrong; on top of that my cluster is configured in pseudo-distributed mode.

    Fix (my setup is pseudo-distributed): add the virtual machine's IP to the hosts file on the local Windows machine (search online for how to edit the Windows hosts file).
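    For example (the IP and hostname are made up; use whatever hostname your NameNode/DataNode actually advertise), the line added to C:\Windows\System32\drivers\etc\hosts would look like:

    192.168.56.101    hadoop-master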

    Then add the configuration so that the client resolves the nodes by hostname, and pass this conf object in when creating the Job. The snippet below shows the lines to add; the original code they were added to follows.

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    conf.set("dfs.client.use.datanode.hostname", "true");
    @SuppressWarnings("deprecation")
    Job job = new Job(conf, "filter");

    package mapreduce;

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance();
            job.setJobName("WordCount");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(doMapper.class);
            job.setReducerClass(doReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            Path in = new Path("hdfs://localhost:9000/mymapreduce1/in/buyer_favorite1");
            Path out = new Path("hdfs://localhost:9000/mymapreduce1/out");
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
            public static final IntWritable one = new IntWritable(1);
            public static Text word = new Text();

            @Override
            protected void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }

        public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    }
