  • HBase (7): HBase advanced programming

    一、Using HBase with MapReduce

         Why use MapReduce to access HBase data?
         —— To speed up analysis and to scale out analysis capacity.
         Analyzing HBase data with MapReduce is always an offline-analysis scenario.

           

          1、HBaseToHDFS

             Read data from HBase, analyze it, and then write the result to HDFS. Implementation:

    package com.ghgj.hbase.hbase2hdfsmr;
    
    import java.io.IOException;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * Purpose: read the user_info table from HBase and write its contents out to HDFS.
     */
    public class HBaseToHDFSMR {
    	
    	private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";
    
    	public static void main(String[] args) throws Exception {
    		
    		Configuration conf = HBaseConfiguration.create();
    		conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
    		System.setProperty("HADOOP_USER_NAME", "hadoop");
    //		conf.set("fs.defaultFS", "hdfs://myha01/");
    		
    		Job job = Job.getInstance(conf);
    		job.setJarByClass(HBaseToHDFSMR.class);
    		
    		Scan scan = new Scan();
    		scan.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("name"));
    		/**
    		 * TableMapReduceUtil: the "Util" suffix marks it as a utility class;
    		 * it wires the source table, the Scan and the mapper into the MapReduce job.
    		 * (By contrast, a class ending in "Factory" would be a factory whose main
    		 * role is managing object creation.)
    		 */
    		TableMapReduceUtil.initTableMapperJob("user_info", scan, 
    				HBaseToHDFSMRMapper.class, Text.class, NullWritable.class, job);
    		job.setReducerClass(HBaseToHDFSMRReducer.class);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(NullWritable.class);
    		
    		Path outputPath = new Path("/hbase2hdfs/output");
    		FileSystem fs = FileSystem.get(conf);
    		if (fs.exists(outputPath)) {
    			fs.delete(outputPath, true);
    		}
    		FileOutputFormat.setOutputPath(job, outputPath);
    		
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.exit(waitForCompletion ? 0 : 1);
    	}
    	
    	static class HBaseToHDFSMRMapper extends TableMapper<Text, NullWritable>{
    		/**
    		 * key: the rowkey
    		 * value: the Result instance handed to each call of map();
    		 * it holds the rowkey, family, qualifier, value and timestamp.
    		 */
    		@Override
    		protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, NullWritable>.Context context) throws IOException, InterruptedException {
    			String rowkey = Bytes.toString(key.copyBytes());
    			System.out.println(rowkey);
    			List<Cell> cells = value.listCells();
    			for (int i = 0; i < cells.size(); i++) {
    				Cell cell = cells.get(i);
    				String rowkey_result = Bytes.toString(cell.getRow()) + "\t"
    						+ Bytes.toString(cell.getFamily()) + "\t"
    						+ Bytes.toString(cell.getQualifier()) + "\t"
    						+ Bytes.toString(cell.getValue()) + "\t"
    						+ cell.getTimestamp();
    				context.write(new Text(rowkey_result), NullWritable.get());
    			}
    		}
    	}
    	
    	static class HBaseToHDFSMRReducer extends Reducer<Text, NullWritable, Text, NullWritable>{
    		@Override
    		protected void reduce(Text key, Iterable<NullWritable> arg1, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
    			context.write(key, NullWritable.get());
    		}
    	}
    }
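
    A hedged sketch of how such a job is typically packaged and submitted from the command line. The jar name below is a placeholder; `hbase classpath` is the standard way to put the HBase client jars on the MapReduce classpath:

    # Put the HBase client jars on the job classpath (standard HBase idiom)
    export HADOOP_CLASSPATH=$(hbase classpath)

    # Submit the job; hbase-mr-examples.jar is a placeholder for your packaged jar
    hadoop jar hbase-mr-examples.jar com.ghgj.hbase.hbase2hdfsmr.HBaseToHDFSMR

    # Spot-check the exported rows on HDFS
    hdfs dfs -cat /hbase2hdfs/output/part-r-00000 | head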
    

      2、HDFSToHBase

            Read data from HDFS, process it, and then write it into HBase. Implementation:

    package com.ghgj.hbase.hbase2hdfsmr;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    public class HDFSToHBaseMR {
    	private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";
    	private static final String TABLE_NAME = "person_info";
    
    	public static void main(String[] args) throws Exception {
    
    		Configuration conf = HBaseConfiguration.create();
    		conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
    		System.setProperty("HADOOP_USER_NAME", "hadoop");
    		Job job = Job.getInstance(conf);
    		job.setJarByClass(HDFSToHBaseMR.class);
    
    		// The following block creates an HBase table named person_info
    		HBaseAdmin admin = new HBaseAdmin(conf);
    		HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
    		htd.addFamily(new HColumnDescriptor("base_info"));
    		if (admin.tableExists(TABLE_NAME)) {
    			admin.disableTable(TABLE_NAME);
    			admin.deleteTable(TABLE_NAME);
    		}
    		admin.createTable(htd);
    
    		// Set the mapper on the job; the reducer is wired in by TableMapReduceUtil
    		job.setMapperClass(HDFSToHBaseMRMapper.class);
    		TableMapReduceUtil.initTableReducerJob(TABLE_NAME, HDFSToHBaseMRReducer.class, job);
    		
    		// Declare the map and reduce output key/value types
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(NullWritable.class);
    		job.setOutputKeyClass(ImmutableBytesWritable.class);
    		job.setOutputValueClass(Mutation.class);
    
    		// Input path on HDFS (the output of HBaseToHDFSMR above)
    		FileInputFormat.setInputPaths(job, new Path("/hbase2hdfs/output"));
    		
    		// Submit the job and wait for it to finish
    		boolean boo = job.waitForCompletion(true);
    		System.exit(boo ? 0 :1);
    	}
    
    	static class HDFSToHBaseMRMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    		@Override
    		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
    			context.write(value, NullWritable.get());
    		}
    	}
    
    	/**
    	 * Extending TableReducer (which itself extends Reducer) simply fixes the
    	 * reducer's output value type to Mutation.
    	 */
    	static class HDFSToHBaseMRReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {
    
    		/**
    		 * Sample input line: baiyc_20150716_0001 base_info name baiyc1 1488348387443
    		 */
    		@Override
    		protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, ImmutableBytesWritable, Mutation>.Context context) throws IOException, InterruptedException {
    
    			String[] splits = key.toString().split("\t");
    			String rowkeyStr = splits[0];
    			ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(rowkeyStr));
    
    			Put put = new Put(Bytes.toBytes(rowkeyStr));
    
    			String family = splits[1];
    			String qualifier = splits[2];
    			String value = splits[3];
    			String ts = splits[4];
    
    			put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Long.parseLong(ts), Bytes.toBytes(value));
    
    			context.write(rowkey, put);
    		}
    	}
    
    }
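
    After the job completes, the loaded rows can be spot-checked from the HBase shell, for example:

    scan 'person_info', {LIMIT => 5}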

    二、Transferring data between HBase and MySQL

          1、Importing MySQL data into HBase (with Sqoop)

  Command:

    sqoop import --connect jdbc:mysql://hadoop01/mytest --username root --password root \
    --table student --hbase-create-table --hbase-table studenttest --column-family name \
    --hbase-row-key id

    This command may fail with: Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.HTableDescriptor.addFamily(Lorg/apache/hadoop/hbase/HColumnDescriptor;)V. The error is caused by a version incompatibility between Sqoop and HBase; it can be worked around by creating the HBase table in advance (see the shell sketch after the flag notes below) and letting Sqoop write into the existing table.
    In that case use the following command instead:

    sqoop import --connect jdbc:mysql://hadoop01/mytest --username root --password root \
    --table student --hbase-table studenttest1 --column-family name --hbase-row-key id

    --hbase-create-table    create the target table in HBase automatically
    --column-family name    the column family to write the imported columns into
    --hbase-row-key id      the MySQL column to use as the HBase rowkey
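
    A minimal sketch of pre-creating the target table from the HBase shell for the second command above (table studenttest1 and column family name are taken from that command):

    create 'studenttest1', 'name'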

        2、Exporting HBase data to MySQL

    There is currently no command that exports HBase data directly to MySQL.

    Workaround:
    First export the HBase data to HDFS (or into Hive), then export that data from HDFS/Hive to MySQL.
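
    A hedged sketch of the second step with Sqoop, assuming the HBase data was already dumped to HDFS as tab-separated text (for example by the HBaseToHDFSMR job above) and that a matching MySQL table exists; the export directory and the table name student_export are placeholders:

    sqoop export --connect jdbc:mysql://hadoop01/mytest --username root --password root \
    --table student_export --export-dir /hbase2hdfs/output \
    --input-fields-terminated-by '\t'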

    三、Integrating HBase with Hive

         How it works:

    Hive and HBase are integrated through their respective public APIs, with HBaseStorageHandler handling the communication. Through HBaseStorageHandler, Hive can look up the HBase table name, column families and columns that a Hive table maps to, obtain the InputFormat and OutputFormat classes, and create or delete HBase tables.

    When Hive accesses data in an HBase table, it does so through MapReduce: inside the MR job, HiveHBaseTableInputFormat splits the HBase table and supplies the RecordReader objects used to read the data.

    The split rule is one Split per Region, so a table with N Regions produces N map tasks.

    Reading the HBase table is done by building a Scanner over the full table; query predicates are converted into Filters where possible, and a predicate on the rowkey becomes a rowkey filter. The Scanner fetches data by calling next() on the RegionServer over RPC.

     1、Prepare the HBase table and data

    create 'mingxing',{NAME => 'base_info',VERSIONS => 1},{NAME => 'extra_info',VERSIONS => 1}

    Insert data:

    put 'mingxing','rk001','base_info:name','huangbo'
    put 'mingxing','rk001','base_info:age','33'
    put 'mingxing','rk001','extra_info:math','44'
    put 'mingxing','rk001','extra_info:province','beijing'
    put 'mingxing','rk002','base_info:name','xuzheng'
    put 'mingxing','rk002','base_info:age','44'
    put 'mingxing','rk003','base_info:name','wangbaoqiang'
    put 'mingxing','rk003','base_info:age','55'
    put 'mingxing','rk003','base_info:gender','male'
    put 'mingxing','rk004','extra_info:math','33'
    put 'mingxing','rk004','extra_info:province','tianjin'
    put 'mingxing','rk004','extra_info:children','3'
    put 'mingxing','rk005','base_info:name','liutao'
    put 'mingxing','rk006','extra_info:name','liujialing'

       2、Hive-side operations
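
    A minimal sketch of the Hive-side DDL, assuming the Hive HBase handler jars are on Hive's classpath; the Hive table name mingxing_hive is a placeholder, and the column mapping covers only a subset of the columns inserted above:

    -- External Hive table backed by the existing HBase table 'mingxing'
    CREATE EXTERNAL TABLE mingxing_hive (
      rowkey   string,
      name     string,
      age      string,
      math     string,
      province string
    )
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES (
      "hbase.columns.mapping" = ":key,base_info:name,base_info:age,extra_info:math,extra_info:province"
    )
    TBLPROPERTIES ("hbase.table.name" = "mingxing");

    -- Query the HBase data through Hive (executed as MapReduce, as described above)
    SELECT rowkey, name, age FROM mingxing_hive;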

    四、HBaseToHBase by MR

    Copy data from one HBase table into another with MapReduce. Implementation:

    package com.ghgj.hbase.hbase2hdfsmr;
    
    import java.io.IOException;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class HBaseToHBaseByMR {
    
    	private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";
    	private static final String OLD_TABLE_NAME = "user_info";
    	private static final String NEW_TABLE_NAME = "person_info2";
    	private static final String FAMILY = "base_info";
    	private static final String QUALIFIER = "age";
    
    	public static void main(String[] args) throws Exception {
    
    		Configuration conf = HBaseConfiguration.create();
    		conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
    		System.setProperty("HADOOP_USER_NAME", "hadoop");
    		// conf.set("fs.defaultFS", "hdfs://myha01/");
    
    		Job job = Job.getInstance(conf);
    		job.setJarByClass(HBaseToHBaseByMR.class);
    
    		// The following block creates the target HBase table, person_info2
    		HBaseAdmin admin = new HBaseAdmin(conf);
    		HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(NEW_TABLE_NAME));
    		htd.addFamily(new HColumnDescriptor(FAMILY));
    		if (admin.tableExists(NEW_TABLE_NAME)) {
    			admin.disableTable(NEW_TABLE_NAME);
    			admin.deleteTable(NEW_TABLE_NAME);
    		}
    		admin.createTable(htd);
    
    		Scan scan = new Scan();
    		scan.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(QUALIFIER));
    		/**
    		 * TableMapReduceUtil: the "Util" suffix marks it as a utility class;
    		 * initTableMapperJob wires the source table, the Scan and the mapper into
    		 * the job, and initTableReducerJob does the same for the target table.
    		 */
    		TableMapReduceUtil.initTableMapperJob(OLD_TABLE_NAME, scan, HBaseToHBaseByMRMapper.class, Text.class, NullWritable.class, job);
    		TableMapReduceUtil.initTableReducerJob(NEW_TABLE_NAME, HBaseToHBaseByMRReducer.class, job);
    
    		// Declare the map and reduce output key/value types
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(NullWritable.class);
    		job.setOutputKeyClass(ImmutableBytesWritable.class);
    		job.setOutputValueClass(Mutation.class);
    
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.exit(waitForCompletion ? 0 : 1);
    	}
    
    	static class HBaseToHBaseByMRMapper extends TableMapper<Text, NullWritable> {
    		/**
    		 * key: the rowkey
    		 * value: the Result instance handed to each call of map();
    		 * it holds the rowkey, family, qualifier, value and timestamp.
    		 */
    		@Override
    		protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, NullWritable>.Context context) throws IOException, InterruptedException {
    			String rowkey = Bytes.toString(key.copyBytes());
    			System.out.println(rowkey);
    			List<Cell> cells = value.listCells();
    			for (int i = 0; i < cells.size(); i++) {
    				Cell cell = cells.get(i);
    				String rowkey_result = Bytes.toString(cell.getRow()) + "\t" + Bytes.toString(cell.getFamily()) + "\t" + Bytes.toString(cell.getQualifier()) + "\t" + Bytes.toString(cell.getValue()) + "\t" + cell.getTimestamp();
    				context.write(new Text(rowkey_result), NullWritable.get());
    			}
    		}
    	}
    
    	/**
    	 * Extending TableReducer (which itself extends Reducer) simply fixes the
    	 * reducer's output value type to Mutation.
    	 */
    	static class HBaseToHBaseByMRReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {
    
    		/**
    		 * Sample input line: baiyc_20150716_0001 base_info name baiyc1 1488348387443
    		 */
    		@Override
    		protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, ImmutableBytesWritable, Mutation>.Context context) throws IOException, InterruptedException {
    
    			String[] splits = key.toString().split("\t");
    			String rowkeyStr = splits[0];
    			ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(rowkeyStr));
    
    			Put put = new Put(Bytes.toBytes(rowkeyStr));
    
    			String family = splits[1];
    			String qualifier = splits[2];
    			String value = splits[3];
    			String ts = splits[4];
    
    			put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Long.parseLong(ts), Bytes.toBytes(value));
    
    			context.write(rowkey, put);
    		}
    	}
    }
    

      


