zoukankan      html  css  js  c++  java
  • hadoop03

    hadoop 03

    1.checkpoint机制

    • hadoop如何进行checkpoint的呢?

      hdfs的源数据记录在内存中,它是一个对象。当客户端进行hdfs操作(rm mkdir...),然后传输给服务端namenode进行解析,执行更新操作,为了保证数据持久化,它会把数据进行序列化存储,当然它不会每操作一次进行序列化一次,否则资源消耗太大,它将操作命令记录在日志中,记录日志文件伴随记录日志文件越来越大,它会有个最大存储空间,当到大了最大存储空间会再生成一个日志文件进行记录(日志1,日志2,日志3...),而namenode会定期的把持久化文件和日志文件传输到secondary namenode ,传输到拷贝文件在secondary会将内容反序列化存储对象,将加载传输日志操作进行更新,会形成一个新的存储对象,将新的存储对象序列化,形成新的存储对象会传输到namenode中并进行更新操作。
      
    • hadoop checkpoint机制 默认1小时合并或一分钟内操作100万次操作进行合并。

    2.windows下本地运行mapreduce

    • 下载hadoop并解压。如果运行时报空指针异常需要下载对应版本hadoop.dll和winutils.exe hadoop.dll放在c://windows/system32 。winutils.exe放在hadoop/bin下。

    下载winutils.exe

    https://github.com/cdarlint/winutils

    3.MapReduce

    • MapReduce主要分为2个部分,编程模型和运行时环境

      编程模型:提供简单的接口,实现几个简单函数实现分布式程序
      运行时环境:比较复杂节点的通信,节点失效,数据切分等,也可运行在YARN平台(负责资源调度),
      
    • 特点:

      易于编程,良好扩展性,高的容错性
      
    • MapReduce组件

      InputFormat
      Mapper
      Parititioner(分区)
      Reduce
      
    • 它的模式分为

      local模式(本地)
      yarn集群模式
      
    • 因为单台机器运算能力有限,mapreduce是分布式的运算框架,可以解决海量数据的快速运算。假如你有3台机器,三台机器指定任务为止,并行运算。

    • 以单词统计为例:

      我们有一个文件/data/txt/1.txt   我们通过任务划分让不同机器处理不同范围数据。输出中间结果存储在不同两个任务,通过hashcode指定中间结果分配到任务区。由Reduce处理指定任务。
      
    • 运算整体的逻辑被分成两步map(分区)reduce(聚合)。hashcode%reduce机器的个数,将key分组,reduce处理指定任务。

    • 我们做文件处理时候关键点:

      1.做任务存储切块。
      2.做任务的切片。
      3.分区保证相同key在同一个reduce中,使用hashcode算法。
      

    2.0前序-单词统计:

    • 当前要统计一个文本单词数量那么如何做呢?map-reduce

      假如我们对一个300M文件进行单词统计,集群中我们配置3台机器(l1,l2,l3)。文件分block块存储在3台机器上:
      l1 0M-128M
      l2 128M-256M
      l3 257M-300M
          
      我们通过编写的map分布在每天机器上,每台机器上通过读取文本中指定偏移内容,通过map读取每一行的数据(每读取一行信息map调用一次),并进行计数1。然后通过hashcode它们的key 映射到相应机器的reduce中进行汇总,映射到reduce对不同的key分组,key有多少组,reduce调用读诵好次。
      
      • WordCountMapper.java
      import java.io.IOException;
      
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      
      /*
       *1.map端输入是偏移量和行数据key-value long string
       *2.map端输出key-value数据
       *3.map输出为reduce的输入key-value序列化
       *Sring Text
       *long Longwritable
       *int IntWritable
       * 
       */
      
      public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
      	@Override
      	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
      			throws IOException, InterruptedException {
      		// value转换字符串
      		String line = value.toString();
      		// 切割
      		String[] words = line.split(" ");
      		for(String word:words) {
      			// 将处理结果写出key value形式,到reduce端
      			context.write(new Text(word), new IntWritable(1));
      		}
      	}
      }
      
      
      • WordCountReduce.java
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;
      /*
       *KEYIN Text
       *VALUEIN IntWritable
       * 
       */
       
      public class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
      	protected void reduce(Text key, Iterable<IntWritable> iters, Reducer<Text,IntWritable,Text,IntWritable>.Context context) throws java.io.IOException ,InterruptedException {
      		// 遍历循环
      		int count  = 0;
      		for(IntWritable intWritable :iters) {
      			count ++;
      		}
      		context.write(key, new IntWritable(count));
      	};
      }
      
      
      • 上述mapper的key-value会存在问题,如果我们有100万行数据就会创建100万key和value显然性能是很有影响的。
      public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
          // 创建k-v对象。
      	Text k = new Text();
      	IntWritable v = new IntWritable();
      	@Override
      	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
      			throws IOException, InterruptedException {
      		// value转换字符串
      		String line = value.toString();
      		// 切割
      		String[] words = line.split(" ");
      		for(String word:words) {
      			k.set(word);
      			v.set(1);
      			context.write(k,v);
      		}
      	}
      }
      
      

    2.1分布式统计单词

    1.Map程序:

    • 读取HDFS指定范围的文件内容,并进行处理,写入到不同文件中。
    package com.xjk.mapreduce;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    /*
     * 1.读取HDFS数据 /data/
     * 
     * */
    public class MapTask {
    	public static void main(String[] args) throws Exception {
    		// 在执行程序的时候主函数是可以接收参数的
    		// 在主程序运行时候指定读取文件
    		String path = args[0];
    		// 读取任务起始位置
    		long start = Long.parseLong(args[1]);
    		// 读取任务长度
    		long length = Long.parseLong(args[2]);
    		// 任务编号
    		String taskId = args[3];
    		// 获取操作hdfs客户端对象
    		Configuration conf = new Configuration();
    		// FileSystem t = FileSystem.newInstance(uri, conf, user)()
    		FileSystem fs = FileSystem.newInstance(new URI("hdfs://linux01:9000/"), conf, "root");
    		//FileSystem fs = FileSystem.get(new URI("hdfs://linux01:9000/"), conf, "root");
    		// 创建写对象
    		FSDataOutputStream out0 = fs.create(new Path("/data/wc/output/map_" + taskId+ "_0"));
    		FSDataOutputStream out1 =fs.create(new Path("/data/wc/output/map_" + taskId+ "_1"));
    		// 要读取文件
    		Path file = new Path(path);
    		// 读取hdfs指定数据
    		FSDataInputStream fin = fs.open(file);
    		if (start!=0) {
    			fin.seek(start);// 表示从起始位置开始读
    		}
    		BufferedReader br = new BufferedReader(new InputStreamReader(fin));
    		String line = null;
    		long count = 0;
    		if (start !=0) {
    			br.readLine();// 如果当前读取行数据不在起始位置,就会将当前行数据读取,舍去
    		}
    		
    		
    		while ((line = br.readLine()) != null) {
    			count +=line.length()+2;//换行存在/r/n情况
    			String[] words = line.split(" ");
    			// 将单词写到不同文件中 根据单词hashcode reduce 个数为2
    			for (String word: words) {
    				if((word.hashCode()&Integer.MAX_VALUE)%2==0) {
    					out0.write((word + "	" + 1 + "
    ").getBytes());
    				}else {
    					out1.write((word + "	" + 1 + "
    ").getBytes());
    				}
    			}
    			if (count > length) {
    				break;
    			}
    			
    		}
    		out0.close();
    		out1.close();
    		br.close();
    		fin.close();
    		fs.close();
    		
    		
    	}
    }
    
    

    2Reduce程序:

    package com.xjk.ts;
    
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Set;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    
    /*
     * 1.根据任务编号来处理对应数据
     * 2.
     * */
    public class ReduceTask {
    	public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
    		Map<String,Integer> map = new HashMap<>();
    		String taskId = args[0];
    		// 读取HDFS的数据
    		Configuration conf = new Configuration();
    		FileSystem fs = FileSystem.newInstance(new URI("hdfs://linux01:9000/"), conf, "root");
    		//FileSystem fs = FileSystem.get(new URI("hdfs://linux01:9000/"), conf, "root");
    		RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/data/wc/output"), false);
    		while (listFiles.hasNext()) {
    			LocatedFileStatus next = listFiles.next();
    			Path path = next.getPath();
    			String fileName = path.getName();
    			if (fileName.endsWith(taskId)) {
    				// 读取当前文件
    				FSDataInputStream fin = fs.open(path);
    				BufferedReader br = new BufferedReader(new InputStreamReader(fin));
    				String line = null;
    				while ((line=br.readLine()) != null) {
    					String[] split = line.split("	");
    					String word = split[0];
    					map.put(word, map.getOrDefault(word,0)+1);
    				}
    				br.close();
    				fin.close();
    			}
    		}
    		FSDataOutputStream out = fs.create(new Path("/data/wc/res/readuce_" + taskId));
    		
    		// map
    		Set<Entry<String,Integer>> entrySet = map.entrySet();
    		for (Entry<String, Integer> entry:entrySet) {
    			out.write((entry.getKey()+ "	" + entry.getValue()+"
    ").getBytes());
    		}
    		out.close();
    		//fs.close();
    	}
    }
    
    
    • 将项目打jar包
    右键项目->Export->JAR File->选取路径
    
    • 上传jar和源数据文件
    • 执行命令:
    执行命令:
    
    map:
    			执行类Map类				指定源数据文件路经		起始位置  提取字节  任务id
    hadoop jar wc.jar com.xjk.ts.MapTask /data/wc/input/word.txt 0 2000 0
    
    reduce:
    			执行reduce类			    任务id
    hadoop jar wc.jar com.xjk.ts.ReduceTask 0
    
    
    
    
    

    如报错:Filesystem closed、InterruptedIOException、The client is stopped、IOException

    方式1:
    conf.set("fs.hdfs.impl.disable.cache", "true");
    方式2:
    <property>
      <name>fs.hdfs.impl.disable.cache</name>
      <value>true</value>
      <description></description>
    </property>
    并且加上:
    FileSystem fs = FileSystem.newInstance(new URI("hdfs://linux01:9000/"), conf, "root");
    // 原因:
    参数fs.hdfs.impl.disable.cache默认为false,于是这个conf被Cache,导致在方法外的FileSystem closed,设置为true即可。
    get方法不是每次都创建FileSystem对象,会从缓存中获取FileSystem对象,而newInstance方法则会每次都创建新对象。所以在使用该对象的API编程时,推荐使用get方法。
    注意:用get不能close,否则多线程报错(所以我用static),而用newInstance必须每次close。
    
    • 小插曲:
    find命令:
    # 查找根目录下以jar结尾   xargs打印一行, sed 's/ /:/g'将所有空格替换成:
    find / -name "*.jar" | xargs | sed 's/ /:/g'
    

    2.3程序运行

    • 任务执行:

      1.MapTask 和ReduceTask 打包
      2.准备数据
      3.将jar包上传linux机器上
      4.在不同机器上执行MapTask ReduceTask任务,通过参数传递任务量。
      
      

    4.单机统计单词示例

    • 统计一个文件夹下单词数量:
    WordCountMapper.java
    package com.xjk.wordcounts;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class WordCountMapper  extends Mapper<LongWritable, Text, Text, IntWritable>{
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
        		throws IOException, InterruptedException {
        	String line = value.toString();
        	String[] words = line.split(" ");
        	for (String word : words) {
        		//将处理的结果写出  key value ----> reduce端
        		context.write(new Text(word), new IntWritable(1));
    		}
        }
    
    }
    // WordCountReduce.java
    package com.xjk.wordcounts;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> iters,
    			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    		//hello   6 
    		int count = 0 ;
    		for (IntWritable intWritable : iters) {
    			count ++ ;
    		}
    		context.write(key, new IntWritable(count));
    	}
    }
    
    // DriverClass.java
    
    package com.xjk.wordcounts;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class DriverClass {
    	public static void main(String[] args) throws Exception {
    		// 生成默认配置
    		Configuration configuration = new Configuration();
    		configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache"); 
    		Job job = Job.getInstance(configuration);
    		// map和reduce的类
    		job.setMapperClass(WordCountMapper.class);
    		job.setReducerClass(WordCountReduce.class);
    		// map输出k-v类型,
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    		//reduce输出k-v类型
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		//输入数据  设置默认处理文件路径,默认处理文本数据long line
    		FileInputFormat.setInputPaths(job, new Path("d:/wc/"));
    		//输出数据路径
    		FileOutputFormat.setOutputPath(job, new Path("d:/wc/res"));
    		// 设置reduce数量
    		job.setNumReduceTasks(2);
    		// 将任务提交,默认在本地运行true将job执行消息打印在控制台上。
    		job.waitForCompletion(true);
    	}
    }
    
    
    
    • reduce可自己进行数量设置。map启动数量默认是按照文件格式来进行map任务启动。比如你启动10个maptask,就会有10个maptask.但是这10个map小于128M.如果高于128M就会进行任务切片。

    5.统计电影平均分

    • 设计

      Mapper中 以电影名字做为key,分数做为value
      根据电影hashcode取余进行分组,分给不同reducer
      
    • 代码

      // MovieMapper.java
      import java.io.IOException;
      
      import org.apache.hadoop.io.DoubleWritable;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      import com.google.gson.Gson;
      
      // 电影数据输入k-v 输出k-v
      public class MovieMapper extends Mapper<LongWritable, Text,Text,DoubleWritable> {
      	@Override
      	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
      			throws IOException, InterruptedException {
      		// 读取每行数据
      		String line =value.toString();
      		// 将每行数据转换成对象
      		Gson gs = new Gson();
      		// 用Gson 将每行数据转成java对象
      		MoiveBean mb = gs.fromJson(line, MoiveBean.class);
      		// 输出k-v 电影-分数
      		context.write(new Text(mb.getMovie()), new DoubleWritable(mb.getRate()));
      		
      	}
      }
      // MobieReduce.java
      import java.io.IOException;
      
      import org.apache.hadoop.io.DoubleWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;
      
      public class MovieReduce extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{
      	// 相同电影id会执行一次这个方法
      	// Iterable存储相同一组电影Id
      	@Override
      	protected void reduce(Text movie, Iterable<DoubleWritable> iters,
      			Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
      		// 总分
      		double sum = 0;
      		// 数量统计
      		int count = 0;
      		for (DoubleWritable doubleWritable: iters) {
      			sum += doubleWritable.get();
      			count++;
      		}
      		// 平均分
      		double avg = sum / count;
      		// 写出
      		context.write(movie, new DoubleWritable(avg));
      	}
      }
      // MovieBean.java
      public class MoiveBean {
      	private String movie;
      	private double rate;
      	private String timeStamp;
      	private int uid;
      	@Override
      	public String toString() {
      		return "MoiveBean [movie=" + movie + ", rate=" + rate + ", timeStamp=" + timeStamp + ", uid=" + uid + "]";
      	}
      	public String getMovie() {
      		return movie;
      	}
      	public void setMovie(String movie) {
      		this.movie = movie;
      	}
      	public double getRate() {
      		return rate;
      	}
      	public void setRate(double rate) {
      		this.rate = rate;
      	}
      	public String getTimeStamp() {
      		return timeStamp;
      	}
      	public void setTimeStamp(String timeStamp) {
      		this.timeStamp = timeStamp;
      	}
      	public int getUid() {
      		return uid;
      	}
      	public void setUid(int uid) {
      		this.uid = uid;
      	}
      }
      
      // MovieDriver.java
      import java.io.IOException;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.DoubleWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      
      public class MovieDriver {
      	public static void main(String[] args) throws Exception {
      		Configuration configuration = new Configuration();
      		configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
      		Job job = Job.getInstance(configuration);
      		// 指定map和reduce的类
      		job.setMapperClass(MovieMapper.class);
      		job.setReducerClass(MovieReduce.class);
      		// map 输出k-v
      		job.setMapOutputKeyClass(Text.class);
      		job.setMapOutputValueClass(DoubleWritable.class);
      		// reduce 输出k-v
      		job.setOutputKeyClass(Text.class);
      		job.setOutputValueClass(DoubleWritable.class);
      		// 输入数据文件路经
      		FileInputFormat.setInputPaths(job, new Path("d:/data/sourcedata/simple"));
      		//输出数据路径
      		FileOutputFormat.setOutputPath(job, new Path("d:/data/movie"));
      		// 设置reduce数量
      		job.setNumReduceTasks(2);
      		// 将任务提交,默认在本地运行true将job执行消息打印在控制台上。
      		job.waitForCompletion(true);
      	}
      }
      
      
  • 相关阅读:
    BZOJ4240: 有趣的家庭菜园
    BZOJ1509: [NOI2003]逃学的小孩
    BZOJ5301: [Cqoi2018]异或序列
    BZOJ4540: [Hnoi2016]序列
    BZOJ4956: [Wf2017]Secret Chamber at Mount Rushmore
    BZOJ2141: 排队
    BZOJ1833: [ZJOI2010]count 数字计数
    HDU2089: 不要62
    BZOJ5178: [Jsoi2011]棒棒糖
    BZOJ3439: Kpm的MC密码
  • 原文地址:https://www.cnblogs.com/xujunkai/p/14055798.html
Copyright © 2011-2022 走看看