  • hadoop_05

    • setup() and cleanup()
    setup() is called by the MapReduce framework exactly once per task, before any map() call, and is the place for one-time initialization of variables or resources. If that initialization were done inside map(), it would be repeated for every input line the Mapper parses, which is redundant and slows the job down.
    cleanup() is likewise called exactly once, after all map() calls for the task have finished, and is the place to release those variables or resources. Releasing resources inside map() would mean tearing them down after every line and re-initializing them before the next one, which again hurts performance.
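
    As a minimal sketch (the class and file names here are made up for illustration, not from the original post), a Mapper that acquires a resource once in setup() and releases it in cleanup() looks roughly like this:

    ```java
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // hypothetical example: open a side file once per task, use it for every record, close it once
    public class ResourceMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    	private BufferedReader dictReader;              // shared resource, created once
    	private final IntWritable one = new IntWritable(1);

    	@Override
    	protected void setup(Context context) throws IOException, InterruptedException {
    		// runs once per map task, before the first map() call
    		dictReader = new BufferedReader(new FileReader("dict.txt")); // hypothetical local file
    	}

    	@Override
    	protected void map(LongWritable key, Text value, Context context)
    			throws IOException, InterruptedException {
    		// per-record work only; no resource setup or teardown here
    		context.write(new Text(value.toString()), one);
    	}

    	@Override
    	protected void cleanup(Context context) throws IOException, InterruptedException {
    		// runs once per map task, after the last map() call
    		dictReader.close();
    	}
    }
    ```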
    

    1. Handling small files

    • MapReduce is a poor fit for large numbers of small files: each small file normally becomes its own input split and therefore its own map task, so task startup overhead and job bookkeeping quickly dominate the useful work.

    • Merging small files

      package com.xjk.mm;
      
      import java.io.IOException;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.lib.input.FileSplit;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      
      
      
      public class Merger {
      	static class MergerMapper extends Mapper<LongWritable,Text,Text,Text>{
      		String fileName = null;
      		protected void setup(Mapper<LongWritable,Text,Text,Text>.Context context){
                // get the name of the file this split comes from; it becomes the output key
      			FileSplit f = (FileSplit)context.getInputSplit();
      			fileName = f.getPath().getName();
      		}
      		
      		Text k = new Text();
      		Text v = new Text();
      		@Override
      		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
      				throws IOException, InterruptedException {
      			String line = value.toString();
      			k.set(fileName);
      			v.set(line);
      			context.write(k, v);
      		}
      	}	
      	static class MergerReducer extends Reducer<Text,Text,Text,Text>{
      		Text v = new Text();
      		protected void reduce(Text key, Iterable<Text> values, Reducer<Text,Text,Text,Text>.Context context) throws IOException, InterruptedException {
      			StringBuilder sb = new StringBuilder();
      			for (Text text : values) {
      				sb.append(text.toString() + " ");
      			}
      			v.set(sb.toString().trim());
      			context.write(key, v);
      		}
      	}
      
      	public static void main(String[] args) throws Exception {
      		Configuration conf = new Configuration();
      		conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache"); 
      		
      		Job job = Job.getInstance(conf);
      		job.setMapperClass(MergerMapper.class);
      		job.setReducerClass(MergerReducer.class);
      		
      		job.setMapOutputKeyClass(Text.class);
      		job.setMapOutputValueClass(Text.class);
      		
      		job.setOutputKeyClass(Text.class);
      		job.setOutputValueClass(Text.class);
      		job.setNumReduceTasks(1);
      		FileInputFormat.setInputPaths(job, new Path("d:/data/merger/input/"));
      		FileOutputFormat.setOutputPath(job, new Path("d:/data/merger/output/"));
      		
      		boolean b = job.waitForCompletion(true);
      		
      		System.exit(b?0:-1);
      		
      	}
      }
      
      
    
    ## 2. Join methods
    
    - MapReduce can join two tables in several ways: map-side join, reduce-side join, and semi-join. A map-side join merges the data before it reaches map(), so nothing from the joined table goes through the shuffle, which makes it far cheaper than a reduce-side join, where all records are shuffled. The example below implements a reduce-side join: the mapper tags every record with the table it came from, and the reducer stitches matching records back together. A hedged map-side join sketch follows the example.
    
    - 案例
    
      ```text
      orders.txt
      	order id, user id
      	order001,u002
      	order001,u003
      	...
      user.txt
      	order id, user id, user name, age, friend
      	order004,u001,senge,18,angelababy
      	...
      ```

    • Code:

      // JoinBean.java
      package com.xjk.join;
      
      import java.io.DataInput;
      import java.io.DataOutput;
      import java.io.IOException;
      import org.apache.hadoop.io.Writable;
      /*
       * Bean that carries a record from either input table; the joined result is built from it.
       * */
      public class JoinBean implements Writable {
      	private String oid;
      	private String uid;
      	private String name;
      	private int age;
      	private String friend;
      	private String tables; // marks which table the record came from
      	public String getOid() {
      		return oid;
      	}
      	public void setOid(String oid) {
      		this.oid = oid;
      	}
      	public String getUid() {
      		return uid;
      	}
      	public void setUid(String uid) {
      		this.uid = uid;
      	}
      	public String getName() {
      		return name;
      	}
      	public void setName(String name) {
      		this.name = name;
      	}
      	public int getAge() {
      		return age;
      	}
      	public void setAge(int age) {
      		this.age = age;
      	}
      	public String getFriend() {
      		return friend;
      	}
      	public void setFriend(String friend) {
      		this.friend = friend;
      	}
      	public String getTables() {
      		return tables;
      	}
      	public void setTables(String tables) {
      		this.tables = tables;
      	}
      	@Override
      	public String toString() {
      		return uid + "," + name + "," + age + ","+ friend;
      	}
      	public void write(DataOutput out) throws IOException{
      		out.writeUTF(oid);
      		out.writeUTF(uid);
      		out.writeUTF(name);
      		out.writeInt(age);
      		out.writeUTF(friend);
      		out.writeUTF(tables);
      	}
      	public void readFields(DataInput in)throws IOException{
      		this.oid = in.readUTF();
      		this.uid = in.readUTF();
      		this.name = in.readUTF();
      		this.age = in.readInt();
      		this.friend = in.readUTF();
      		this.tables = in.readUTF();
      	}
      }
      
      // Join.java
      
      
      package com.xjk.join;
      
      import java.io.IOException;
      import java.util.ArrayList;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.lib.input.FileSplit;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      
      public class Join {
      	static class JoinMapper extends Mapper<LongWritable, Text, Text, JoinBean>{
      		String fileName = null;
      		@Override
      		protected void setup(Mapper<LongWritable, Text, Text, JoinBean>.Context context)
      				throws IOException, InterruptedException {
      			FileSplit f = (FileSplit)context.getInputSplit();
      			fileName = f.getPath().getName();
      		}
      		Text k = new Text();
      		JoinBean j = new JoinBean();
      		@Override
      		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, JoinBean>.Context context)
      				throws IOException, InterruptedException {
      			String line = value.toString();
      			// split the input line into fields
      			String[] split = line.split(",");
      			if (fileName.startsWith("orders")) {
      				// record from the orders file
      				j.setOid(split[0]);
      				j.setUid(split[1]);
      				// the remaining JoinBean fields must not be null, otherwise
      				// (de)serialization fails, so fill them with a placeholder value
      				j.setName("01");
      				j.setAge(-1);
      				j.setFriend("01");
      				j.setTables("orders"); // mark which file the record came from
      			}else {
      				// record from the user file
      				j.setUid(split[0]);
      				j.setName(split[1]);
      				j.setAge(Integer.parseInt(split[2]));
      				j.setFriend(split[4]);
      				j.setOid("01");
      				j.setTables("user");
      			}
      			k.set(j.getUid());
      			context.write(k,j);
      		}
      	}
      	
      	static class JoinReducer extends Reducer<Text, JoinBean, Text, NullWritable>{
      		List<JoinBean> list = new ArrayList<>();
      		Map<String,JoinBean> map = new HashMap<>();	
      		@Override
      		protected void reduce(Text uid, Iterable<JoinBean> values,
      				Reducer<Text, JoinBean, Text, NullWritable>.Context context) throws IOException, InterruptedException {
      			// iterate over every JoinBean that shares this uid
      			for (JoinBean joinBean : values) {
      				String table = joinBean.getTables();
      				if (table.equals("orders")) {
      					// order record: keep a copy (the framework reuses the same JoinBean instance)
      					JoinBean orders = new JoinBean();
      					// an order record only carries uid and oid
      					orders.setUid(joinBean.getUid());
      					orders.setOid(joinBean.getOid());
      					list.add(orders);
      				}else {
      					// user record: keep the user fields, indexed by uid
      					JoinBean users = new JoinBean();
      					users.setUid(joinBean.getUid());
      					users.setName(joinBean.getName());
      					users.setAge(joinBean.getAge());
      					users.setFriend(joinBean.getFriend());
      					map.put(users.getUid(), users);
      				}
      			}
      		}
      		@Override
      		protected void cleanup(Reducer<Text, JoinBean, Text, NullWritable>.Context context)
      				throws IOException, InterruptedException {
      			for (JoinBean o : list) {
      				// look up the matching user for this order's uid
      				JoinBean user = map.get(o.getUid());
      				// concatenate the order id with the user fields
      				String res = o.getOid() + "," +user.toString();
      				// write out the joined record
      				context.write(new Text(res), NullWritable.get());
      			}
      		}	
      	}
      	public static void main(String[] args) throws Exception {
      		Configuration conf = new Configuration();
      		 conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache"); 
      		Job job = Job.getInstance(conf);
      		job.setMapperClass(JoinMapper.class);
      		job.setReducerClass(JoinReducer.class);
      		// output key/value types:
      		job.setMapOutputKeyClass(Text.class);
      		job.setMapOutputValueClass(JoinBean.class);
      		
      		job.setOutputKeyClass(Text.class);
      		job.setOutputValueClass(NullWritable.class);
      		// input and output paths
      		FileInputFormat.setInputPaths(job, new Path("d:/data/orderdata/input/"));
      		FileOutputFormat.setOutputPath(job, new Path("d:/data/orderdata/output/"));
      		
      		boolean b = job.waitForCompletion(true);
      		// exit code: 0 = success, non-zero = failure
      		System.exit(b?0:-1);
      	}
      }
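
    • Map-side join (a hedged sketch, not from the original post): the small user table is shipped to every node through the distributed cache, loaded into a HashMap in setup(), and each order line is joined inside map(), so the user table never goes through the shuffle. Class names and paths below are assumptions; the user.txt field layout follows the reduce-side mapper above.

      ```java
      import java.io.BufferedReader;
      import java.io.FileReader;
      import java.io.IOException;
      import java.net.URI;
      import java.util.HashMap;
      import java.util.Map;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      
      public class MapJoin {
      	static class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
      		// uid -> "name,age,friend", loaded once per map task in setup()
      		Map<String, String> users = new HashMap<>();
      		Text k = new Text();
      
      		@Override
      		protected void setup(Context context) throws IOException, InterruptedException {
      			// "user.txt" was distributed with job.addCacheFile and is linked into the task working dir
      			try (BufferedReader br = new BufferedReader(new FileReader("user.txt"))) {
      				String line;
      				while ((line = br.readLine()) != null) {
      					String[] f = line.split(",");
      					// same field layout as the reduce-side mapper above: uid, name, age, ..., friend
      					users.put(f[0], f[1] + "," + f[2] + "," + f[4]);
      				}
      			}
      		}
      
      		@Override
      		protected void map(LongWritable key, Text value, Context context)
      				throws IOException, InterruptedException {
      			// each input line is an order: oid,uid
      			String[] order = value.toString().split(",");
      			String user = users.get(order[1]);
      			if (user != null) {
      				k.set(order[0] + "," + order[1] + "," + user);
      				context.write(k, NullWritable.get()); // no reducer, nothing shuffled
      			}
      		}
      	}
      
      	public static void main(String[] args) throws Exception {
      		Configuration conf = new Configuration();
      		Job job = Job.getInstance(conf);
      		job.setJarByClass(MapJoin.class);
      		job.setMapperClass(MapJoinMapper.class);
      		job.setNumReduceTasks(0); // map-only job
      		job.setOutputKeyClass(Text.class);
      		job.setOutputValueClass(NullWritable.class);
      		// ship the small table to every node (the path is an assumption)
      		job.addCacheFile(new URI("hdfs://linux01:9000/data/orderdata/cache/user.txt"));
      		FileInputFormat.setInputPaths(job, new Path("d:/data/orderdata/input/orders.txt"));
      		FileOutputFormat.setOutputPath(job, new Path("d:/data/orderdata/mapjoin-output/"));
      		System.exit(job.waitForCompletion(true) ? 0 : -1);
      	}
      }
      ```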
      
      
      
      

    3. Mutual-friends example

    • Find the common friends of every pair of users. Input format: person:friend1,friend2,...

      A:B,C,D,F,E,O
      B:A,C,E,K
      C:F,A,D,I
      D:A,E,F,L
      E:B,C,D,M,L
      F:A,B,C,D,E,O,M
      G:A,C,D,E,F
      H:A,C,D,E,O
      I:A,O
      J:B,O
      K:A,C,D
      L:D,E,F
      M:E,F,G
      O:A,H,I,J
      
      Analysis:
      Line 1: A and B are friends, A and C are friends, A and D are friends, A and F are friends, A and E are friends, A and O are friends.
      Line 2: B and A are friends, B and C are friends, B and E are friends, B and K are friends.
      ...
      From this we can derive, for example:
      common friends of A and B --> C  E
      
    • Code:

      package com.xjk.Friend;
      
      import java.io.IOException;
      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.List;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      
      
      
      public class SameF1 {
      	static class SameF1Mapper extends Mapper<LongWritable, Text, Text, Text>{
      		Text k = new Text();
      		Text v = new Text();
      		protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,Text>.Context context) throws java.io.IOException ,InterruptedException {
      			String line = value.toString();
      			String[] split = line.split(":");
      			String[] fs = split[1].split(",");
      			v.set(split[0]);
      			for (String f : fs) {
      				k.set(f);
      				context.write(k, v);
      			}
      			
      		}
      	}
      	static class SamF1Reducer extends Reducer<Text, Text, Text, Text>{
      		@Override
      		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
      				throws IOException, InterruptedException {
      			List<String> list = new ArrayList<>();
      			for (Text text:values) {
      				list.add(text.toString()); // the key is the friend, each value is a person who has that friend
      			}
      			// sort so that each pair is always emitted in the same order (avoids both A-B and B-A)
      			Collections.sort(list);
      			
      			for (int i = 0; i < list.size()-1; i++) {
      				for (int j = i+1; j < list.size(); j++) {
      					// b-c a 
      					// b-d a
      					// b-e a
      					context.write(new Text(list.get(i) + "-" + list.get(j) + "的好友是:"), key);
      				}
      			}
      		}
      	}
      	public static void main(String[] args) throws Exception {
      		Configuration conf = new Configuration();
      		 conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache"); 
      		Job job = Job.getInstance(conf);
      		job.setMapperClass(SameF1Mapper.class);
      		job.setReducerClass(SamF1Reducer.class);
      		// output key/value types:
      		job.setMapOutputKeyClass(Text.class);
      		job.setMapOutputValueClass(Text.class);
      		
      		job.setOutputKeyClass(Text.class);
      		job.setOutputValueClass(Text.class);
      		// input and output paths
      		FileInputFormat.setInputPaths(job, new Path("d:/data/Friends/input/"));
      		FileOutputFormat.setOutputPath(job, new Path("d:/data/Friends/output/"));
      		
      		boolean b = job.waitForCompletion(true);
      		// exit code: 0 = success, non-zero = failure
      		System.exit(b?0:-1);
      	}
      }
      
      
      • The first pass emits one line per (pair, common friend), so pairs repeat:
      B-C的好友是:	A
      B-D的好友是:	A
      B-F的好友是:	A
      B-G的好友是:	A
      B-H的好友是:	A
      B-I的好友是:	A
      B-K的好友是:	A
      B-O的好友是:	A
      C-D的好友是:	A
      C-F的好友是:	A
      C-G的好友是:	A
      C-H的好友是:	A
      C-I的好友是:	A
      
    • Code 2 (second pass, aggregates the pairs):

      package com.xjk.Friend;
      
      import java.io.IOException;
      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.List;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      
      
      
      public class SameF2 {
      	static class SameF2Mapper extends Mapper<LongWritable, Text, Text, Text>{
      		Text k = new Text();
      		Text v = new Text();
      		protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,Text>.Context context) throws java.io.IOException ,InterruptedException {
      			String line = value.toString();
      			String[] split = line.split("\t");
      			k.set(split[0]);
      			v.set(split[1]);
      			context.write(k, v);
      		}
      	}
      	static class SamF2Reducer extends Reducer<Text, Text, Text, Text>{
      		Text v = new Text();
      		@Override
      		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
      				throws IOException, InterruptedException {
      			StringBuilder sb = new StringBuilder();
      			for (Text text : values) {
      				sb.append(text.toString() + " ");
      			}
      			v.set(sb.toString().trim());
      			context.write(key, v);
      		}
      	}
      	public static void main(String[] args) throws Exception {
      		Configuration conf = new Configuration();
      		 conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache"); 
      		Job job = Job.getInstance(conf);
      		job.setMapperClass(SameF2Mapper.class);
      		job.setReducerClass(SamF2Reducer.class);
      		// output key/value types:
      		job.setMapOutputKeyClass(Text.class);
      		job.setMapOutputValueClass(Text.class);
      		
      		job.setOutputKeyClass(Text.class);
      		job.setOutputValueClass(Text.class);
      		// input and output paths
      		FileInputFormat.setInputPaths(job, new Path("d:/data/Friends/output/"));
      		FileOutputFormat.setOutputPath(job, new Path("d:/data/Friends/output2/"));
      		
      		boolean b = job.waitForCompletion(true);
      		// exit code: 0 = success, non-zero = failure
      		System.exit(b?0:-1);
      	}
      }
      
      
      • Aggregated result:
      A-B的好友是:	E C
      A-C的好友是:	D F
      A-D的好友是:	E F
      A-E的好友是:	D B C
      A-F的好友是:	O B C D E
      A-G的好友是:	F E C D
      A-H的好友是:	E C D O
      A-I的好友是:	O
      

    4. YARN

    • The core idea of HDFS is to store data across many machines (distributed storage). Computation is distributed in the same way: a job is split into tasks that run in parallel on different machines, and the machines' processing resources (CPU, memory) can be scaled out while being presented as a single pool. The unified resource manager that provides this pool is YARN. It gives upper-layer applications a common interface for resource management and scheduling, and its introduction brings big gains in cluster utilization, unified resource management, and data sharing.

      1. Accept job submissions from clients
      2. Manage the resources of all nodes in the cluster
      3. Assign tasks and monitor how the tasks run on each node
      
    • Resource scheduling policies:

      • FIFO (deprecated)
      The default scheduler in Hadoop 1.x was FIFO. FIFO serves jobs from a single queue in submission order: the job at the head of the queue is given map and reduce task slots as servers become free, until that job finishes.
      
      • Capacity Scheduler
      The Capacity Scheduler divides the cluster into (possibly hierarchical) queues, each guaranteed a configured share of the cluster's capacity; a busy queue may temporarily borrow capacity that other queues are not using.

    Scheduling is an important part of YARN. With suitable scheduling rules, many applications can share the cluster at the same time in an orderly way. The most primitive rule is FIFO: whichever job was submitted first runs first. But a large job can then monopolize the resources while everything else waits, or a stream of small jobs can hold the resources so that a large job never gets enough and starves. So although FIFO is simple, it does not meet real needs.

    
    - Fair Scheduler
    
    

    The Fair Scheduler supports multiple queues; each queue can be configured with a certain amount of resources, and the jobs in a queue share all of that queue's resources fairly.

    Jobs within a queue are allocated resources by priority: the higher the priority, the more resources a job gets, but every job receives some share so that none is starved. A job's priority is determined by the deficit between the resources it ideally should have and the resources it actually holds; the larger the deficit, the higher the priority. A hedged configuration sketch follows.
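
    As a hedged illustration only (the queue names and numbers are invented, and this assumes the Fair Scheduler is enabled and pointed at this file via yarn.scheduler.fair.allocation.file), an allocation file might look like:

    ```xml
    <?xml version="1.0"?>
    <!-- hypothetical fair-scheduler.xml: two queues sharing the cluster 2:1 by weight -->
    <allocations>
      <queue name="prod">
        <weight>2.0</weight>
        <minResources>2048 mb,2 vcores</minResources>
      </queue>
      <queue name="dev">
        <weight>1.0</weight>
      </queue>
    </allocations>
    ```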

    
    ## 5. Installing YARN
    
    

    cd /opt/hdp/hadoop-2.8.5/etc/hadoop

    vim yarn-site.xml
    Add the following configuration:

    <property>
      <name>yarn.resourcemanager.hostname</name>
      <value>linux01</value>
    </property>
    <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
    </property>
    <property>
      <name>yarn.nodemanager.resource.memory-mb</name>
      <value>2048</value>
    </property>
    <property>
      <name>yarn.nodemanager.resource.cpu-vcores</name>
      <value>2</value>
    </property>
    <property>
      <name>yarn.nodemanager.vmem-check-enabled</name>
      <value>false</value>
    </property>
    <property>
      <name>yarn.nodemanager.vmem-pmem-ratio</name>
      <value>2.1</value>
    </property>

    Copy it to linux02 and linux03:

    scp yarn-site.xml linux02:$PWD
    scp yarn-site.xml linux03:$PWD

    Start HDFS with the one-touch script:

    start-dfs.sh

    Use jps to confirm that the NameNode and DataNode processes are running.

    cd /opt/hdp/hadoop-2.8.5/sbin

    Start YARN:

    start-yarn.sh    # this starts the ResourceManager and the NodeManagers

    stop-all.sh stops both HDFS and YARN.

    
    ## 6. Viewing resource usage in the YARN web UI
    
    - http://10.0.0.134:8088/cluster
    
    
    ## 7. Submitting to YARN from Windows
    
    - Configure the job to talk to HDFS and run on YARN:
    
    ```java
    Configuration conf = new Configuration();
    // work against HDFS instead of the local file system
    conf.set("fs.defaultFS", "hdfs://linux01:9000");
    // run the job on YARN (the default is local)
    conf.set("mapreduce.framework.name", "yarn");
    // the ResourceManager host
    conf.set("yarn.resourcemanager.hostname", "linux01");
    // allow the MapReduce job to be submitted from another platform (Windows -> Linux)
    conf.set("mapreduce.app-submission.cross-platform", "true");
    // get a job-submission handle
    Job job = Job.getInstance(conf);
    // option 1: point at the job jar explicitly
    job.setJar("d:/data/input/");
    
    // option 2: locate the jar from a class it contains
    job.setJarByClass(JobSubmit2.class);
    ```
    
    • First delete the previous output directory on HDFS:
    hdfs dfs -rm -r /data/wc/output
    
    • Example: create a new package and bring in the earlier word-count classes.


    • Modified DriverClass.java:
    package com.xjk.yarn;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class DriverClass {
    	public static void main(String[] args) throws Exception {
    		// run as the HDFS superuser
    		System.setProperty("HADOOP_USER_NAME", "root");
    		// build the configuration (all conf.set calls must happen before Job.getInstance,
    		// because the Job copies the configuration when it is created)
    		Configuration conf = new Configuration();
    		//conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
    		conf.set("fs.defaultFS", "hdfs://linux01:9000");
    		// run the job on YARN (the default is local)
    		conf.set("mapreduce.framework.name", "yarn");
    		// the ResourceManager host
    		conf.set("yarn.resourcemanager.hostname","linux01");
    		// allow the MapReduce job to be submitted across platforms (Windows -> Linux)
    		conf.set("mapreduce.app-submission.cross-platform","true");
    		Job job = Job.getInstance(conf);
    		// path of the job jar
    		job.setJar("E:\\data\\wc.jar");
    		// mapper and reducer classes
    		job.setMapperClass(WordCountMapper.class);
    		job.setReducerClass(WordCountReduce.class);
    		// map output key/value types
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    		// reduce output key/value types
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		// input path (TextInputFormat by default: offset + line)
    		FileInputFormat.setInputPaths(job, new Path("hdfs://linux01:9000/data/wc/input/"));
    		// output path
    		FileOutputFormat.setOutputPath(job, new Path("hdfs://linux01:9000/data/wc/output/"));
    		// number of reduce tasks
    		job.setNumReduceTasks(2);
    		// submit the job; true prints progress to the console
    		job.waitForCompletion(true);
    	}
    }
    
    
    • Then use File > Export to package the project as E:\data\wc.jar, and run the main method.

    • If you get the error "System times on machines may be out of sync. Check system time and time zones":

    the machines' clocks are probably not synchronized; synchronize them with NTP.
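
    For example (a hedged illustration; it assumes the ntpdate tool is installed and the public pool.ntp.org servers are reachable), run on every node:

    ntpdate pool.ntp.org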
    
    • Check the results:
    hdfs dfs -cat /data/wc/output/part-r-00001
    hdfs dfs -cat /data/wc/output/part-r-00000
    

    8. Submitting to YARN from Linux

    • Only the main method needs to change:
    // build the configuration (set everything before creating the Job,
    // because the Job copies the configuration when it is created)
    Configuration conf = new Configuration();
    // run the job on YARN (the default is local)
    conf.set("mapreduce.framework.name", "yarn");
    // the ResourceManager host
    conf.set("yarn.resourcemanager.hostname","linux01");
    Job job = Job.getInstance(conf);
    job.setJarByClass(DriverClass2.class);
    // mapper and reducer classes
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReduce.class);
    // map output key/value types
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // reduce output key/value types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // input path (TextInputFormat by default: offset + line)
    FileInputFormat.setInputPaths(job, new Path("hdfs://linux01:9000/data/wc/input/"));
    // output path
    FileOutputFormat.setOutputPath(job, new Path("hdfs://linux01:9000/data/wc/output2/"));
    // number of reduce tasks
    job.setNumReduceTasks(2);
    // submit the job; true prints progress to the console
    job.waitForCompletion(true);
    
    • Package it as wc2.jar, copy the jar to a Linux machine, and run:
    hadoop jar /wc2.jar com.xjk.yarn.DriverClass2
    
    • Check the result:
    hdfs dfs -cat /data/wc/output2/part-r-00001
    

    9. Exam-score example

    score.txt

    For each subject, compute the average score and the highest score (a hedged sketch for the per-subject maximum is given after the average-score job below).

    If the data is skewed, the key can be salted: append a random value to the key (or mix it into the hashcode) so that one hot key is spread over several reducers; see the sketch below.
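
    A minimal, hedged sketch of that idea (the class name SaltedMapper, the SALT_BUCKETS constant, and the assumed subject,student,score,... input layout are made up for illustration): the mapper appends a random bucket id to the key so one subject's records land on several reducers; a follow-up job would strip the salt and merge the partial results.

    ```java
    import java.io.IOException;
    import java.util.Random;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class SaltedMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    	private static final int SALT_BUCKETS = 4;  // assumed number of salt buckets
    	private final Random random = new Random();
    	private final Text outKey = new Text();
    	private final IntWritable outVal = new IntWritable();
    
    	@Override
    	protected void map(LongWritable key, Text value, Context context)
    			throws IOException, InterruptedException {
    		// assumed line layout: subject,student,score,...
    		String[] fields = value.toString().split(",");
    		String subject = fields[0];
    		int score = Integer.parseInt(fields[2]);
    		// salt the key: records of one subject now spread over SALT_BUCKETS reducers
    		outKey.set(subject + "#" + random.nextInt(SALT_BUCKETS));
    		outVal.set(score);
    		context.write(outKey, outVal);
    	}
    }
    ```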

    1. Average score per subject

    package com.xjk.score;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class StudentScore {
    	static class StudentMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    		Text k = new Text();
    		@Override
    		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    				throws IOException, InterruptedException {
    			String line = value.toString();
    			String[] split = line.split(",");
    			String course = split[0];
    			k.set(course);
    			for (int i = 2; i < split.length; i++) {
    				int s = Integer.parseInt(split[i]);
    				context.write(k, new IntWritable(s));
    			}
    		}
    	}
    	static class StudentReducer extends Reducer<Text, IntWritable, Text, Text>{
    		@Override
    		protected void reduce(Text key, Iterable<IntWritable> value, Reducer<Text, IntWritable, Text, Text>.Context context)
    				throws IOException, InterruptedException {
    			int sum = 0;
    			int count = 0;
    			for (IntWritable i : value) {
    				sum += i.get();
    				count ++;
    			}
    			double avg = (double) sum / count; // cast to avoid integer division
    			context.write(key, new Text(avg+""));
    		}
    	}
    	public static void main(String[] args) throws Exception {
    				// default configuration
    				Configuration configuration = new Configuration();
    				configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
    				Job job = Job.getInstance(configuration);
    				// mapper and reducer classes
    				job.setMapperClass(StudentMapper.class);
    				job.setReducerClass(StudentReducer.class);
    				// map output key/value types
    				job.setMapOutputKeyClass(Text.class);
    				job.setMapOutputValueClass(IntWritable.class);
    				// reduce output key/value types
    				job.setOutputKeyClass(Text.class);
    				job.setOutputValueClass(Text.class);
    				// input path (TextInputFormat by default: offset + line)
    				FileInputFormat.setInputPaths(job, new Path("d:/data/studentsocre/input"));
    				// output path
    				FileOutputFormat.setOutputPath(job, new Path("d:/data/studentsocre/output"));
    				// number of reduce tasks
    				job.setNumReduceTasks(1);
    				// submit the job; true prints progress to the console
    				job.waitForCompletion(true);
    	}
    }
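
    For the second part of the task (highest score per subject), a hedged sketch of the reducer is shown below. It reuses StudentMapper's (subject, score) output and simply tracks the maximum; the class name MaxScoreReducer is made up, and the job wiring would mirror the average-score job above (with IntWritable as the reduce output value type).

    ```java
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MaxScoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    	@Override
    	protected void reduce(Text subject, Iterable<IntWritable> scores, Context context)
    			throws IOException, InterruptedException {
    		int max = Integer.MIN_VALUE;
    		for (IntWritable s : scores) {
    			max = Math.max(max, s.get());  // keep the largest score seen for this subject
    		}
    		context.write(subject, new IntWritable(max));
    	}
    }
    ```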
    
    