  • Hadoop: uploading files to HDFS programmatically

    This project shows how to upload a file from Windows to HDFS using the Hadoop Java API.

    It then counts the occurrences of each word in the uploaded text file with a MapReduce job.

    1. Project structure

    2. Code

    • CopyFromLocalFile
    package com.hadoop.worldcount;

    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CopyFromLocalFile {

        /**
         * Check whether a path exists on HDFS.
         */
        public static boolean test(Configuration conf, String path) {
            try (FileSystem fs = FileSystem.get(conf)) {
                return fs.exists(new Path(path));
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
        }

        /**
         * Copy a local file to the given HDFS path; if the destination already
         * exists, overwrite it.
         */
        public static void copyFromLocalFile(Configuration conf,
                String localFilePath, String remoteFilePath) {
            Path localPath = new Path(localFilePath);
            Path remotePath = new Path(remoteFilePath);
            try (FileSystem fs = FileSystem.get(conf)) {
                /* The first argument of fs.copyFromLocalFile controls whether the
                   source is deleted, the second whether the destination is overwritten. */
                fs.copyFromLocalFile(false, true, localPath, remotePath);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Append the contents of a local file to an existing HDFS file.
         */
        public static void appendToFile(Configuration conf, String localFilePath,
                String remoteFilePath) {
            Path remotePath = new Path(remoteFilePath);
            try (FileSystem fs = FileSystem.get(conf);
                    FileInputStream in = new FileInputStream(localFilePath)) {
                FSDataOutputStream out = fs.append(remotePath);
                byte[] data = new byte[1024];
                int read = -1;
                while ((read = in.read(data)) > 0) {
                    out.write(data, 0, read);
                }
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Main entry point.
         */
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            String localFilePath = "/usr/hadoop/test/test.txt";   // local path
            String remoteFilePath = "/user/hadoop/test/test.txt"; // HDFS path
            String choice = "append";       // if the file exists, append to it
            // String choice = "overwrite"; // if the file exists, overwrite it
            try {
                /* Check whether the remote file already exists */
                boolean fileExists = false;
                if (CopyFromLocalFile.test(conf, remoteFilePath)) {
                    fileExists = true;
                    System.out.println(remoteFilePath + " already exists.");
                } else {
                    System.out.println(remoteFilePath + " does not exist.");
                }
                /* Handle the upload */
                if (!fileExists) { // file does not exist: upload it
                    CopyFromLocalFile.copyFromLocalFile(conf, localFilePath,
                            remoteFilePath);
                    System.out.println(localFilePath + " has been uploaded to " + remoteFilePath);
                } else if (choice.equals("overwrite")) { // overwrite
                    CopyFromLocalFile.copyFromLocalFile(conf, localFilePath,
                            remoteFilePath);
                    System.out.println(localFilePath + " has overwritten " + remoteFilePath);
                } else if (choice.equals("append")) { // append
                    CopyFromLocalFile.appendToFile(conf, localFilePath,
                            remoteFilePath);
                    System.out.println(localFilePath + " has been appended to " + remoteFilePath);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
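    On a pseudo-distributed (single-datanode) cluster, fs.append often fails during pipeline recovery with a "failed to replace a bad datanode" error. Below is a minimal sketch of the client-side settings that are commonly added before calling appendToFile; the property values are assumptions for a single-datanode setup, not part of the original project:

    // Hypothetical extra configuration for appending on a single-datanode cluster.
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://localhost:9000");
    // Append is enabled by default in Hadoop 2.x+; set it explicitly to be safe.
    conf.setBoolean("dfs.support.append", true);
    // With only one datanode there is no replacement node available,
    // so skip datanode replacement during pipeline recovery.
    conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
    CopyFromLocalFile.appendToFile(conf, "/usr/hadoop/test/test.txt", "/user/hadoop/test/test.txt");

    On a real multi-datanode cluster these overrides are usually unnecessary and can be dropped.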
    • File
    package com.hadoop.worldcount;

    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.util.Progressable;

    public class File {

        /**
         * Copies a local file to HDFS, printing a dot for each progress callback.
         */
        public static void main(String[] args) throws Exception {
            String localSrc = "E:\\Hadoop\\work\\bashrc.txt"; // local source file (backslashes must be escaped)
            String dst = "hdfs://localhost:9000/user/hadoop/test/bashrc.txt"; // destination on HDFS
            InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(dst), conf);
            OutputStream out = fs.create(new Path(dst), new Progressable() { // progress callback
                public void progress() {
                    System.out.print(".");
                }
            });
            IOUtils.copyBytes(in, out, 4096, true); // copy and close both streams
        }
    }
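    Once the upload completes, it is worth reading the file back from HDFS to confirm it arrived intact. A small sketch of the read-side counterpart follows; the path is assumed to match dst above, and this helper class is not part of the original project:

    package com.hadoop.worldcount;

    import java.io.InputStream;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    // Hypothetical verification helper: streams the uploaded HDFS file to stdout.
    public class ReadBack {
        public static void main(String[] args) throws Exception {
            String dst = "hdfs://localhost:9000/user/hadoop/test/bashrc.txt"; // assumed upload target
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(dst), conf);
            try (InputStream in = fs.open(new Path(dst))) {
                IOUtils.copyBytes(in, System.out, 4096, false); // print contents; keep System.out open
            }
        }
    }

    The same check can also be done from the command line with hdfs dfs -cat /user/hadoop/test/bashrc.txt.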
    • MyWordCount
    package com.hadoop.worldcount;

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MyWordCount {

        public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            /**
             * Mapper's map method:
             * void map(K1 key, V1 value, Context context)
             * maps a single input k/v pair to intermediate k/v pairs; the output
             * pairs need not have the same types as the input, and one input pair
             * may map to zero or more output pairs.
             * Context collects the <k, v> pairs emitted by the Mapper;
             * context.write(k, v) adds one pair to it.
             * This map function splits each input line with StringTokenizer and
             * writes a (word, 1) pair to the context for every token.
             */
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    context.write(new Text(itr.nextToken()), new IntWritable(1));
                }
            }
        }

        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            /**
             * Reducer's reduce method:
             * void reduce(Text key, Iterable<IntWritable> values, Context context)
             * The k/v pairs come from the map function's context, possibly
             * pre-aggregated by a combiner, and the result is again written
             * through the context.
             */
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                    Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                context.write(key, new IntWritable(sum));
            }
        }

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            /**
             * Configuration: the map/reduce configuration class; it describes the
             * MapReduce job to the Hadoop framework.
             */
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "myWordCount"); // user-defined job name
            job.setJarByClass(MyWordCount.class);
            job.setMapperClass(TokenizerMapper.class);      // set the Mapper class for the job
            job.setCombinerClass(IntSumReducer.class);      // set the Combiner class
            job.setReducerClass(IntSumReducer.class);       // set the Reducer class
            job.setOutputKeyClass(Text.class);              // key class of the job output
            job.setOutputValueClass(IntWritable.class);     // value class of the job output

            FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/user/root/input/bashrc.txt"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/root/output"));

            System.exit(job.waitForCompletion(true) ? 0 : 1); // run the job and exit with its status
        }
    }
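    Two practical notes for running the job: the input file must already exist in HDFS (it can be uploaded with either of the classes above), and FileOutputFormat refuses to run if the output directory already exists. A hedged sketch of clearing a previous run's output before submission, assuming the same conf and job variables as in MyWordCount.main (it additionally needs an import of org.apache.hadoop.fs.FileSystem):

    // Delete the previous run's output directory, if any, before submitting the job.
    Path output = new Path("hdfs://localhost:9000/user/root/output");
    FileSystem fs = output.getFileSystem(conf); // resolve the HDFS filesystem from the path's URI
    if (fs.exists(output)) {
        fs.delete(output, true);                // recursive delete of the old results
    }
    FileOutputFormat.setOutputPath(job, output);

    After a successful run the word counts are written to files named part-r-00000 (and so on) under the output directory and can be viewed with hdfs dfs -cat /user/root/output/part-r-*.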
  • Original post: https://www.cnblogs.com/lx06/p/15342476.html