  • HBase (7): HBase advanced programming

    一、Combining HBase with MapReduce

         Why use MapReduce to access data in HBase?
         To speed up analysis and to scale out analytical capacity.
         Using MapReduce to analyze HBase data is strictly an offline (batch) analysis scenario.

           

          1、HBaseToHDFS

             Read data from HBase, analyze it, and write the results to HDFS. Implementation:

    package com.ghgj.hbase.hbase2hdfsmr;
    
    import java.io.IOException;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * Purpose: read the user_info table from HBase and write the rows out to HDFS.
     */
    public class HBaseToHDFSMR {
    	
    	private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";
    
    	public static void main(String[] args) throws Exception {
    		
    		Configuration conf = HBaseConfiguration.create();
    		conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
    		System.setProperty("HADOOP_USER_NAME", "hadoop");
    //		conf.set("fs.defaultFS", "hdfs://myha01/");
    		
    		Job job = Job.getInstance(conf);
    		job.setJarByClass(HBaseToHDFSMR.class);
    		
    		Scan scan = new Scan();
    		scan.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("name"));
    		/**
    		 * TableMapReduceUtil: the -Util suffix marks a helper class; here it wires
    		 * the user_info table (with the given Scan) in as the input of this MR job.
    		 */
    		TableMapReduceUtil.initTableMapperJob("user_info", scan, 
    				HBaseToHDFSMRMapper.class, Text.class, NullWritable.class, job);
    		job.setReducerClass(HBaseToHDFSMRReducer.class);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(NullWritable.class);
    		
    		Path outputPath = new Path("/hbase2hdfs/output");
    		FileSystem fs = FileSystem.get(conf);
    		if (fs.exists(outputPath)) {
    			// Remove any previous output so the job can be rerun cleanly
    			fs.delete(outputPath, true);
    		}
    		FileOutputFormat.setOutputPath(job, outputPath);
    		
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.exit(waitForCompletion ? 0 : 1);
    	}
    	
    	static class HBaseToHDFSMRMapper extends TableMapper<Text, NullWritable>{
    		/**
    		 * key: the rowkey of the current row
    		 * value: one Result per map() call, holding the row's cells
    		 *        (rowkey, family, qualifier, value, timestamp)
    		 */
    		@Override
    		protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, NullWritable>.Context context) throws IOException, InterruptedException {
    			String rowkey = Bytes.toString(key.copyBytes());
    			System.out.println(rowkey);
    			List<Cell> cells = value.listCells();
    			for (Cell cell : cells) {
    				String rowkeyResult = Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
    						+ Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
    						+ Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
    						+ Bytes.toString(CellUtil.cloneValue(cell)) + "\t"
    						+ cell.getTimestamp();
    				context.write(new Text(rowkeyResult), NullWritable.get());
    			}
    		}
    	}
    	
    	static class HBaseToHDFSMRReducer extends Reducer<Text, NullWritable, Text, NullWritable>{
    		@Override
    		protected void reduce(Text key, Iterable<NullWritable> arg1, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
    			context.write(key, NullWritable.get());
    		}
    	}
    }
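
    The job can be submitted like any other MapReduce program. A minimal sketch, assuming the class above is packaged into a jar (the name hbase2hdfsmr.jar is hypothetical) and that the HBase jars are made available on the classpath, for example via `hbase classpath`:

    export HADOOP_CLASSPATH=$(hbase classpath)
    hadoop jar hbase2hdfsmr.jar com.ghgj.hbase.hbase2hdfsmr.HBaseToHDFSMR

    # The reducer writes plain text, so the result can be inspected directly on HDFS
    hadoop fs -cat /hbase2hdfs/output/part-r-00000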
    

      2、HDFSToHBase

            Read data from HDFS, process it, and write it into HBase. Implementation:

    package com.ghgj.hbase.hbase2hdfsmr;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    public class HDFSToHBaseMR {
    	private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";
    	private static final String TABLE_NAME = "person_info";
    
    	public static void main(String[] args) throws Exception {
    
    		Configuration conf = HBaseConfiguration.create();
    		conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
    		System.setProperty("HADOOP_USER_NAME", "hadoop");
    		Job job = Job.getInstance(conf);
    		job.setJarByClass(HDFSToHBaseMR.class);
    
    		// Create the target HBase table person_info (drop it first if it already exists)
    		HBaseAdmin admin = new HBaseAdmin(conf);
    		HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
    		htd.addFamily(new HColumnDescriptor("base_info"));
    		if (admin.tableExists(TABLE_NAME)) {
    			admin.disableTable(TABLE_NAME);
    			admin.deleteTable(TABLE_NAME);
    		}
    		admin.createTable(htd);
    		admin.close();
    
    		// Register the mapper; the reducer is wired in through TableMapReduceUtil below
    		job.setMapperClass(HDFSToHBaseMRMapper.class);
    		TableMapReduceUtil.initTableReducerJob(TABLE_NAME, HDFSToHBaseMRReducer.class, job);
    		
    		// Output key/value types for the mapper and for the job (reducer)
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(NullWritable.class);
    		job.setOutputKeyClass(ImmutableBytesWritable.class);
    		job.setOutputValueClass(Mutation.class);
    
    		// Input path on HDFS (the output produced by HBaseToHDFSMR above)
    		FileInputFormat.setInputPaths(job, new Path("/hbase2hdfs/output"));
    		
    		// Submit the job and wait for completion
    		boolean boo = job.waitForCompletion(true);
    		System.exit(boo ? 0 :1);
    	}
    
    	static class HDFSToHBaseMRMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    		@Override
    		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
    			context.write(value, NullWritable.get());
    		}
    	}
    
    	/**
    	 * Extending TableReducer (instead of Reducer) fixes the output value type to Mutation.
    	 */
    	static class HDFSToHBaseMRReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {
    
    		/**
    		 * Example input line (tab-separated):
    		 * baiyc_20150716_0001	base_info	name	baiyc1	1488348387443
    		 */
    		@Override
    		protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, ImmutableBytesWritable, Mutation>.Context context) throws IOException, InterruptedException {
    
    			String[] splits = key.toString().split("\t");
    			String rowkeyStr = splits[0];
    			ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(rowkeyStr));
    
    			Put put = new Put(Bytes.toBytes(rowkeyStr));
    
    			String family = splits[1];
    			String qualifier = splits[2];
    			String value = splits[3];
    			String ts = splits[4];
    
    			put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Long.parseLong(ts), Bytes.toBytes(value));
    
    			context.write(rowkey, put);
    		}
    	}
    
    }
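
    Once the job finishes, the imported rows can be spot-checked from the HBase shell, for example:

    scan 'person_info', {LIMIT => 5}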

    二、Moving data between HBase and MySQL

          1、Importing MySQL data into HBase (with Sqoop)

      Command:

    sqoop import --connect jdbc:mysql://hadoop01/mytest --username root --password root
    --table student --hbase-create-table --hbase-table studenttest --column-family name
    --hbase-row-key id

    This may fail with Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.HTableDescriptor.addFamily(Lorg/apache/hadoop/hbase/HColumnDescriptor;)V, which is caused by a version incompatibility between the Sqoop build and the HBase client on the cluster. The workaround is to create the HBase table yourself before running the import.
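
    A minimal HBase shell sketch for pre-creating the table used by the import below (table studenttest1 with column family name, matching the command's options):

    create 'studenttest1', 'name'

    Then rerun the import, this time without --hbase-create-table: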

    sqoop import --connect jdbc:mysql://hadoop01/mytest --username root --password root
    --table student --hbase-table studenttest1 --column-family name --hbase-row-key id

    --hbase-create-table    create the HBase table automatically
    --column-family name    column family to write the imported columns into
    --hbase-row-key id      MySQL column to use as the HBase rowkey

        2、Exporting HBase data to MySQL

    There is no direct Sqoop command to export HBase data to MySQL. The usual workaround is a two-step export: first dump the HBase table to HDFS (or to Hive), then export that data to MySQL with Sqoop.
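
    A hedged sketch of the second step with sqoop export, assuming the HBase data has already been dumped to /hbase2hdfs/output as tab-separated text (as produced by HBaseToHDFSMR above) and that a MySQL table whose columns match that layout already exists (student_export is a hypothetical name):

    sqoop export --connect jdbc:mysql://hadoop01/mytest --username root --password root
    --table student_export --export-dir /hbase2hdfs/output
    --input-fields-terminated-by '\t'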

    三、Integrating HBase with Hive

         How it works:

    Hive and HBase are integrated through their public APIs, mainly via the HBaseStorageHandler. Through HBaseStorageHandler, Hive can obtain the HBase table name, column families and columns that correspond to a Hive table, the InputFormat and OutputFormat classes to use, and can create and delete HBase tables.

    When Hive reads data from an HBase table it actually does so through MapReduce: HiveHBaseTableInputFormat splits the HBase table and provides the RecordReader objects that read the data.

    The split rule is one Region per Split: a table with N regions produces N map tasks.

    Reads are performed by building a Scanner that scans the table; where possible, predicates are converted into Filters, and a predicate on the rowkey becomes a rowkey filter. The Scanner fetches data from the RegionServer through RPC calls to next().

     1、Prepare the HBase table and data

    create 'mingxing',{NAME => 'base_info',VERSIONS => 1},{NAME => 'extra_info',VERSIONS => 1}

    Insert data:

    put 'mingxing','rk001','base_info:name','huangbo'
    put 'mingxing','rk001','base_info:age','33'
    put 'mingxing','rk001','extra_info:math','44'
    put 'mingxing','rk001','extra_info:province','beijing'
    put 'mingxing','rk002','base_info:name','xuzheng'
    put 'mingxing','rk002','base_info:age','44'
    put 'mingxing','rk003','base_info:name','wangbaoqiang'
    put 'mingxing','rk003','base_info:age','55'
    put 'mingxing','rk003','base_info:gender','male'
    put 'mingxing','rk004','extra_info:math','33'
    put 'mingxing','rk004','extra_info:province','tianjin'
    put 'mingxing','rk004','extra_info:children','3'
    put 'mingxing','rk005','base_info:name','liutao'
    put 'mingxing','rk006','extra_info:name','liujialing'

       2、Hive-side operations
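
    A hedged sketch of the typical Hive-side DDL, assuming the Hive-HBase handler jars are on Hive's classpath and hbase.zookeeper.quorum is set for the session; it maps the mingxing table from step 1 into a Hive external table via HBaseStorageHandler (the Hive table name mingxing_hive is hypothetical):

    CREATE EXTERNAL TABLE mingxing_hive (
        rowkey   string,
        name     string,
        age      int,
        gender   string,
        math     int,
        province string,
        children int
    )
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES (
        "hbase.columns.mapping" = ":key,base_info:name,base_info:age,base_info:gender,extra_info:math,extra_info:province,extra_info:children"
    )
    TBLPROPERTIES ("hbase.table.name" = "mingxing");

    A query such as the following then reads the HBase data through MapReduce; an equality predicate on the key column can be converted into a rowkey filter on the HBase side, as described above:

    SELECT rowkey, name, age FROM mingxing_hive WHERE rowkey = 'rk001';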

    四、HBaseToHBase by MR

    package com.ghgj.hbase.hbase2hdfsmr;
    
    import java.io.IOException;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class HBaseToHBaseByMR {
    
    	private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";
    	private static final String OLD_TABLE_NAME = "user_info";
    	private static final String NEW_TABLE_NAME = "person_info2";
    	private static final String FAMILY = "base_info";
    	private static final String QUALIFIER = "age";
    
    	public static void main(String[] args) throws Exception {
    
    		Configuration conf = HBaseConfiguration.create();
    		conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
    		System.setProperty("HADOOP_USER_NAME", "hadoop");
    		// conf.set("fs.defaultFS", "hdfs://myha01/");
    
    		Job job = Job.getInstance(conf);
    		job.setJarByClass(HBaseToHBaseByMR.class);
    
    		// Create the target HBase table person_info2 (drop it first if it already exists)
    		HBaseAdmin admin = new HBaseAdmin(conf);
    		HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(NEW_TABLE_NAME));
    		htd.addFamily(new HColumnDescriptor(FAMILY));
    		if (admin.tableExists(NEW_TABLE_NAME)) {
    			admin.disableTable(NEW_TABLE_NAME);
    			admin.deleteTable(NEW_TABLE_NAME);
    		}
    		admin.createTable(htd);
    		admin.close();
    
    		Scan scan = new Scan();
    		scan.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(QUALIFIER));
    		/**
    		 * TableMapReduceUtil wires the source table (with the given Scan) in as the
    		 * job's input and, below, the target table in as the job's output.
    		 */
    		TableMapReduceUtil.initTableMapperJob(OLD_TABLE_NAME, scan, HBaseToHBaseByMRMapper.class, Text.class, NullWritable.class, job);
    		TableMapReduceUtil.initTableReducerJob(NEW_TABLE_NAME, HBaseToHBaseByMRReducer.class, job);
    
    		// Output key/value types for the mapper and for the job (reducer)
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(NullWritable.class);
    		job.setOutputKeyClass(ImmutableBytesWritable.class);
    		job.setOutputValueClass(Mutation.class);
    
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.exit(waitForCompletion ? 0 : 1);
    	}
    
    	static class HBaseToHBaseByMRMapper extends TableMapper<Text, NullWritable> {
    		/**
    		 * key: the rowkey of the current row
    		 * value: one Result per map() call, holding the row's cells
    		 *        (rowkey, family, qualifier, value, timestamp)
    		 */
    		@Override
    		protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, NullWritable>.Context context) throws IOException, InterruptedException {
    			String rowkey = Bytes.toString(key.copyBytes());
    			System.out.println(rowkey);
    			List<Cell> cells = value.listCells();
    			for (Cell cell : cells) {
    				String rowkeyResult = Bytes.toString(CellUtil.cloneRow(cell)) + "\t" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t" + cell.getTimestamp();
    				context.write(new Text(rowkeyResult), NullWritable.get());
    			}
    		}
    	}
    
    	/**
    	 * Extending TableReducer (instead of Reducer) fixes the output value type to Mutation.
    	 */
    	static class HBaseToHBaseByMRReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {
    
    		/**
    		 * Example intermediate record (tab-separated):
    		 * baiyc_20150716_0001	base_info	name	baiyc1	1488348387443
    		 */
    		@Override
    		protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, ImmutableBytesWritable, Mutation>.Context context) throws IOException, InterruptedException {
    
    			String[] splits = key.toString().split("\t");
    			String rowkeyStr = splits[0];
    			ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(rowkeyStr));
    
    			Put put = new Put(Bytes.toBytes(rowkeyStr));
    
    			String family = splits[1];
    			String qualifier = splits[2];
    			String value = splits[3];
    			String ts = splits[4];
    
    			put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Long.parseLong(ts), Bytes.toBytes(value));
    
    			context.write(rowkey, put);
    		}
    	}
    }
    

      


