zoukankan      html  css  js  c++  java
  • HBase实验(CRUD和MR入库)

    前期准备

    • 启动zk
    • 启动HDFS
    • 启动HBase
    ./start-hbase.sh
    

    在HBase shell中实现CRUD操作

    1. 启动命令行客户端

    ./hbase shell
    

    2. 创建表

    • 指定命名空间school、表名student以及列族essential、additional,其中列族essential有列name、sex,列族additional有列interest

    ##创建命名空间
    create_namespace 'school'
    
    ##创建表 create '命名空间:表名',{NAME => 列族名,VERSIONS => 保存版本数量}
    create 'school:student',{NAME => 'essential',VERSIONS => 3},{NAME => 'additional',VERSIONS => 3}
    
    
    
    • 默认命名空间、表名teacher以及列族essential

    ##不指定命名空间则使用默认命名空间
    create 'teacher',{NAME => 'essential'}
    
    

    3. 删除、新增列族

    • 在一个shell操作中删除表teacher的列族essential,并新增列族additional

    ##首先停用表teacher(新版本中不用)
    disable 'teacher'
    
    ##删除列族
    alter 'teacher',NAME => 'essential',METHOD => 'delete'
    
    ##或者
    alter 'teacher','delete' => 'essential'
    
    ##新增列族additional,不指定版本数则使用默认值(HBase 0.96之前默认为3,0.96及之后默认为1)
    alter 'teacher', NAME => 'additional'
    
    ## 或者在新增列簇时指定版本数
    alter 'teacher', NAME => 'additional',VERSIONS => 220
    
    ##添加列族additional同时删除列族essential
    alter 'teacher',{NAME => 'additional'},{NAME => 'essential',METHOD => 'delete'}
    
    ##启用表
    enable 'teacher'
    
    

    4. 删除表teacher

    ## 禁用表teacher
    disable 'teacher'
    
    ## 删除表teacher
    drop 'teacher'
    
    ##如果只想清空表数据而保留表结构,可改用truncate(表被drop之后无法再执行)
    truncate 'teacher'
    

    5. 新增数据

    • 向student表新增至少5条数据,其中某个cell至少拥有两个版本

    ## put 'table name','row','Column family:column name','new value'
    
    ##插入五条数据
    ##第一条
    put 'school:student','0001','essential:name','xiaoming'
    put 'school:student','0001','essential:sex','male'
    put 'school:student','0001','additional:interest','sing'
    
    ##第二条
    put 'school:student','0002','essential:name','xiaodong'
    put 'school:student','0002','essential:sex','male'
    put 'school:student','0002','additional:interest','ball'
    
    ##此后三条以此类推
    
    ##使第一条的additional:interest有两个版本
    put 'school:student','0001','additional:interest','swiming'
    
    ##查看additional列族中interest列的两个版本
    
    get 'school:student','0001',{COLUMN=>'additional:interest',VERSIONS=>2}
    
    
    

    6. 查看数据

    #查看所有数据
    scan 'school:student'
    
    #查看前10条数据
    scan 'school:student',{LIMIT=>10}
    
    #查看某一行的数据
    get 'school:student','0001',{COLUMN=>'additional',VERSIONS=>3000}
    

    用Java API实现CRUD操作

    工程结构

    1. 导入依赖包

    导入HBase安装包目录中lib目录的所有jar包

    2. 调用Java API

    package cn.bigdata.hbase;
    
    import java.util.ArrayList;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    
    /**
     * Command-line demo of basic HBase operations (create table, put, scan)
     * against the {@code teacher} table.
     *
     * <p>Usage: {@code HbaseDAO <create|put|scan>}.
     */
    public class HbaseDAO {

        /** ZooKeeper quorum through which the client locates the HBase cluster. */
        private static final String ZK_QUORUM = "master:2181,slave1:2181,slave2:2181";

        /** Table that every operation in this class works on. */
        private static final String TABLE_NAME = "teacher";

        private static final byte[] ESSENTIAL = Bytes.toBytes("essential");
        private static final byte[] ADDITIONAL = Bytes.toBytes("additional");
        private static final byte[] NAME = Bytes.toBytes("name");
        private static final byte[] SEX = Bytes.toBytes("sex");
        private static final byte[] INTEREST = Bytes.toBytes("interest");

        /**
         * Dispatches to the operation named by the first argument.
         *
         * @param args {@code args[0]} must be one of {@code create}, {@code put} or {@code scan}
         * @throws Exception if the selected HBase operation fails
         */
        public static void main(String[] args) throws Exception {
            // Guard against a missing argument instead of failing with
            // ArrayIndexOutOfBoundsException.
            if (args.length == 0) {
                System.out.println("usage: HbaseDAO <create|put|scan>");
                return;
            }

            HbaseDAO dao = new HbaseDAO();
            switch (args[0]) {
                case "create":
                    dao.createTable();
                    break;
                case "put":
                    dao.put();
                    break;
                case "scan":
                    dao.scan();
                    // Without this break a successful scan fell through to the
                    // error message below.
                    break;
                default:
                    System.out.println("first arg is " + args[0] + "enter true args");
            }
        }

        /**
         * Creates the {@code teacher} table with the column families
         * {@code essential} and {@code additional}, each keeping up to 3 versions.
         *
         * @throws Exception if the table cannot be created
         */
        public void createTable() throws Exception {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
            for (byte[] family : new byte[][] {ESSENTIAL, ADDITIONAL}) {
                HColumnDescriptor column = new HColumnDescriptor(family);
                column.setMaxVersions(3);
                desc.addFamily(column);
            }

            // try-with-resources closes the admin connection even when createTable throws.
            try (HBaseAdmin admin = new HBaseAdmin(newConfiguration())) {
                admin.createTable(desc);
            }
        }

        /**
         * Inserts five sample rows; the {@code additional:interest} cell of row
         * {@code rk0001} is written with two versions.
         *
         * @throws Exception if the write fails
         */
        public void put() throws Exception {
            // Two cells for the same column inside one Put would otherwise be stamped
            // with the same server-side timestamp, leaving a single stored version.
            // Distinct explicit timestamps keep both.
            long newer = System.currentTimeMillis();
            long older = newer - 1;

            Put first = newRow("rk0001", "xiaoming", "male");
            first.add(ADDITIONAL, INTEREST, older, Bytes.toBytes("ball"));
            first.add(ADDITIONAL, INTEREST, newer, Bytes.toBytes("swimming"));

            ArrayList<Put> puts = new ArrayList<>();
            puts.add(first);
            puts.add(newRow("rk0002", "xiaodong", "male", "play"));
            puts.add(newRow("rk0003", "xiaohong", "female", "study"));
            puts.add(newRow("rk0004", "xiaohua", "female", "sing"));
            puts.add(newRow("rk0005", "xiaolong", "male", "box"));

            try (HTable teacher = new HTable(newConfiguration(), TABLE_NAME)) {
                teacher.put(puts);
            }
        }

        /**
         * Scans the whole {@code teacher} table and prints every cell to standard output.
         *
         * @throws Exception if the scan fails
         */
        @SuppressWarnings("deprecation")
        public void scan() throws Exception {
            try (HTable teacher = new HTable(newConfiguration(), TABLE_NAME);
                    ResultScanner rs = teacher.getScanner(new Scan())) {
                for (Result r : rs) {
                    for (KeyValue kValue : r.list()) {
                        // Bytes.toString decodes as UTF-8, matching Bytes.toBytes used on
                        // write, instead of relying on the platform default charset.
                        System.out.print("Row Name:" + Bytes.toString(kValue.getRow()) + "   ");
                        System.out.print("Column Family:" + Bytes.toString(kValue.getFamily()) + "   ");
                        System.out.print("Key:" + Bytes.toString(kValue.getQualifier()) + "   ");
                        System.out.println("Value:" + Bytes.toString(kValue.getValue()));
                    }
                }
            }
        }

        /** Builds the client configuration pointing at the cluster's ZooKeeper quorum. */
        private static Configuration newConfiguration() {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", ZK_QUORUM);
            return conf;
        }

        /** Builds a Put for {@code rowKey} holding only the {@code essential} columns. */
        private static Put newRow(String rowKey, String name, String sex) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(ESSENTIAL, NAME, Bytes.toBytes(name));
            put.add(ESSENTIAL, SEX, Bytes.toBytes(sex));
            return put;
        }

        /** Builds a Put for {@code rowKey} holding the essential columns plus one interest. */
        private static Put newRow(String rowKey, String name, String sex, String interest) {
            Put put = newRow(rowKey, name, sex);
            put.add(ADDITIONAL, INTEREST, Bytes.toBytes(interest));
            return put;
        }

    }
    
    
    

    3. 导出hbasedemo.jar包

    4. 将HBase依赖包加入到hadoop classpath

    在hadoop安装目录下找到hadoop-env.sh文件,添加 :

    export HADOOP_CLASSPATH=/home/hadoop/apps/hbase/lib/*
    
    

    5. 运行

    #创建表
    hadoop jar hbasedemo.jar cn.bigdata.hbase.HbaseDAO create
    
    #插入数据
    hadoop jar hbasedemo.jar cn.bigdata.hbase.HbaseDAO put
    
    #扫描表
    hadoop jar hbasedemo.jar cn.bigdata.hbase.HbaseDAO scan
    

    注:也可直接在eclipse中运行(跳过3、5步骤),因为要访问zookeeper,所以要修改eclipse所在主机的hosts文件,确保可以ping通zookeeper的主机。

    将MapReduce结果直接存储在HBase

    1. 导入MapReduce和HBase依赖包

    2. 代码实现

    package cn.mr2hbase;
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    /**
     * Word-count MapReduce job whose reducer writes its results straight into an
     * HBase table instead of HDFS.
     *
     * <p>Usage: {@code MRtoHBase <input path>}. The output table {@code WC} is
     * (re)created on every run.
     */
    public class MRtoHBase {

        /** ZooKeeper quorum through which both the admin client and the job locate HBase. */
        private static final String ZK_QUORUM = "master:2181,slave1:2181,slave2:2181";

        /** Emits {@code (word, 1)} for every space-separated token of each input line. */
        public static class Map extends Mapper<LongWritable, Text, Text, LongWritable> {

            /** Constant count emitted for each occurrence. */
            private static final LongWritable ONE = new LongWritable(1);

            /** Reused output key; avoids allocating a new Text per word. */
            private final Text word = new Text();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                for (String token : StringUtils.split(value.toString(), " ")) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }

        }

        /**
         * Sums the counts of one word and emits a Put whose row key is the word and
         * whose {@code content:result} cell holds the total as a decimal string.
         */
        public static class Reduce extends TableReducer<Text, LongWritable, NullWritable> {

            @Override
            protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                    throws IOException, InterruptedException {
                long count = 0;
                for (LongWritable value : values) {
                    count += value.get();
                }
                // The word is the row key; the count is stored as text in content:result.
                Put put = new Put(Bytes.toBytes(key.toString()));
                put.add(Bytes.toBytes("content"), Bytes.toBytes("result"),
                        Bytes.toBytes(String.valueOf(count)));
                context.write(NullWritable.get(), put);
            }

        }

        /**
         * Creates table {@code tablename} with a single column family {@code content}
         * (max 3 versions). An existing table of that name is disabled and deleted first.
         *
         * @param tablename name of the table to (re)create
         * @throws Exception if any admin operation fails
         */
        public static void createTable(String tablename) throws Exception {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tablename));
            HColumnDescriptor content = new HColumnDescriptor("content");
            content.setMaxVersions(3);
            desc.addFamily(content);

            // try-with-resources closes the admin connection even if an operation throws.
            try (HBaseAdmin admin = new HBaseAdmin(newConfiguration())) {
                if (admin.tableExists(tablename)) {
                    System.out.println("table exists,trying recreate table !");
                    admin.disableTable(tablename);
                    admin.deleteTable(tablename);
                }
                System.out.println("Create new table: " + tablename);
                admin.createTable(desc);
            }
        }

        /**
         * Recreates the output table, then configures and runs the job.
         *
         * @param args {@code args[0]} is the HDFS input path
         * @throws Exception if table creation or job submission fails
         */
        public static void main(String[] args) throws Exception {
            // Fail with a usage message rather than ArrayIndexOutOfBoundsException.
            if (args.length < 1) {
                System.err.println("usage: MRtoHBase <input path>");
                System.exit(2);
            }

            String tablename = "WC";
            createTable(tablename);

            // Use the same HBase-aware configuration as createTable so that the tasks'
            // TableOutputFormat can find the cluster through ZooKeeper.
            Configuration conf = newConfiguration();
            conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
            Job job = Job.getInstance(conf);

            // Jar that contains the job classes.
            job.setJarByClass(MRtoHBase.class);

            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            // Mapper output types. The value type was previously set through
            // setOutputValueClass by mistake.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);

            // Reducer output types, matching what Reduce actually emits.
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Put.class);

            FileInputFormat.setInputPaths(job, new Path(args[0]));

            // Send reducer output to HBase.
            job.setOutputFormatClass(TableOutputFormat.class);

            // Submit and wait; true enables progress reporting.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        /** Builds an HBase configuration pointing at the cluster's ZooKeeper quorum. */
        private static Configuration newConfiguration() {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", ZK_QUORUM);
            return conf;
        }

    }
    
    
    

    3. 导出mrtohbase.jar包

    4.修改Yarn配置文件

    修改集群中所有主机Hadoop配置目录下的yarn-site.xml配置文件,添加以下属性,将HBase的lib目录下的依赖包路径配置到Yarn的classpath

    <property>
        <name>mapreduce.application.classpath</name>
            <value>
                /home/hadoop/app/hbase-0.96.2-hadoop2/lib/*
        </value>
    </property>
    

    5. 运行

    #在集群主机上运行
    hadoop jar mrtohbase.jar cn.mr2hbase.MRtoHBase /wc/srcdata
    
  • 相关阅读:
    mysql 基础sql语句
    mysql存储引擎概述
    docker命令总结
    python链接postgresql
    Log4.net示例
    postgresql 使用游标笔记
    npm常用命令
    Nginx命令
    Ubuntu命令总结
    NHibernate总结
  • 原文地址:https://www.cnblogs.com/liminghuang/p/9075244.html
Copyright © 2011-2022 走看看