zoukankan      html  css  js  c++  java
  • Flink学习笔记——读写Hbase

    1.如果是csa(Cloudera Streaming Analytics)版本的Hbase,可以参考Cloudera官方例子,通过引入flink-hbase来实现

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hbase_2.11</artifactId>
        <version>1.9.0-csa1.0.0.0</version>
    </dependency>
    

    要求flink最低版本1.9.0,hbase最低版本2.1.0-cdh6.3.0,然后就可以使用HBaseSinkFunction来写Hbase

    https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-hbase-configuration.html
    

    2.如果是低版本的Hbase,则需要自行使用Java API来写HBase,比如我使用hbase版本为1.2.0-cdh5.1.6.2,可以参考:HBase读写的几种方式(三)flink篇Flink 消费kafka数据写入hbase

    写HBase的时候注意开启Hbase regionserver的hbase.regionserver.port端口防火墙,默认为60020

    Java API写Hbase的方式主要有2种:

    1.一种是使用Table的put API,可以参考

    https://github.com/phillip2019/flink-parent/blob/master/flink-connectors/flink-connector-hbase/src/main/java/com/aikosolar/bigdata/flink/connectors/hbase/SimpleHBaseTableSink.java
    

    使用Flink来写Hbase的时候,需要继承RichSinkFunction,然后重写invoke方法,在invoke方法中调用HBase的put API

    需要注意的是调用HBase Table的put API较为低效,即使是使用了List<Put>,也会消耗较多的资源,建议使用第二种的BufferedMutator

    import org.apache.commons.io.IOUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.util.Preconditions;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    public class SimpleHBaseTableSink extends RichSinkFunction<ConsumerRecord<String, String>> {
    
        private final String tableName;
        private final HBaseWriterConfig writerConfig;
    
        private transient TableName tName;
        private transient Connection connection;
    
        public SimpleHBaseTableSink(HBaseWriterConfig writerConfig, String tableName) throws Exception {
            Preconditions.checkNotNull(writerConfig);
            Preconditions.checkNotNull(tableName);
    
            this.writerConfig = writerConfig;
            this.tableName = tableName;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            this.tName = TableName.valueOf(tableName);
    
            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            this.writerConfig.getHbaseConfig().forEach(conf::set);
            this.connection = ConnectionFactory.createConnection(conf);
        }
    
        @Override
        public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
            Table table = null;
            try {
                table = connection.getTable(this.tName);
                String rowKey = String.valueOf(record.offset());
                String value = record.value();
                Put put = new Put(Bytes.toBytes(rowKey));
                put.addColumn(Bytes.toBytes("test_family"), Bytes.toBytes("test_col"), Bytes.toBytes(value));
    
                if (StringUtils.isNotBlank(writerConfig.getDurability())) {
                    try {
                        Durability d = Durability.valueOf(writerConfig.getDurability());
                        put.setDurability(d);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                table.put(put);
            } finally {
                IOUtils.closeQuietly(table);
            }
        }
    
        @Override
        public void close() throws Exception {
            if (this.connection != null) {
                this.connection.close();
                this.connection = null;
            }
        }
    }
    

    HBaseWriterConfig

    import lombok.AccessLevel;
    import lombok.AllArgsConstructor;
    import lombok.Getter;
    import lombok.NoArgsConstructor;
    import org.apache.commons.collections.MapUtils;
    
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;
    
    @Getter
    @AllArgsConstructor(access = AccessLevel.PRIVATE)
    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public class HBaseWriterConfig implements Serializable {
        private int writeBufferSize = -1;
        private int asyncFlushSize = -1;
        private long asyncFlushInterval = -1;
        private boolean async;
        private String durability;
        private Map<String, String> hbaseConfig = new HashMap<>();
    
        public static class Builder {
            private int writeBufferSize = 5 * 1024 * 1024;
            private int asyncFlushSize = 5000;
            // 单位:秒
            private long asyncFlushInterval = 60;
            private boolean async;
            private String durability;
            private Map<String, String> config = new HashMap<>();
    
            public static synchronized Builder me() {
                return new Builder();
            }
    
            public Builder setDurability(String durability) {
                this.durability = durability;
                return this;
            }
    
            public Builder writeBufferSize(int writeBufferSize) {
                this.writeBufferSize = writeBufferSize;
                return this;
            }
    
            public Builder conf(String key, String value) {
                this.config.put(key, value);
                return this;
            }
    
            public Builder conf(Map<String, String> config) {
                if (MapUtils.isNotEmpty(config)) {
                    for (Map.Entry<String, String> e : config.entrySet()) {
                        this.config.put(e.getKey(), e.getValue());
                    }
                }
                return this;
            }
    
            public Builder aync() {
                this.async = true;
                return this;
            }
    
            public Builder flushInterval(long interval) {
                this.asyncFlushInterval = interval;
                return this;
            }
    
            public Builder flushSize(int size) {
                this.asyncFlushSize = size;
                return this;
            }
    
            public Builder async(boolean enable) {
                this.async = enable;
                return this;
            }
    
            public HBaseWriterConfig build() {
                return new HBaseWriterConfig(writeBufferSize, asyncFlushSize, asyncFlushInterval, async, durability, config);
            }
    
        }
    }
    

     addSink

            dataStream.addSink(new SimpleHBaseTableSink(HBaseWriterConfig.Builder.me()
                    .async(true)
                    .build(),
                    "test_table")
            );
    

    2.另一种是使用BufferedMutator,可以参考

    https://github.com/phillip2019/flink-parent/blob/master/flink-connectors/flink-connector-hbase/src/main/java/com/aikosolar/bigdata/flink/connectors/hbase/writter/HBaseWriterWithBuffer.java
    

    使用BufferedMutator进行异步批量插入的方式效率更高

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.util.Preconditions;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    public class SimpleHBaseTableSink extends RichSinkFunction<ConsumerRecord<String, String>> {
    
        private final String tableName;
        private final HBaseWriterConfig writerConfig;
    
        private transient TableName tName;
        private transient Connection connection;
    
        public SimpleHBaseTableSink(HBaseWriterConfig writerConfig, String tableName) throws Exception {
            Preconditions.checkNotNull(writerConfig);
            Preconditions.checkNotNull(tableName);
    
            this.writerConfig = writerConfig;
            this.tableName = tableName;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            this.tName = TableName.valueOf(tableName);
    
            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            this.writerConfig.getHbaseConfig().forEach(conf::set);
            this.connection = ConnectionFactory.createConnection(conf);
        }
    
        @Override
        public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
            String rowKey = String.valueOf(record.offset());
            String value = record.value();
    
            // 设置缓存1m,当达到1m时数据会自动刷到hbase,替代了hTable.setWriteBufferSize(30 * 1024 * 1024)
            BufferedMutatorParams params = new BufferedMutatorParams(this.tName);
            params.writeBufferSize(1024 * 1024);
            // 创建一个批量异步与hbase通信的对象
            BufferedMutator mutator = connection.getBufferedMutator(params);
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes("test_family"), Bytes.toBytes("test_col"), Bytes.toBytes(value));
            // 向hbase插入数据,达到缓存会自动提交,这里也可以通过传入List<put>的方式批量插入
            mutator.mutate(put);
            // 不用每次put后就调用flush,最后调用就行,这个方法替代了旧api的hTable.setAutoFlush(false, false)
            mutator.flush();
            mutator.close();
        }
    
        @Override
        public void close() throws Exception {
            if (this.connection != null) {
                this.connection.close();
                this.connection = null;
            }
        }
    }
    

    一些Hbase sink需要的constants

    https://github.com/apache/hbase/blob/master/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
    

    需要引入

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.2.0-cdh5.15.1</version>
    </dependency>
    

    参考flume的sink实现

    flume在1.8.0支持hbase的1.x版本,使用

    https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
    

    在1.9.0支持了habse的2.x版本,使用

    https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2SinkConfigurationConstants.java
    
  • 相关阅读:
    centos7下安装docker
    java中获取两个时间中的每一天
    Linq中string转int的方法
    logstash 主题综合篇
    Windows环境下ELK(5.X)平台的搭建
    本地没问题 服务器 提示 Server Error in '/' Application
    错误 未能找到类型或命名空间名称"xxxxxx"的真正原因
    System.web和System.WebServer
    Chrome Adobe Flash Player 因过期而 阻止
    请求WebApi的几种方式
  • 原文地址:https://www.cnblogs.com/tonglin0325/p/14183852.html
Copyright © 2011-2022 走看看