This project shows how to upload a file from Windows to HDFS programmatically through the Hadoop Java API, and how to count the words in the uploaded text file with a MapReduce WordCount job.
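When the client code runs on Windows, Hadoop typically also needs a local HADOOP_HOME that contains bin\winutils.exe. Below is a minimal connectivity sketch under that assumption; the E:\Hadoop path and the hdfs://localhost:9000 address are examples and must be adapted to your own environment.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: point the Hadoop client at a local winutils installation before
// any FileSystem or Job code runs, then check that HDFS is reachable.
public class WindowsSetupCheck {
    public static void main(String[] args) throws IOException {
        System.setProperty("hadoop.home.dir", "E:\\Hadoop"); // directory containing bin\winutils.exe (assumed path)
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");   // same NameNode address used by the classes below
        try (FileSystem fs = FileSystem.get(conf)) {
            System.out.println("Connected, / exists: " + fs.exists(new Path("/")));
        }
    }
}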
1. Project structure
2. Code
- CopyFromLocalFile
package com.hadoop.worldcount;

import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyFromLocalFile {

    /**
     * Check whether a path exists on HDFS.
     */
    public static boolean test(Configuration conf, String path) {
        try (FileSystem fs = FileSystem.get(conf)) {
            return fs.exists(new Path(path));
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Copy a local file to the given HDFS path, overwriting the target if it already exists.
     */
    public static void copyFromLocalFile(Configuration conf,
            String localFilePath, String remoteFilePath) {
        Path localPath = new Path(localFilePath);
        Path remotePath = new Path(remoteFilePath);
        try (FileSystem fs = FileSystem.get(conf)) {
            /* The first boolean of fs.copyFromLocalFile controls whether the source is deleted,
               the second whether an existing target is overwritten. */
            fs.copyFromLocalFile(false, true, localPath, remotePath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Append the contents of a local file to an existing HDFS file.
     */
    public static void appendToFile(Configuration conf, String localFilePath,
            String remoteFilePath) {
        Path remotePath = new Path(remoteFilePath);
        try (FileSystem fs = FileSystem.get(conf);
                FileInputStream in = new FileInputStream(localFilePath)) {
            FSDataOutputStream out = fs.append(remotePath);
            byte[] data = new byte[1024];
            int read = -1;
            while ((read = in.read(data)) > 0) {
                out.write(data, 0, read);
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Main entry point.
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        String localFilePath = "/usr/hadoop/test/test.txt";   // local path
        String remoteFilePath = "/user/hadoop/test/test.txt"; // HDFS path
        String choice = "append";      // if the file already exists, append to it
        //String choice = "overwrite"; // if the file already exists, overwrite it

        try {
            /* Check whether the remote file exists */
            boolean fileExists = false;
            if (CopyFromLocalFile.test(conf, remoteFilePath)) {
                fileExists = true;
                System.out.println(remoteFilePath + " already exists.");
            } else {
                System.out.println(remoteFilePath + " does not exist.");
            }
            /* Upload, overwrite, or append accordingly */
            if (!fileExists) { // file does not exist: upload it
                CopyFromLocalFile.copyFromLocalFile(conf, localFilePath,
                        remoteFilePath);
                System.out.println(localFilePath + " uploaded to " + remoteFilePath);
            } else if (choice.equals("overwrite")) { // overwrite
                CopyFromLocalFile.copyFromLocalFile(conf, localFilePath,
                        remoteFilePath);
                System.out.println(localFilePath + " overwrote " + remoteFilePath);
            } else if (choice.equals("append")) { // append
                CopyFromLocalFile.appendToFile(conf, localFilePath,
                        remoteFilePath);
                System.out.println(localFilePath + " appended to " + remoteFilePath);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
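A practical note on appendToFile: on a pseudo-distributed cluster with a single DataNode, fs.append often fails because the client tries to replace a "bad" DataNode in the write pipeline and no other node exists. A sketch of a workaround, assuming the same localhost:9000 NameNode and the same test paths as above, is to relax the replace-datanode policy before appending:

import org.apache.hadoop.conf.Configuration;

// Sketch only (assumes the same package as CopyFromLocalFile): relax the
// replace-datanode-on-failure policy so append works with a single DataNode.
public class AppendOnSingleNode {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        // With only one DataNode there is no replacement node available,
        // so tell the HDFS client not to fail the append pipeline over it.
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
        CopyFromLocalFile.appendToFile(conf, "/usr/hadoop/test/test.txt", "/user/hadoop/test/test.txt");
    }
}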
- File
package com.hadoop.worldcount;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class File {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        String localSrc = "E:\\Hadoop\\work\\bashrc.txt"; // local file (backslashes must be escaped in Java string literals)
        String dst = "hdfs://localhost:9000/user/hadoop/test/bashrc.txt"; // target path on HDFS
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst), new Progressable() { // print a dot as progress feedback
            public void progress() {
                System.out.print(".");
            }
        });
        IOUtils.copyBytes(in, out, 4096, true); // copy and close both streams
    }
}
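To confirm the upload worked, the file can be read back from HDFS and printed to the console. A small sketch, assuming the same dst path used by the File class above:

import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Sketch: open the uploaded file on HDFS and stream its contents to stdout.
public class ReadBack {
    public static void main(String[] args) throws Exception {
        String dst = "hdfs://localhost:9000/user/hadoop/test/bashrc.txt"; // same path as in File above
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(URI.create(dst), conf);
                InputStream in = fs.open(new Path(dst))) {
            IOUtils.copyBytes(in, System.out, 4096, false); // false: do not close System.out
        }
    }
}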
- MyWordCount
package com.hadoop.worldcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyWordCount {

    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        /**
         * Mapper.map:
         * void map(K1 key, V1 value, Context context)
         * maps a single input k/v pair to intermediate k/v pairs. The output types need not
         * match the input types, and one input pair may produce zero or more output pairs.
         * Context collects the <k,v> pairs emitted by the Mapper; context.write(k, v) adds one pair.
         * Here the map function splits each line with StringTokenizer and writes a
         * (word, 1) pair to the context for every token.
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                context.write(new Text(itr.nextToken()), new IntWritable(1));
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        /**
         * Reducer.reduce:
         * void reduce(Text key, Iterable<IntWritable> values, Context context)
         * The k/v pairs come from the map function (possibly pre-aggregated by a combiner);
         * the summed count for each word is written back through the context.
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        /* Configuration: the map/reduce configuration class; it describes the
           MapReduce job to the Hadoop framework. */
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "myWordCount"); // user-defined job name
        job.setJarByClass(MyWordCount.class);
        job.setMapperClass(TokenizerMapper.class);      // Mapper class for the job
        job.setCombinerClass(IntSumReducer.class);      // Combiner class for the job
        job.setReducerClass(IntSumReducer.class);       // Reducer class for the job
        job.setOutputKeyClass(Text.class);              // key class of the job output
        job.setOutputValueClass(IntWritable.class);     // value class of the job output

        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/user/root/input/bashrc.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/root/output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1); // run the job and exit with its status
    }
}
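One thing to keep in mind when rerunning MyWordCount: FileOutputFormat requires that the output directory does not already exist, so a leftover /user/root/output from a previous run makes the job fail at submission. A sketch (assuming the same localhost:9000 cluster and the paths hard-coded above) that clears the directory before resubmitting; the word counts then appear in part-r-00000 under the output directory:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: remove the previous job output so MyWordCount can be run again.
public class CleanOutputDir {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        Path output = new Path("/user/root/output"); // same output directory as in MyWordCount
        try (FileSystem fs = FileSystem.get(conf)) {
            if (fs.exists(output)) {
                fs.delete(output, true); // recursive delete of the old results
            }
        }
        // After MyWordCount finishes, read the counts from /user/root/output/part-r-00000.
    }
}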