zoukankan      html  css  js  c++  java
  • HBase 与 MapReduce 集成

    6. HBase 与 MapReduce 集成

    6.1 官方 HBase 与 MapReduce 集成

    1. 查看 HBase 执行 MapReduce 任务所依赖的 jar 包(classpath):bin/hbase mapredcp;
    2. 环境变量的导入
      1. 临时生效,在命令行执行操作:
        • export HBASE_HOME=/opt/module/hbase-1.3.4;
        • export HADOOP_HOME=/opt/module/hadoop-2.8.5;
        • export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`;
      2. 永久生效,在/etc/profile配置
        • export HBASE_HOME=/opt/module/hbase-1.3.4;
        • export HADOOP_HOME=/opt/module/hadoop-2.8.5;
        • 并在 hadoop-env.sh 中配置:export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase-1.3.4/lib/*
    3. 运行官方的 MapReduce 任务
    // ===== 案例一:统计 student 表中有多少行数据 (在 `/opt/module/hbase-1.3.4/` 目录下执行)
    /opt/module/hadoop-2.8.5/bin/yarn jar ./lib/hbase-server-1.3.4.jar rowcounter student
    
    
    // ===== 案例二:使用 MapReduce 将本地数据导入到 HBASE
    // 1. 本地创建一个fruit.tsv文件
    1001    Apple   Red
    1002    Pear    Yellow
    1003    Pineapple   Yellow
    
    // 2. 创建 HBase 表
    create 'fruit','info'
    
    // 3. 在 HDFS 中创建 input_fruit 文件夹并上传 fruit.tsv 文件
    /opt/module/hadoop-2.8.5/bin/hdfs dfs -mkdir /input_fruit
    /opt/module/hadoop-2.8.5/bin/hdfs dfs -put fruit.tsv /input_fruit/
    
    // 4. 执行 MapReduce, 将 fruit.tsv 导入到 HBase 的 fruit 表中
    /opt/module/hadoop-2.8.5/bin/yarn jar ./lib/hbase-server-1.3.4.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit hdfs://IP地址/input_fruit
    

    6.2 自定义HBase-MapReduce

    • 需求:将 fruit 表中的部分数据,通过MR迁入到 fruit_mr 表中
    // 1. 创建 FruitMapper 类,用于读取 fruit 表中的数据
    /**
     * Mapper that scans rows of the source HBase table and, for each row, emits a
     * {@link Put} containing only the cells whose column qualifier is "name".
     *
     * <p>Output key: the row key of the scanned row. Output value: the filtered Put.
     * Rows that contain no "name" cell produce no output.
     */
    public class FruitMapper extends TableMapper<ImmutableBytesWritable, Put>{
    
    	/** Qualifier of the column that is copied to the target table. */
    	private static final byte[] NAME_QUALIFIER = Bytes.toBytes("name");
    
    	/**
    	 * Builds a Put for the current row from its "name" cells and writes it out.
    	 *
    	 * @param key     row key of the scanned row
    	 * @param value   all cells returned by the scan for this row
    	 * @param context MapReduce context used to emit the (row key, Put) pair
    	 * @throws IOException          if adding a cell or writing the output fails
    	 * @throws InterruptedException if the task is interrupted while writing
    	 */
    	@Override
    	protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
    		Put put = new Put(key.get());
    		
    		for (Cell cell : value.rawCells()) {
    			// Compare qualifier bytes directly instead of cloning and decoding each one.
    			if (CellUtil.matchingQualifier(cell, NAME_QUALIFIER)) {
    				put.add(cell);
    			}
    		}
    		
    		// A Put without any column is rejected by the HBase client when it is
    		// written ("No columns to insert"), which would fail the whole job,
    		// so rows without a "name" cell are skipped here.
    		if (!put.isEmpty()) {
    			context.write(key, put);
    		}
    	}
    }
    
    // 2. 创建 FruitReducer 类,用于将数据写入 fruit_mr 表
    /**
     * Pass-through reducer: forwards every {@link Put} received for a row key to the
     * job output, using a {@link NullWritable} output key.
     */
    public class FruitReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable>{
    
    	/**
    	 * Writes each incoming Put unchanged.
    	 *
    	 * @param key     row key the Puts were grouped under (not used for output)
    	 * @param values  Puts emitted by the mapper for this key
    	 * @param context MapReduce context used to emit the Puts
    	 * @throws IOException          if writing a Put fails
    	 * @throws InterruptedException if the task is interrupted while writing
    	 */
    	@Override
    	protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
    		final NullWritable outputKey = NullWritable.get();
    		for (final Put mutation : values) {
    			context.write(outputKey, mutation);
    		}
    	}
    }
    
    // 3. 创建 FruitDriver 类,用于执行 mapper 和 reducer
    /**
     * Driver that wires {@code FruitMapper} and {@code FruitReducer} into one job:
     * it scans the "fruit" table and writes the mapper/reducer output to "fruit_mr".
     *
     * <p>NOTE(review): Hadoop tools conventionally extend {@code Configured} rather
     * than {@code Configuration}; the superclass is left unchanged here so the
     * class's type stays the same for existing callers.
     */
    public class FruitDriver extends Configuration implements Tool{
    
    	/** Configuration injected by {@link ToolRunner} through {@link #setConf}. */
    	private Configuration configuration = null;
    	
    	@Override
    	public void setConf(Configuration conf) {
    		this.configuration = conf;
    	}
    	
    	@Override
    	public Configuration getConf() {
    		return configuration;
    	}
    
    	/**
    	 * Configures and runs the table-to-table job, blocking until it finishes.
    	 *
    	 * @param args command-line arguments left after generic option parsing (unused)
    	 * @return 0 if the job completed successfully, 1 otherwise
    	 * @throws Exception if the job cannot be configured or submitted
    	 */
    	@Override
    	public int run(String[] args) throws Exception {
    		// Create the job from the injected configuration
    		Job job = Job.getInstance(configuration);
    		
    		// Jar containing this driver is shipped to the cluster
    		job.setJarByClass(FruitDriver.class);
    		
    		// Mapper: full scan of the "fruit" table
    		TableMapReduceUtil.initTableMapperJob("fruit", new Scan(), FruitMapper.class, ImmutableBytesWritable.class, Put.class, job);
    		
    		// Reducer: writes into the "fruit_mr" table
    		TableMapReduceUtil.initTableReducerJob("fruit_mr", FruitReducer.class, job);
    		
    		// Submit and wait
    		boolean result = job.waitForCompletion(true);
    		
    		return result ? 0 : 1;
    	}
    
    	/**
    	 * Entry point. The process exit status mirrors the value returned by
    	 * {@link #run}, so a failed job is visible to the calling shell or scheduler.
    	 *
    	 * @param args command-line arguments, forwarded to {@link ToolRunner}
    	 * @throws Exception if the tool fails to run
    	 */
    	public static void main(String[] args) throws Exception {
    		
    		Configuration configuration = HBaseConfiguration.create();
    		int exitCode = ToolRunner.run(configuration, new FruitDriver(), args);
    		System.exit(exitCode);
    	}
    }
    
    // 4. 打成 fruit.jar包
    // 5. HBase 中创建 fruit_mr 表
    create 'fruit_mr','info'
    
    // 6. 在 fruit.jar 所在目录(如 /opt/module/hbase-1.3.4)中执行:
    /opt/module/hadoop-2.8.5/bin/yarn jar ./fruit.jar com.noodles.mr1.FruitDriver(Driver的类名)
    

    6.3 自定义 HBase-MapReduce2

    • 需求:实现将 HDFS 中的数据写入到 HBase 表中
    // 1. 创建 Mapper, 用于读取 HDFS 上的文件
    /**
     * Mapper that turns each tab-separated input line of the form
     * {@code rowkey<TAB>name<TAB>color} into a {@link Put} with the columns
     * {@code info:name} and {@code info:color}.
     *
     * <p>Lines with fewer than three fields or an empty row key are skipped and
     * counted under the counter {@code HDFSMapper / MALFORMED_LINES}.
     */
    public class HDFSMapper extends Mapper<LongWritable, Text, NullWritable, Put>{
    
    	/**
    	 * Parses one input line and emits the corresponding Put.
    	 *
    	 * @param key     key supplied by the input format (unused)
    	 * @param value   one line of the input file
    	 * @param context MapReduce context used to emit the Put
    	 * @throws IOException          if writing the output fails
    	 * @throws InterruptedException if the task is interrupted while writing
    	 */
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Put>.Context context)
    			throws IOException, InterruptedException {
    		// Split the line on the tab delimiter
    		String[] fields = value.toString().split("\t");
    		
    		// A blank or short line would otherwise raise ArrayIndexOutOfBoundsException,
    		// and an empty row key makes the Put constructor throw; either would fail
    		// the task, so such lines are skipped and counted instead.
    		if (fields.length < 3 || fields[0].isEmpty()) {
    			context.getCounter("HDFSMapper", "MALFORMED_LINES").increment(1);
    			return;
    		}
    		
    		// Build the Put: field 0 is the row key, fields 1 and 2 the column values
    		Put put = new Put(Bytes.toBytes(fields[0]));
    		put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(fields[1]));
    		put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(fields[2]));
    		
    		// Emit
    		context.write(NullWritable.get(), put);
    	}
    }
    
    // 2. 创建 Reducer, 用于写入
    /**
     * Pass-through reducer: forwards every {@link Put} produced by the mapper to the
     * job output under a {@link NullWritable} key.
     */
    public class HDFSReducer extends TableReducer<NullWritable, Put, NullWritable>{
    
    	/**
    	 * Writes each incoming Put unchanged.
    	 *
    	 * @param key     the NullWritable key the Puts were grouped under
    	 * @param values  Puts emitted by the mapper
    	 * @param context MapReduce context used to emit the Puts
    	 * @throws IOException          if writing a Put fails
    	 * @throws InterruptedException if the task is interrupted while writing
    	 */
    	@Override
    	protected void reduce(NullWritable key, Iterable<Put> values,
    			Reducer<NullWritable, Put, NullWritable, Mutation>.Context context) throws IOException, InterruptedException {
    		
    		final NullWritable outputKey = NullWritable.get();
    		for (final Put mutation : values) {
    			context.write(outputKey, mutation);
    		}
    	}
    }
    
    // 3. 创建Driver
    /**
     * Driver for the HDFS-to-HBase import job: {@code HDFSMapper} parses the input
     * file(s) and {@code HDFSReducer} writes the resulting Puts to the "fruit2" table.
     *
     * <p>Usage: {@code HDFSDriver <input path>}
     *
     * <p>NOTE(review): Hadoop tools conventionally extend {@code Configured} rather
     * than {@code Configuration}; the superclass is left unchanged here so the
     * class's type stays the same for existing callers.
     */
    public class HDFSDriver extends Configuration implements Tool{
    	
    	/** Configuration injected by {@link ToolRunner} through {@link #setConf}. */
    	private Configuration configuration = null;
    
    	@Override
    	public void setConf(Configuration conf) {
    		this.configuration = conf;
    	}
    
    	@Override
    	public Configuration getConf() {
    		return configuration;
    	}
    
    	/**
    	 * Configures and runs the import job, blocking until it finishes.
    	 *
    	 * @param args {@code args[0]} is the input path passed to
    	 *             {@code FileInputFormat.setInputPaths}
    	 * @return 0 if the job completed successfully, 1 if it failed,
    	 *         2 if the input path argument is missing
    	 * @throws Exception if the job cannot be configured or submitted
    	 */
    	@Override
    	public int run(String[] args) throws Exception {
    
    		// Fail with a usage message instead of an ArrayIndexOutOfBoundsException
    		// when the input path was not supplied.
    		if (args == null || args.length < 1) {
    			System.err.println("Usage: HDFSDriver <input path>");
    			return 2;
    		}
    
    		// Create the job from the injected configuration
    		Job job = Job.getInstance(configuration);
    		
    		// Jar containing this driver is shipped to the cluster
    		job.setJarByClass(HDFSDriver.class);
    		
    		// Mapper and its output types
    		job.setMapperClass(HDFSMapper.class);
    		job.setMapOutputKeyClass(NullWritable.class);
    		job.setMapOutputValueClass(Put.class);
    		
    		// Reducer: writes into the "fruit2" table
    		TableMapReduceUtil.initTableReducerJob("fruit2", HDFSReducer.class, job);
    
    		// Input path
    		// import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    		FileInputFormat.setInputPaths(job, args[0]);
    		
    		// Submit and wait
    		boolean result = job.waitForCompletion(true);
    		
    		return result ? 0 : 1;
    	}
    	
    	/**
    	 * Entry point. The process exit status mirrors the value returned by
    	 * {@link #run}, so a failed job is visible to the calling shell or scheduler.
    	 *
    	 * @param args command-line arguments, forwarded to {@link ToolRunner}
    	 * @throws Exception if the tool fails to run
    	 */
    	public static void main(String[] args) throws Exception {
    		
    		Configuration configuration = HBaseConfiguration.create();
    		int exitCode = ToolRunner.run(configuration, new HDFSDriver(), args);
    		System.exit(exitCode);
    	}
    }
    
    // 4. 打成 fruit.jar包
    // 5. HBase 中创建 fruit2 表
    create 'fruit2','info'
    
    // 6. 在 fruit.jar 所在目录(如 /opt/module/hbase-1.3.4)中执行:
    /opt/module/hadoop-2.8.5/bin/yarn jar ./fruit.jar com.noodles.mr2.HDFSDriver(Driver的类名) /input_fruit/fruit.tsv(文件路径)
    
  • 相关阅读:
    内存映射
    docstring show under decorator
    eventlet dbpool for postgresql &mysql
    za python
    Install MySQL 5.0 Max on FC3
    vi
    ff chrome tips
    20101004网站部署更新
    sqlalchemy type &elixir type
    20100930网站部署更新日志
  • 原文地址:https://www.cnblogs.com/linkworld/p/11069763.html
Copyright © 2011-2022 走看看