1. If you are on the CSA (Cloudera Streaming Analytics) build of HBase, you can follow Cloudera's official example and pull in flink-hbase:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.11</artifactId>
    <version>1.9.0-csa1.0.0.0</version>
</dependency>
This requires at least Flink 1.9.0 and HBase 2.1.0-cdh6.3.0; you can then write to HBase with HBaseSinkFunction.
https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-hbase-configuration.html
2. If you are on an older HBase, you have to write to HBase yourself with the Java API. For example, my HBase version is 1.2.0-cdh5.16.2; see the posts "HBase读写的几种方式(三)flink篇" and "Flink 消费kafka数据写入hbase".
When writing to HBase, make sure the firewall allows the HBase RegionServer port (hbase.regionserver.port), which defaults to 60020.
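If writes hang or time out, a quick way to confirm the port is reachable from the TaskManager hosts is a plain socket probe. A minimal sketch (the RegionServer host name below is a placeholder for your environment):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class RegionServerPortCheck {
    public static void main(String[] args) {
        String regionServerHost = "hbase-rs-1.example.com"; // placeholder host
        int port = 60020;                                   // default hbase.regionserver.port
        try (Socket socket = new Socket()) {
            // Try to open a TCP connection with a 3 second timeout
            socket.connect(new InetSocketAddress(regionServerHost, port), 3000);
            System.out.println("RegionServer port " + port + " is reachable");
        } catch (IOException e) {
            System.out.println("Cannot reach " + regionServerHost + ":" + port + " - check the firewall");
        }
    }
}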
There are two main ways to write to HBase through the Java API:
1. The first is to use the Table put API; see
https://github.com/phillip2019/flink-parent/blob/master/flink-connectors/flink-connector-hbase/src/main/java/com/aikosolar/bigdata/flink/connectors/hbase/SimpleHBaseTableSink.java
To write to HBase from Flink this way, extend RichSinkFunction and override the invoke method, calling the HBase put API inside invoke.
Note that calling the HBase Table put API is relatively inefficient; even with List<Put> it still consumes a lot of resources, so the second approach, BufferedMutator, is recommended.
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleHBaseTableSink extends RichSinkFunction<ConsumerRecord<String, String>> {

    private final String tableName;
    private final HBaseWriterConfig writerConfig;

    private transient TableName tName;
    private transient Connection connection;

    public SimpleHBaseTableSink(HBaseWriterConfig writerConfig, String tableName) throws Exception {
        Preconditions.checkNotNull(writerConfig);
        Preconditions.checkNotNull(tableName);
        this.writerConfig = writerConfig;
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        this.tName = TableName.valueOf(tableName);
        // Build the HBase client configuration and open the connection once per subtask
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        this.writerConfig.getHbaseConfig().forEach(conf::set);
        this.connection = ConnectionFactory.createConnection(conf);
    }

    @Override
    public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
        Table table = null;
        try {
            // Every record triggers a synchronous put, i.e. a round trip to HBase
            table = connection.getTable(this.tName);
            String rowKey = String.valueOf(record.offset());
            String value = record.value();
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes("test_family"), Bytes.toBytes("test_col"), Bytes.toBytes(value));
            if (StringUtils.isNotBlank(writerConfig.getDurability())) {
                try {
                    Durability d = Durability.valueOf(writerConfig.getDurability());
                    put.setDurability(d);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            table.put(put);
        } finally {
            IOUtils.closeQuietly(table);
        }
    }

    @Override
    public void close() throws Exception {
        if (this.connection != null) {
            this.connection.close();
            this.connection = null;
        }
    }
}
HBaseWriterConfig
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.MapUtils;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class HBaseWriterConfig implements Serializable {

    private int writeBufferSize = -1;
    private int asyncFlushSize = -1;
    private long asyncFlushInterval = -1;
    private boolean async;
    private String durability;
    private Map<String, String> hbaseConfig = new HashMap<>();

    public static class Builder {
        private int writeBufferSize = 5 * 1024 * 1024;
        private int asyncFlushSize = 5000;
        // unit: seconds
        private long asyncFlushInterval = 60;
        private boolean async;
        private String durability;
        private Map<String, String> config = new HashMap<>();

        public static synchronized Builder me() {
            return new Builder();
        }

        public Builder setDurability(String durability) {
            this.durability = durability;
            return this;
        }

        public Builder writeBufferSize(int writeBufferSize) {
            this.writeBufferSize = writeBufferSize;
            return this;
        }

        public Builder conf(String key, String value) {
            this.config.put(key, value);
            return this;
        }

        public Builder conf(Map<String, String> config) {
            if (MapUtils.isNotEmpty(config)) {
                for (Map.Entry<String, String> e : config.entrySet()) {
                    this.config.put(e.getKey(), e.getValue());
                }
            }
            return this;
        }

        public Builder aync() {
            this.async = true;
            return this;
        }

        public Builder flushInterval(long interval) {
            this.asyncFlushInterval = interval;
            return this;
        }

        public Builder flushSize(int size) {
            this.asyncFlushSize = size;
            return this;
        }

        public Builder async(boolean enable) {
            this.async = enable;
            return this;
        }

        public HBaseWriterConfig build() {
            return new HBaseWriterConfig(writeBufferSize, asyncFlushSize, asyncFlushInterval, async, durability, config);
        }
    }
}
addSink
dataStream.addSink(new SimpleHBaseTableSink(
        HBaseWriterConfig.Builder.me()
                .async(true)
                .build(),
        "test_table"));
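If hbase-site.xml is not on the job's classpath, the HBase client settings can also be passed through the builder's conf() method. A hedged sketch (the ZooKeeper hosts are placeholders for your cluster):

dataStream.addSink(new SimpleHBaseTableSink(
        HBaseWriterConfig.Builder.me()
                .conf("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com") // placeholder quorum
                .conf("hbase.zookeeper.property.clientPort", "2181")
                .setDurability("SYNC_WAL") // must match a constant of org.apache.hadoop.hbase.client.Durability
                .async(true)
                .build(),
        "test_table"));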
2. The other is to use BufferedMutator; see
https://github.com/phillip2019/flink-parent/blob/master/flink-connectors/flink-connector-hbase/src/main/java/com/aikosolar/bigdata/flink/connectors/hbase/writter/HBaseWriterWithBuffer.java
Writing asynchronously in batches with BufferedMutator is more efficient.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleHBaseTableSink extends RichSinkFunction<ConsumerRecord<String, String>> {

    private final String tableName;
    private final HBaseWriterConfig writerConfig;

    private transient TableName tName;
    private transient Connection connection;
    private transient BufferedMutator mutator;

    public SimpleHBaseTableSink(HBaseWriterConfig writerConfig, String tableName) throws Exception {
        Preconditions.checkNotNull(writerConfig);
        Preconditions.checkNotNull(tableName);
        this.writerConfig = writerConfig;
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        this.tName = TableName.valueOf(tableName);
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        this.writerConfig.getHbaseConfig().forEach(conf::set);
        this.connection = ConnectionFactory.createConnection(conf);
        // Set a 1 MB write buffer: when it fills up, the buffered mutations are flushed
        // to HBase automatically (this replaces the old hTable.setWriteBufferSize(...))
        BufferedMutatorParams params = new BufferedMutatorParams(this.tName);
        params.writeBufferSize(1024 * 1024);
        // The BufferedMutator batches mutations and talks to HBase asynchronously;
        // create it once per subtask rather than per record
        this.mutator = connection.getBufferedMutator(params);
    }

    @Override
    public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
        String rowKey = String.valueOf(record.offset());
        String value = record.value();
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("test_family"), Bytes.toBytes("test_col"), Bytes.toBytes(value));
        // Hand the Put to the mutator; it is sent once the buffer fills up.
        // mutate(List<? extends Mutation>) can also be used to hand over a batch at once.
        // There is no need to flush() after every put (this replaces the old
        // hTable.setAutoFlush(false, false)); flushing happens in close().
        mutator.mutate(put);
    }

    @Override
    public void close() throws Exception {
        if (this.mutator != null) {
            this.mutator.flush();
            this.mutator.close();
            this.mutator = null;
        }
        if (this.connection != null) {
            this.connection.close();
            this.connection = null;
        }
    }
}
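Because BufferedMutator submits mutations asynchronously, write failures only surface later on the flush path. BufferedMutatorParams accepts an ExceptionListener that is invoked once retries are exhausted. A minimal sketch of wiring one in (the error handling below is illustrative; the classes come from org.apache.hadoop.hbase.client, already covered by the wildcard import above):

// Sketch: extend the params construction in open() with an exception listener
BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
    @Override
    public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator m) {
        // Called when a buffered mutation has exhausted its retries
        for (int i = 0; i < e.getNumExceptions(); i++) {
            System.err.println("Failed to write row " + e.getRow(i));
        }
    }
};
BufferedMutatorParams params = new BufferedMutatorParams(this.tName)
        .writeBufferSize(1024 * 1024)
        .listener(listener);
this.mutator = connection.getBufferedMutator(params);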
Some constants an HBase sink needs are defined in HConstants:
https://github.com/apache/hbase/blob/master/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
This requires the dependency:
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.2.0-cdh5.15.1</version>
</dependency>
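With hbase-common on the classpath, the configuration keys can be referenced through HConstants rather than raw strings. A small sketch (the quorum value is a placeholder):

// Sketch: the same configuration built in open(), but using HConstants keys
// (org.apache.hadoop.hbase.HConstants) instead of string literals
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.ZOOKEEPER_QUORUM, "zk1.example.com,zk2.example.com,zk3.example.com"); // placeholder quorum
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);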
You can also refer to Flume's HBase sink implementations:
Flume 1.8.0 supports HBase 1.x, using
https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
Flume 1.9.0 added support for HBase 2.x, using
https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2SinkConfigurationConstants.java