  • Some basic Hadoop functionality

    The three programs below read a line from a file on HDFS, check whether a path exists on HDFS, and run a MapReduce word count over a file on HDFS.

    import org.apache.hadoop.fs.*;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    
    // Singleton wrapper around an HDFS input stream (see the usage sketch below).
    public class hdoopduqu extends FSDataInputStream {
    
        private static hdoopduqu myFSDataInputStream;
        private static InputStream inputStream;
    
        private hdoopduqu(InputStream in) {
            super(in);
            inputStream = in;
        }
    
        // Lazy double-checked-locking singleton; only the first stream passed in
        // is retained, and later calls ignore their argument.
        public static hdoopduqu getInstance(InputStream inputStream){
            if (null == myFSDataInputStream){
                synchronized (hdoopduqu.class){
                    if (null == myFSDataInputStream){
                        myFSDataInputStream = new hdoopduqu(inputStream);
                    }
                }
            }
            return myFSDataInputStream;
        }
    
        // Reads the first line of the stream supplied to getInstance() and then
        // closes it; returns null on error or if the stream is empty. The
        // FileSystem parameter is kept from the original signature but unused.
        public static String readline(FileSystem fileSystem){
            try (BufferedReader bufferedReader =
                         new BufferedReader(new InputStreamReader(inputStream))) {
                return bufferedReader.readLine();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
    
    }
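
    A minimal usage sketch for the class above (assumptions: the same NameNode address used in the next example, and a placeholder file path /user/hadoop/test.txt):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class hdoopduquDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://192.168.198.130:8020");
            FileSystem fs = FileSystem.get(conf);
            // Open the remote file and hand its stream to the singleton.
            FSDataInputStream in = fs.open(new Path("/user/hadoop/test.txt"));
            hdoopduqu.getInstance(in);
            // Prints the first line of the file; readline() closes the stream.
            System.out.println(hdoopduqu.readline(fs));
            fs.close();
        }
    }

    The next class checks whether a file exists on HDFS:
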
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    // Checks whether a given path exists on HDFS.
    public class ifexit {
        public static void main(String[] args) {
            try {
                String fileName="tmp"; // a relative path resolves against the user's HDFS home directory
                Configuration conf=new Configuration();
                conf.set("fs.defaultFS", "hdfs://192.168.198.130:8020");
                conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
                FileSystem fs=FileSystem.get(conf);
                if(fs.exists(new Path(fileName))) {
                    System.out.println("File exists");
                }else {
                    System.out.println("File doesnot exist");
                }
                
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
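
    A quick sketch that creates the file first, so the check above succeeds (assumptions: the same NameNode address; the file contents are made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MakeTmp {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://192.168.198.130:8020");
            FileSystem fs = FileSystem.get(conf);
            // create() returns an output stream and overwrites by default.
            try (FSDataOutputStream out = fs.create(new Path("tmp"))) {
                out.writeBytes("hello hdfs\n");
            }
            System.out.println(fs.exists(new Path("tmp"))); // prints true
            fs.close();
        }
    }

    The last class is the MapReduce word count:
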
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    // The classic MapReduce word count over one or more input paths.
    public class WordCount {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(WordCount.TokenizerMapper.class);
            // The reducer is reused as a combiner: word counts are associative sums.
            job.setCombinerClass(WordCount.IntSumReducer.class);
            job.setReducerClass(WordCount.IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // Every argument except the last is an input path; the last is the output directory.
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    // Mapper: splits each line into tokens and emits (word, 1) for every token.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    // Reducer (also used as the combiner): sums the counts emitted for each word.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // Sum all counts emitted for this key.
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    }
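
    To run the job, package the class into a jar and submit it (a sketch; the jar name and HDFS paths are placeholders, and the output directory must not already exist or the job fails):

    hadoop jar WordCount.jar WordCount /user/hadoop/input /user/hadoop/output
    hdfs dfs -cat /user/hadoop/output/part-r-00000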

    The code above shows how to read data from HDFS, check whether a file exists, and run a word count over a file stored on HDFS.

     
  • Original post: https://www.cnblogs.com/520520520zl/p/14199131.html