I. Installing the Hadoop Plugin for Eclipse
1. Copy the plugin JAR into Eclipse's plugins directory.
2. Put the Hadoop binaries compiled for Windows (typically winutils.exe and hadoop.dll) into Hadoop's bin directory.
3. Add environment-variable support: set HADOOP_HOME to the Hadoop install directory and append %HADOOP_HOME%\bin to PATH.
4. Open up HDFS permissions.
Normal workflow: once development is finished, package the project into a JAR, upload it to the server with rz, and execute it against HDFS there. To run jobs straight from Eclipse instead, relax permission checking as follows.
Modify the hdfs-site.xml configuration file (permission checking is enabled by default):
<property>
    <name>dfs.permissions.enabled</name>
    <value>false</value>
</property>
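An alternative worth knowing (a sketch under assumptions about your setup, not part of the original steps): instead of disabling permission checking cluster-wide, the Windows client can identify itself as the user that owns the HDFS directories before any FileSystem access. Under simple authentication, Hadoop reads the HADOOP_USER_NAME environment variable or system property; the user name root below is a placeholder.

// Hypothetical alternative to disabling dfs permission checking:
// impersonate the directory owner from the client side.
// Assumption: "root" owns the target HDFS paths on this cluster.
System.setProperty("HADOOP_USER_NAME", "root");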
5. Hadoop-related configuration in Eclipse: point Window -> Preferences -> Hadoop Map/Reduce at the local Hadoop directory, then register the cluster's NameNode host and port as a Map/Reduce location.
II. A Simple Word Count with MapReduce
Scenario: with the COVID-19 outbreak, a purchasing agent has appeared in the village and must tally all the goods the villagers need.
Village XXX has 300 households, and the products to be purchased are as follows:
Toiletries: 牙刷, 牙膏, 杯子, 脸盆, 肥皂, 沐浴露, 洗发水 (quantity per item: 1-6)
Bedding: 被套, 棉被, 床垫, 枕巾 (quantity per item: 0-3)
Home appliances: 插板, 微波炉, 电磁炉, 电烤箱, 灯泡, 烧水壶 (quantity per item: 1-4)
2.1 Simulating the Data
package com.blb.analogdata;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Random;

public class getData {

    private static Random random = new Random();
    private static ArrayList<String> bashList = new ArrayList<>();
    private static ArrayList<String> bedList = new ArrayList<>();
    private static ArrayList<String> homeList = new ArrayList<>();

    static {
        // Toiletries
        bashList.add("牙刷");
        bashList.add("牙膏");
        bashList.add("杯子");
        bashList.add("脸盆");
        bashList.add("肥皂");
        bashList.add("沐浴露");
        bashList.add("洗发水");
        // Bedding
        bedList.add("被套");
        bedList.add("棉被");
        bedList.add("床垫");
        bedList.add("枕巾");
        // Home appliances
        homeList.add("插板");
        homeList.add("微波炉");
        homeList.add("电磁炉");
        homeList.add("电烤箱");
        homeList.add("灯泡");
        homeList.add("烧水壶");
    }

    // Randomly decide whether a household needs a product category
    public static boolean isNeed() {
        return random.nextInt(1000) % 2 == 0;
    }

    // Random count in [0, num)
    public static int needCount(int num) {
        return random.nextInt(num);
    }

    // Generate 300 shopping lists, one file per household
    public static void main(String[] args) throws IOException {
        for (int i = 0; i < 300; i++) {
            /*
             * Writing a file requires an output stream. Note on I/O streams:
             *   byte streams:      InputStream, OutputStream
             *   character streams: Reader, Writer
             *   bridge streams:    OutputStreamWriter/InputStreamReader wrap a byte
             *                      stream as a character stream (here combined with
             *                      BufferedWriter/BufferedReader)
             * Plain byte and character streams give no control over the output
             * file's encoding; the bridge streams do.
             */
            // The G:\temp directory must already exist
            FileOutputStream out = new FileOutputStream(new File("G:\\temp\\" + i + ".txt"));
            // Bridge stream: set the output file's encoding to UTF-8
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, "UTF-8"));

            // First category: toiletries
            if (isNeed()) {
                // The number of distinct items cannot exceed the size of the list
                int count = needCount(bashList.size() + 1);
                for (int j = 0; j < count; j++) {
                    // Pick a random product...
                    String product = bashList.get(random.nextInt(bashList.size()));
                    // ...and a random quantity in [1, 6]
                    int num = needCount(6) + 1;
                    writer.write(product + " " + num);
                    writer.newLine();
                }
            }

            // Second category: bedding
            if (isNeed()) {
                int count = needCount(bedList.size() + 1);
                for (int j = 0; j < count; j++) {
                    String product = bedList.get(random.nextInt(bedList.size()));
                    // Random quantity in [0, 3]
                    int num = needCount(4);
                    writer.write(product + " " + num);
                    writer.newLine();
                }
            }

            // Third category: home appliances
            if (isNeed()) {
                int count = needCount(homeList.size() + 1);
                for (int j = 0; j < count; j++) {
                    String product = homeList.get(random.nextInt(homeList.size()));
                    // Random quantity in [1, 4]
                    int num = needCount(4) + 1;
                    writer.write(product + " " + num);
                    writer.newLine();
                }
            }

            writer.flush();
            writer.close();
        }
    }
}
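Each run produces 300 files, 0.txt through 299.txt, in G:\temp. A generated list looks something like this (contents are random and vary per run):

牙刷 3
肥皂 2
棉被 0
灯泡 4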
2.2 Uploading the Simulated Data to HDFS
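The upload can be done with the HDFS shell (hdfs dfs -put) or programmatically. Below is a minimal Java sketch using the FileSystem API; the class name UploadData is ours, the NameNode address mirrors the driver further down, and the source directory G:\temp and target directory /product/ (the driver's input path) are assumptions from this walkthrough.

package com.blb.analogdata;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadData {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same NameNode address as in CountDriver below
        conf.set("fs.defaultFS", "hdfs://192.168.0.10:9000");
        FileSystem fs = FileSystem.get(conf);
        // Copy all 300 generated shopping lists into /product/ on HDFS
        for (int i = 0; i < 300; i++) {
            fs.copyFromLocalFile(new Path("G:\\temp\\" + i + ".txt"),
                                 new Path("/product/" + i + ".txt"));
        }
        fs.close();
    }
}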
2.3 Writing the MapReduce Program
Create each class in Eclipse via New -> Other and choose the corresponding Map/Reduce wizard (Mapper, Reducer, MapReduce Driver).
Mapper code
package com.blb.count;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable ikey, Text ivalue, Context context)
            throws IOException, InterruptedException {
        // Read one line of the input file
        String line = ivalue.toString();
        // Split "product quantity" on the space separator
        String[] split = line.split(" ");
        // Emit (product, quantity)
        context.write(new Text(split[0]), new IntWritable(Integer.parseInt(split[1])));
    }
}
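For the input line 牙刷 3, for example, this mapper emits the pair (牙刷, 3). The framework then groups all values by key before they reach the reducer.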
Reducer code
package com.blb.count;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text _key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            // Unbox the IntWritable to a plain int and accumulate
            sum += val.get();
        }
        // Emit (product, total quantity)
        context.write(_key, new IntWritable(sum));
    }
}
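If three households ordered 3, 2, and 1 toothbrushes respectively, the reducer receives (牙刷, [3, 2, 1]) and writes out (牙刷, 6).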
Driver code
package com.blb.count;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Address and port of the NameNode
        conf.set("fs.defaultFS", "hdfs://192.168.0.10:9000");

        Job job = Job.getInstance(conf, "CountDriver");
        job.setJarByClass(com.blb.count.CountDriver.class);

        job.setMapperClass(CountMapper.class);
        job.setReducerClass(CountReducer.class);

        // If the reducer's key/value types match the mapper's output types,
        // setMapOutputKeyClass/setMapOutputValueClass can be omitted.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output paths are DIRECTORIES, not files; the output
        // directory must not exist yet, or the job will fail.
        FileInputFormat.setInputPaths(job, new Path("/product/"));
        FileOutputFormat.setOutputPath(job, new Path("/product_out/"));

        if (!job.waitForCompletion(true))
            return;
    }
}
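Because the reduce step is a plain sum (associative and commutative), the reducer class can also serve as a combiner to pre-aggregate counts on the map side and shrink the shuffle. A one-line sketch of this optional optimization (not part of the original driver), added before waitForCompletion:

// Optional: reuse the reducer as a combiner to pre-sum counts on the
// map side; safe here because addition is associative and commutative.
job.setCombinerClass(CountReducer.class);

Run CountDriver as a Java Application in Eclipse; with the default single reducer, the totals appear in /product_out/part-r-00000. Delete /product_out/ before re-running, since the job fails if the output directory already exists.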