第一章 HBase 快速入门
1.1 HBase 安装部署
1.1.1 Zookeeper 正常部署
首先保证 Zookeeper 集群的正常部署,并启动之:
https://www.cnblogs.com/wkfvawl/p/15539847.html#scroller-16
1.1.2 Hadoop 正常部署
Hadoop 集群的正常部署并启动:
https://www.cnblogs.com/wkfvawl/p/15369416.html#scroller-52
1.1.3 HBase 的解压
解压 Hbase 到指定目录:
[atguigu@hadoop102 software]$ tar -zxvf hbase-1.3.1-bin.tar.gz -C /opt/module
1.1.4 HBase 的配置文件
修改 HBase 对应的配置文件。
1)hbase-env.sh 修改内容:
export JAVA_HOME=/opt/module/jdk1.8.0_212
export HBASE_MANAGES_ZK=false
注释掉:
2)hbase-site.xml 修改内容:
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop102:8020/HBase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- 0.98 后的新变动,之前版本没有.port,默认端口为 60000 -->
<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop102,hadoop103,hadoop104</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/opt/module/zookeeper-3.5.7/zkData</value>
</property>
</configuration>
3)regionservers:
hadoop102
hadoop103
hadoop104
4)软连接 hadoop 配置文件到 HBase:
ln -s /opt/module/hadoop-3.1.3/etc/hadoop/core-site.xml /opt/module/hbase/conf/core-site.xml
ln -s /opt/module/hadoop-3.1.3/etc/hadoop/hdfs-site.xml /opt/module/hbase/conf/hdfs-site.xml
1.1.5 HBase 远程发送到其他集群
[atguigu@hadoop102 module]$ xsync hbase/
1.1.6 HBase 服务的启动
1.启动方式
[atguigu@hadoop102 hbase]$ bin/hbase-daemon.sh start master
[atguigu@hadoop102 hbase]$ bin/hbase-daemon.sh start regionserver
提示:如果集群之间的节点时间不同步,会导致 regionserver 无法启动,抛出ClockOutOfSyncException 异常。
修复提示:
a、同步时间服务
https://www.cnblogs.com/wkfvawl/p/15369416.html#scroller-54
b、属性:hbase.master.maxclockskew 设置更大的值
<property>
<name>hbase.master.maxclockskew</name>
<value>180000</value>
<description>Time difference of regionserver from
master</description>
</property>
2.启动方式 2
[atguigu@hadoop102 hbase]$ bin/start-hbase.sh
对应的停止服务:
[atguigu@hadoop102 hbase]$ bin/stop-hbase.sh
1.1.7 查看 HBase 页面
HBase的端口
HBase Master | 16000 | hbase-client-1.x.x.jar | RegionServer接入 | |
16010 | HTTP | http://namenode1:16010/ | 集群监控 | |
HBase RegionServer | 16020 | N/A | 客户端接入 | |
16030 | HTTP | http://datanode1:16030/ | 节点监控 |
启动成功后,可以通过“host:port”的方式来访问 HBase 管理页面,例如:
http://hadoop102:16010
1.2 HBase Shell 操作
1.2.1 基本操作
1.进入 HBase 客户端命令行
[atguigu@hadoop102 hbase]$ bin/hbase shell
2.查看帮助命令
hbase(main):001:0> help
3.查看当前数据库中有哪些表
hbase(main):002:0> list
1.2.2 表的操作
1.创建表
hbase(main):002:0> create 'student','info'
2.插入数据到表
hbase(main):003:0> put 'student','1001','info:sex','male'
hbase(main):004:0> put 'student','1001','info:age','18'
hbase(main):005:0> put 'student','1002','info:name','Janna'
hbase(main):006:0> put 'student','1002','info:sex','female'
hbase(main):007:0> put 'student','1002','info:age','20'
3.扫描查看表数据
hbase(main):008:0> scan 'student'
hbase(main):009:0> scan 'student',{STARTROW => '1001', STOPROW =>'1001'}
hbase(main):010:0> scan 'student',{STARTROW => '1001'}
4.查看表结构
hbase(main):011:0> describe ‘student’
5.更新指定字段的数据
hbase(main):012:0> put 'student','1001','info:name','Nick'
hbase(main):013:0> put 'student','1001','info:age','100'
6.查看“指定行”或“指定列族:列”的数据
hbase(main):014:0> get 'student','1001'
hbase(main):015:0> get 'student','1001','info:name'
7.统计表数据行数
hbase(main):021:0> count 'student'
8.删除数据
删除某 rowkey 的全部数据:
hbase(main):016:0> deleteall 'student','1001'
删除某 rowkey 的某一列数据:
hbase(main):017:0> delete 'student','1002','info:sex'
9.清空表数据
hbase(main):018:0> truncate 'student'
Truncating 'student' table (it may take a while):
- Disabling table...
- Truncating table...
0 row(s) in 3.8140 seconds
提示:清空表的操作顺序为先 disable,然后再 truncate。
10.删除表
首先需要先让该表为 disable 状态:
hbase(main):019:0> disable 'student'
然后才能 drop 这个表:
hbase(main):020:0> drop 'student'
提示:如果直接 drop 表,会报错:ERROR: Table student is enabled. Disable it first.
11.变更表信息 将 info 列族中的数据存放 3 个版本:
hbase(main):022:0> alter 'student',{NAME=>'info',VERSIONS=>3}
hbase(main):022:0> get 'student','1001',{COLUMN=>'info:name',VERSIONS=>3}
第二章 HBase API 操作
2.1 环境准备
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.3.1</version> </dependency>
zookeeper和hadoop的依赖,与集群上的版本号对应起来。
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.7</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.1.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.1.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>3.1.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-auth</artifactId> <version>3.1.3</version> </dependency>
2.2 API
获取 Configuration 对象
private static Configuration conf; private static Admin admin = null; private static Connection connection = null; // 获取 Configuration 对象 static{ try{ //使用 HBaseConfiguration 的单例方法实例化 conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "hadoop102"); conf.set("hbase.zookeeper.property.clientPort", "2181"); connection = ConnectionFactory.createConnection(conf); // 获取管理员对象 admin = connection.getAdmin(); } catch (IOException e){ e.printStackTrace(); } }
判断表是否存在
// 判断表是否存在 public static boolean isTableExist(String tableName) throws IOException { //在 HBase 中管理、访问表需要先创建 HBaseAdmin 对象 System.out.println("method isTableExist"); return admin.tableExists(TableName.valueOf(tableName)); }
创建表
// 创建表 public static void createTable(String tableName, String... columnFamily) throws IOException{ //判断表是否存在 if(isTableExist(tableName)){ System.out.println("表" + tableName + "已存在"); //System.exit(0); }else{ // 创建表属性对象,表名需要转字节 HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName)); //创建多个列族 for(String cf : columnFamily){ //创建列族描述器new HColumnDescriptor(cf) descriptor.addFamily(new HColumnDescriptor(cf)); } //根据对表的配置,创建表 admin.createTable(descriptor); System.out.println("表" + tableName + "创建成功!"); } }
删除表
// 删除表 public static void dropTable(String tableName) throws IOException{ if(isTableExist(tableName)){ admin.disableTable(TableName.valueOf(tableName)); admin.deleteTable(TableName.valueOf(tableName)); System.out.println("表" + tableName + "删除成功!"); }else{ System.out.println("表" + tableName + "不存在!"); } }
向表中插入数据
// 向表中插入数据 public static void addRowData(String tableName, String rowKey, String columnFamily, String column, String value) throws IOException{ //创建 HTable 对象 Table hTable = connection.getTable(TableName.valueOf(tableName)); //向表中插入数据 Put put = new Put(Bytes.toBytes(rowKey)); //向 Put 对象中组装数据 put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value)); hTable.put(put); hTable.close(); System.out.println("插入数据成功"); }
获取所有数据
// 获取所有数据 public static void getAllRows(String tableName) throws IOException{ Table hTable = connection.getTable(TableName.valueOf(tableName)); //得到用于扫描 region 的对象 Scan scan = new Scan(); //使用 HTable 得到 resultcanner 实现类的对象 ResultScanner resultScanner = hTable.getScanner(scan); for(Result result : resultScanner){ Cell[] cells = result.rawCells(); for(Cell cell : cells){ //得到 rowkey System.out.println(" 行 键 :" + Bytes.toString(CellUtil.cloneRow(cell))); //得到列族 System.out.println(" 列 族 " + Bytes.toString(CellUtil.cloneFamily(cell))); System.out.println(" 列 :" + Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println(" 值 :" + Bytes.toString(CellUtil.cloneValue(cell))); } } }
获取某一行数据
// 获取某一行数据 public static void getRow(String tableName, String rowKey) throws IOException{ Table hTable = connection.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); //get.setMaxVersions();显示所有版本 //get.setTimeStamp();显示指定时间戳的版本 Result result = hTable.get(get); for(Cell cell : result.rawCells()){ System.out.println(" 行 键 :" +Bytes.toString(result.getRow())); System.out.println(" 列 族 " + Bytes.toString(CellUtil.cloneFamily(cell))); System.out.println(" 列 :" + Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println(" 值 :" + Bytes.toString(CellUtil.cloneValue(cell))); System.out.println("时间戳:" + cell.getTimestamp()); } }
获取某一行指定“列族:列”的数据
// 获取某一行指定“列族:列”的数据 public static void getRowQualifier(String tableName, String rowKey, String family, String qualifier) throws IOException{ Table hTable = connection.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); Result result = hTable.get(get); for(Cell cell : result.rawCells()){ System.out.println(" 行 键 :" + Bytes.toString(result.getRow())); System.out.println(" 列 族 " + Bytes.toString(CellUtil.cloneFamily(cell))); System.out.println(" 列 :" + Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println(" 值 :" + Bytes.toString(CellUtil.cloneValue(cell))); } }
删除指定行
public static void deleteData(String tableName, String rowKey, String columnFamily, String column ) throws IOException{ //创建 HTable 对象 Table table = connection.getTable(TableName.valueOf(tableName)); //构建删除对象 Delete delete = new Delete(Bytes.toBytes(rowKey)); delete.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column)); //执行删除操作 table.delete(delete); //关闭链接 table.close(); }
addColumn这个API要慎用
- addColumn() 只删除等于给定时间戳的版本。
hbase存储数据是按照版本存储的,并且是按照时间戳决定的版本,只有时间戳最新的版本才有效。
比如原有三个版本,都是表示数据类型的,时间戳假设是1,5,8三个。
而addColumn指定了时间戳是5,那么hbas就会新增一个时间戳为5的delete版本,
然后一共有4个版本,1,5,5(delete),8..
那么5(delete)表示删除的5这个版本。
当然,由于最新版的还是8,所以get和scan查询出来的是8这个版本的数据。
如果没有8这个版本,即添加了5(delete)之后的有1,5,5(delete)三个版本,那么5(delete)还是表示删除5这个版本,然后get或者scan查询的数据将会是1这个版本的数据value。
- addColumns() 删除给定列的所有小于等于给定的时间戳版本的数据。即添加一个版本,类型为deleteColumn。
get或者scan时,小于该deleteColumn数据版本的所有数据都是不可见的
删除多行数据
// 删除多行数据
public static void deleteMultiRow(String tableName, String... rows) throws IOException{
Table hTable = connection.getTable(TableName.valueOf(tableName));
List<Delete> deleteList = new ArrayList<Delete>();
for(String row : rows){
Delete delete = new Delete(Bytes.toBytes(row));
deleteList.add(delete);
}
hTable.delete(deleteList);
hTable.close();
}
2.3 MapReduce
在Hadoop中MR使用HBase,需要将HBase的jar包添加到Hadoop的类路径下,所以需要修改配置文件添加类路径。这源于一个思想:
A要使用 B,那么A要有B的jar包。例如:在 Hive的安装中,Hive需要使用到MySQL数据库,所以将jdbc驱动包放到lib文件夹中
通过 HBase 的相关 JavaAPI,我们可以实现伴随 HBase 操作的 MapReduce 过程,比如使用MapReduce 将数据从本地文件系统导入到 HBase 的表中,比如我们从 HBase 中读取一些原始数据后使用 MapReduce 做数据分析。
官方 HBase-MapReduce
1.查看 HBase 的 MapReduce 任务的执行
$ bin/hbase mapredcp
2.环境变量的导入
(1)执行环境变量的导入(临时生效,在命令行执行下述操作)
$ export HBASE_HOME=/opt/module/hbase $ export HADOOP_HOME=/opt/module/hadoop-3.1.3 $ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
(2)永久生效:在/etc/profile 配置
export HBASE_HOME=/opt/module/hbase
export HADOOP_HOME=/opt/module/hadoop-3.1.3
并在 hadoop-env.sh 中配置:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
3.运行官方的 MapReduce 任务
案例一:统计 Student 表中有多少行数据
/opt/module/hadoop-3.1.3/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter stu
案例二:使用 MapReduce 将本地数据导入到 HBase
1)在本地创建一个 tsv 格式的文件:fruit.tsv
1001 Apple Red 1002 Pear Yellow 1003 Pineapple Yellow
注意这里要用tab来分隔 否则会出错误
2)创建 Hbase 表
Hbase(main):001:0> create 'fruit','info'
3)在 HDFS 中上传 fruit.tsv 文件到根目录下
hadoop fs -put fruit.tsv /
在hdfs上查看http://hadoop102:9870/explorer.html#/
4)执行 MapReduce 到 HBase 的 fruit 表中
$ /opt/module/hadoop-3.1.3/bin/yarn jar lib/hbase-server-1.3.1.jar \
importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
hdfs://hadoop102:8020/fruit.tsv
这里加 \ 将命令变成多行 可以方便阅读。
5)使用 scan 命令查看导入后的结果
Hbase(main):001:0> scan ‘fruit’
自定义 HBase-MapReduce1
目标:将 fruit 表中的一部分数据,通过 MR 迁入到 fruit_mr 表中。
分步实现:
1.构建 ReadFruitMapper 类,用于读取 fruit 表中的数据
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FruitMapper extends Mapper<LongWritable, Text, LongWritable, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(key, value); } }
2. 构建FruitReducer类,用于将读取到的数据写入到hbase中的fruit1表中
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import java.io.IOException; public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> { @Override protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 1.遍历values:1001 Apple Red for (Text value : values) { // 2.获取每一行数据 String[] fields = value.toString().split("\t"); // 3.构建Put对象 Put put = new Put(Bytes.toBytes(fields[0])); // 4.给Put对象赋值 put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(fields[1])); put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(fields[2])); // 5.写出 context.write(NullWritable.get(), put); } } }
3. 构建FruitDriver implements Tool用于组装运行Job任务
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class FruitDriver implements Tool { // 定义一个Configuration private Configuration configuration = null; public int run(String[] args) throws Exception { // 1.获取Job对象 Job job = Job.getInstance(configuration); // 2.设置驱动类路径 job.setJarByClass(FruitDriver.class); // 3.设置Mapper和Mapper输出的KV类型 job.setMapperClass(FruitMapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); // 4.设置Reducer类 TableMapReduceUtil.initTableReducerJob(args[1], FruitReducer.class, job); // 5.设置输入输出参数 FileInputFormat.setInputPaths(job, new Path(args[0])); // 6.提交任务 boolean result = job.waitForCompletion(true); return result ? 0 : 1; } public void setConf(Configuration conf) { configuration = conf; } public Configuration getConf() { return configuration; } public static void main(String[] args) { try { Configuration configuration = new Configuration(); int run = ToolRunner.run(configuration, new FruitDriver(), args); System.exit(run); } catch (Exception e) { e.printStackTrace(); } } }
4. 打包运行任务
/opt/module/hadoop-3.1.3/bin/yarn jar HBaseExample-1.0-SNAPSHOT.jar com.stitch.hbase.mr1.FruitDriver /fruit.tsv fruit1
提示:运行任务前,如果待数据导入的表不存在,则需要提前创建。
提示:maven打包命令:-P local clean package或-P dev clean package install(将第三方jar包一同打包,需要插件:maven-shade-plugin)
自定义 HBase-MapReduce2
目标:将HBase中fruit1表中的一部分数据(name列),通过MR迁入到HBase的fruit2表中。
1. 构建Fruit2Mapper类,用于读取fruit1表中的数据
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; public class Fruit2Mapper extends TableMapper<ImmutableBytesWritable, Put> { @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { // 构建Put对象 Put put = new Put(key.get()); // 1.获取数据 for (Cell cell : value.rawCells()) { // 2.判断当前的cell是否为"name"列 if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) { // 3.给Put对象赋值 put.add(cell); } } // 4.写出 context.write(key, put); } }
2.构建Fruit2Reducer类,用于将读取到的fruit1表中的数据写入到fruit2表中
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.io.NullWritable; import java.io.IOException; public class Fruit2Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> { @Override protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException { // 遍历写出 for (Put put : values) { context.write(NullWritable.get(), put); } } }
3.构建Fruit2Driver implements Tool用于组装运行Job任务
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Fruit2Driver implements Tool { // 定义配置信息 private Configuration configuration = null; public int run(String[] args) throws Exception { // 1.获取Job对象 Job job = Job.getInstance(configuration); // 2.设置主类路径 job.setJarByClass(Fruit2Driver.class); // 3.设置Mapper和输出KV类型 TableMapReduceUtil.initTableMapperJob("fruit1", new Scan(), Fruit2Mapper.class, ImmutableBytesWritable.class, Put.class, job); // 4.设置Reducer&输出的表 TableMapReduceUtil.initTableReducerJob("fruit2", Fruit2Reducer.class, job); // 5.提交任务 boolean result = job.waitForCompletion(true); return result ? 0 : 1; } public void setConf(Configuration conf) { configuration = conf; } public Configuration getConf() { return configuration; } public static void main(String[] args) { try { Configuration configuration = HBaseConfiguration.create(); ToolRunner.run(configuration, new Fruit2Driver(), args); } catch (Exception e) { e.printStackTrace(); } } }
4.打包运行任务
/opt/module/hadoop-3.1.3/bin/yarn jar HBaseExample-1.0-SNAPSHOT.jar com.stitch.hbase.mr2.FruitDriver
提示:运行任务前,如果待数据导入的表不存在,则需要提前创建。
提示:maven打包命令:-P local clean package或-P dev clean package install(将第三方jar包一同打包,需要插件:maven-shade-plugin)
提示:运行任务前,如果待数据导入的表不存在,则需要提前创建。