  • Several Ways to Read and Write HBase (Part 3): Flink

    1. Overview of HBase Connection Approaches

    The main approaches are:

    1. Reading and writing HBase with the plain Java API;
    2. Reading and writing HBase from Spark;
    3. Reading and writing HBase from Flink;
    4. Reading and writing HBase through Phoenix.

    The first is the fairly low-level but efficient approach that HBase itself provides; the second and third are the ways Spark and Flink integrate with HBase; the last goes through the third-party Phoenix plugin's JDBC layer, and that JDBC approach can in turn be called from Spark and Flink as well.

    Note:

    This article uses HBase 2.1.2, Flink 1.7.2, and Scala 2.12.
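    The build dependencies correspond roughly to the following Maven coordinates (an illustrative sketch only; check artifact names and versions against your own build, and note that the flink-hbase connector shipped with Flink 1.7.2 is compiled against HBase 1.4.3, which is exactly why some of its classes are rewritten in section 2.1.2):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hbase_2.12</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.1.2</version>
    </dependency>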

    2. Reading and Writing HBase with Flink Streaming and Flink DataSet

     There are two ways to read HBase data in Flink:

    • Extend RichSourceFunction and override its methods (Flink Streaming)
    • Implement a custom TableInputFormat (Flink Streaming and Flink DataSet)

    There are also two ways to write data to HBase from Flink:

    • Extend RichSinkFunction and override its methods (Flink Streaming)
    • Implement the OutputFormat interface (Flink Streaming and Flink DataSet)

    Note:

    ① Flink Streaming jobs can use either of the two options above; Flink DataSet batch jobs, however, can only read by implementing the TableInputFormat interface and only write by implementing the OutputFormat interface.

    ② The TableInputFormat interface lives in flink-hbase_2.12-1.7.2, and that jar is built against HBase 1.4.3. Since this project uses HBase 2.1.2, TableInputFormat has to be rewritten.

       

    2.1 Two Ways to Read HBase from Flink

    Note: before reading from HBase, you can first run the Flink DataSet batch write from section 2.2.2 (implementing the OutputFormat interface) so that the HBase table test already contains data; a quick way to prepare equivalent sample data from the HBase shell is sketched below:
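    A minimal HBase shell session for this might look like the following (a sketch; the table name test and column family cf1 match the code in this post, and the sample rows mirror the ones written in section 2.2):

    create 'test', 'cf1'
    put 'test', '100', 'cf1:name', 'hello'
    put 'test', '100', 'cf1:age', '20'
    put 'test', '101', 'cf1:name', 'nice'
    put 'test', '101', 'cf1:age', '24'
    put 'test', '102', 'cf1:name', 'beautiful'
    put 'test', '102', 'cf1:age', '26'
    scan 'test'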

      

    2.1.1 Extending RichSourceFunction and overriding its methods:

    package cn.swordfall.hbaseOnFlink
    
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
    import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Scan, Table}
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.JavaConverters._
    /**
      * @Author: Yang JianQiu
      * @Date: 2019/2/28 18:05
      *
      * Uses HBase as the data source:
      * fetches rows from HBase and emits them as a stream.
      *
      * Reading from HBase,
      * option 1: extend RichSourceFunction and override its methods.
      */
    class HBaseReader extends RichSourceFunction[(String, String)]{
    
      private var conn: Connection = null
      private var table: Table = null
      private var scan: Scan = null
    
      /**
        * Open the HBase client connection in this open() method.
        * @param parameters
        */
      override def open(parameters: Configuration): Unit = {
        val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()
    
        config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
        config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    
        val tableName: TableName = TableName.valueOf("test")
        val cf1: String = "cf1"
        conn = ConnectionFactory.createConnection(config)
        table = conn.getTable(tableName)
        scan = new Scan()
        scan.withStartRow(Bytes.toBytes("100"))
        scan.withStopRow(Bytes.toBytes("107"))
        scan.addFamily(Bytes.toBytes(cf1))
      }
    
      /**
        * The run method comes from the Java interface SourceFunction; IDEA's Ctrl + O shortcut
        * may not list it for quick override, but overriding it directly works fine.
        * @param sourceContext
        */
      override def run(sourceContext: SourceContext[(String, String)]): Unit = {
        val rs = table.getScanner(scan)
        val iterator = rs.iterator()
        while (iterator.hasNext){
          val result = iterator.next()
          val rowKey = Bytes.toString(result.getRow)
          val sb: StringBuffer = new StringBuffer()
          for (cell:Cell <- result.listCells().asScala){
            val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
            sb.append(value).append("_")
          }
          val valueString = sb.replace(sb.length() - 1, sb.length(), "").toString
          sourceContext.collect((rowKey, valueString))
        }
      }
    
      /**
        * Must be implemented; nothing needs to be done here.
        */
      override def cancel(): Unit = {
    
      }
    
      /**
        * Close the HBase connection and the table.
        */
      override def close(): Unit = {
        try {
          if (table != null) {
            table.close()
          }
          if (conn != null) {
            conn.close()
          }
        } catch {
          case e:Exception => println(e.getMessage)
        }
      }
    }

    Calling the HBaseReader class (which extends RichSourceFunction) from a Flink Streaming job:

    /**
      * Read from HBase,
      * option 1: extend RichSourceFunction and override its methods.
      */
     def readFromHBaseWithRichSourceFunction(): Unit ={
       val env = StreamExecutionEnvironment.getExecutionEnvironment
       env.enableCheckpointing(5000)
       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
       env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
       val dataStream: DataStream[(String, String)] = env.addSource(new HBaseReader)
       dataStream.map(x => x._1 + " " + x._2).print()
       env.execute()
     }
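    For completeness, the streaming and batch driver snippets in this article assume Scala imports roughly along these lines (a sketch; adjust to your Flink version and project layout):

    // streaming drivers (Kafka source, checkpointing, HBase reader/writer)
    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    // DataSet (batch) drivers
    import org.apache.flink.api.scala._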

    2.1.2 Implementing a custom TableInputFormat:

    Because of the version mismatch, three classes from flink-hbase_2.12-1.7.2 have to be rewritten: TableInputSplit, AbstractTableInputFormat, and TableInputFormat.

    TableInputSplit is rewritten as CustomTableInputSplit:

    package cn.swordfall.hbaseOnFlink.flink172_hbase212;
    
    import org.apache.flink.core.io.LocatableInputSplit;
    
    /**
     * @Author: Yang JianQiu
     * @Date: 2019/3/19 11:50
     */
    public class CustomTableInputSplit extends LocatableInputSplit {
        private static final long serialVersionUID = 1L;
    
        /** The name of the table to retrieve data from. */
        private final byte[] tableName;
    
        /** The start row of the split. */
        private final byte[] startRow;
    
        /** The end row of the split. */
        private final byte[] endRow;
    
        /**
         * Creates a new table input split.
         *
         * @param splitNumber
         *        the number of the input split
         * @param hostnames
         *        the names of the hosts storing the data the input split refers to
         * @param tableName
         *        the name of the table to retrieve data from
         * @param startRow
         *        the start row of the split
         * @param endRow
         *        the end row of the split
         */
        CustomTableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName, final byte[] startRow,
                        final byte[] endRow) {
            super(splitNumber, hostnames);
    
            this.tableName = tableName;
            this.startRow = startRow;
            this.endRow = endRow;
        }
    
        /**
         * Returns the table name.
         *
         * @return The table name.
         */
        public byte[] getTableName() {
            return this.tableName;
        }
    
        /**
         * Returns the start row.
         *
         * @return The start row.
         */
        public byte[] getStartRow() {
            return this.startRow;
        }
    
        /**
         * Returns the end row.
         *
         * @return The end row.
         */
        public byte[] getEndRow() {
            return this.endRow;
        }
    }

    AbstractTableInputFormat is rewritten as CustomAbstractTableInputFormat:

    package cn.swordfall.hbaseOnFlink.flink172_hbase212;
    
    import org.apache.flink.addons.hbase.AbstractTableInputFormat;
    import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
    import org.apache.flink.api.common.io.RichInputFormat;
    import org.apache.flink.api.common.io.statistics.BaseStatistics;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.io.InputSplitAssigner;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.Pair;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @Author: Yang JianQiu
     * @Date: 2019/3/19 11:16
     *
     * The flink-hbase_2.12-1.7.2 jar is built against HBase 1.4.3, while this project uses HBase 2.1.2,
     * so the versions do not match. AbstractTableInputFormat has to be rewritten because it calls
     * HBase 1.4.3 APIs, some of which are no longer usable in HBase 2.1.2.
     */
    public abstract class CustomAbstractTableInputFormat<T> extends RichInputFormat<T, CustomTableInputSplit> {
        protected static final Logger LOG = LoggerFactory.getLogger(AbstractTableInputFormat.class);
    
        // helper variable to decide whether the input is exhausted or not
        protected boolean endReached = false;
    
        protected transient HTable table = null;
        protected transient Scan scan = null;
    
        /** HBase iterator wrapper. */
        protected ResultScanner resultScanner = null;
    
        protected byte[] currentRow;
        protected long scannedRows;
    
        /**
         * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
         *
         * @return The appropriate instance of Scan for this use case.
         */
        protected abstract Scan getScanner();
    
        /**
         * What table is to be read.
         *
         * <p>Per instance of a TableInputFormat derivative only a single table name is possible.
         *
         * @return The name of the table
         */
        protected abstract String getTableName();
    
        /**
         * HBase returns an instance of {@link Result}.
         *
         * <p>This method maps the returned {@link Result} instance into the output type {@link T}.
         *
         * @param r The Result instance from HBase that needs to be converted
         * @return The appropriate instance of {@link T} that contains the data of Result.
         */
        protected abstract T mapResultToOutType(Result r);
    
        /**
         * Creates a {@link Scan} object and opens the {@link HTable} connection.
         *
         * <p>These are opened here because they are needed in the createInputSplits
         * which is called before the openInputFormat method.
         *
         * <p>The connection is opened in this method and closed in {@link #closeInputFormat()}.
         *
         * @param parameters The configuration that is to be used
         * @see Configuration
         */
        @Override
        public abstract void configure(Configuration parameters);
    
        @Override
        public void open(CustomTableInputSplit split) throws IOException {
            if (table == null) {
                throw new IOException("The HBase table has not been opened! " +
                        "This needs to be done in configure().");
            }
            if (scan == null) {
                throw new IOException("Scan has not been initialized! " +
                        "This needs to be done in configure().");
            }
            if (split == null) {
                throw new IOException("Input split is null!");
            }
    
            logSplitInfo("opening", split);
    
            // set scan range
            currentRow = split.getStartRow();
            // HBase 2.x deprecates setStartRow/setStopRow in favor of withStartRow/withStopRow
           /* scan.setStartRow(currentRow);
            scan.setStopRow(split.getEndRow());*/
            scan.withStartRow(currentRow);
            scan.withStopRow(split.getEndRow());
    
            resultScanner = table.getScanner(scan);
            endReached = false;
            scannedRows = 0;
        }
    
        @Override
        public T nextRecord(T reuse) throws IOException {
            if (resultScanner == null) {
                throw new IOException("No table result scanner provided!");
            }
            try {
                Result res = resultScanner.next();
                if (res != null) {
                    scannedRows++;
                    currentRow = res.getRow();
                    return mapResultToOutType(res);
                }
            } catch (Exception e) {
                resultScanner.close();
                //workaround for timeout on scan
                LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
                /*scan.setStartRow(currentRow);*/
                scan.withStartRow(currentRow);
                resultScanner = table.getScanner(scan);
                Result res = resultScanner.next();
                if (res != null) {
                    scannedRows++;
                    currentRow = res.getRow();
                    return mapResultToOutType(res);
                }
            }
    
            endReached = true;
            return null;
        }
    
        private void logSplitInfo(String action, CustomTableInputSplit split) {
            int splitId = split.getSplitNumber();
            String splitStart = Bytes.toString(split.getStartRow());
            String splitEnd = Bytes.toString(split.getEndRow());
            String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
            String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
            String[] hostnames = split.getHostnames();
            LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
        }
    
        @Override
        public boolean reachedEnd() throws IOException {
            return endReached;
        }
    
        @Override
        public void close() throws IOException {
            LOG.info("Closing split (scanned {} rows)", scannedRows);
            currentRow = null;
            try {
                if (resultScanner != null) {
                    resultScanner.close();
                }
            } finally {
                resultScanner = null;
            }
        }
    
        @Override
        public void closeInputFormat() throws IOException {
            try {
                if (table != null) {
                    table.close();
                }
            } finally {
                table = null;
            }
        }
    
        @Override
        public CustomTableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
            if (table == null) {
                throw new IOException("The HBase table has not been opened! " +
                        "This needs to be done in configure().");
            }
            if (scan == null) {
                throw new IOException("Scan has not been initialized! " +
                        "This needs to be done in configure().");
            }
    
            // Get the starting and ending row keys for every region in the currently open table
            final Pair<byte[][], byte[][]> keys = table.getRegionLocator().getStartEndKeys();
            if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
                throw new IOException("Expecting at least one region.");
            }
            final byte[] startRow = scan.getStartRow();
            final byte[] stopRow = scan.getStopRow();
            final boolean scanWithNoLowerBound = startRow.length == 0;
            final boolean scanWithNoUpperBound = stopRow.length == 0;
    
            final List<CustomTableInputSplit> splits = new ArrayList<CustomTableInputSplit>(minNumSplits);
            for (int i = 0; i < keys.getFirst().length; i++) {
                final byte[] startKey = keys.getFirst()[i];
                final byte[] endKey = keys.getSecond()[i];
                final String regionLocation = table.getRegionLocator().getRegionLocation(startKey, false).getHostnamePort();
                // Test if the given region is to be included in the InputSplit while splitting the regions of a table
                if (!includeRegionInScan(startKey, endKey)) {
                    continue;
                }
                // Find the region on which the given row is being served
                final String[] hosts = new String[]{regionLocation};
    
                // Determine if regions contains keys used by the scan
                boolean isLastRegion = endKey.length == 0;
                if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
                        (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
    
                    final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
                    final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
                            && !isLastRegion ? endKey : stopRow;
                    int id = splits.size();
                    final CustomTableInputSplit split = new CustomTableInputSplit(id, hosts, table.getName().getName(), splitStart, splitStop);
                    splits.add(split);
                }
            }
            LOG.info("Created " + splits.size() + " splits");
            for (CustomTableInputSplit split : splits) {
                logSplitInfo("created", split);
            }
            return splits.toArray(new CustomTableInputSplit[splits.size()]);
        }
    
        /**
         * Test if the given region is to be included in the scan while splitting the regions of a table.
         *
         * @param startKey Start key of the region
         * @param endKey   End key of the region
         * @return true, if this region needs to be included as part of the input (default).
         */
        protected boolean includeRegionInScan(final byte[] startKey, final byte[] endKey) {
            return true;
        }
    
        @Override
        public InputSplitAssigner getInputSplitAssigner(CustomTableInputSplit[] inputSplits) {
            return new LocatableInputSplitAssigner(inputSplits);
        }
    
        @Override
        public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
            return null;
        }
    }

    TableInputFormat is rewritten as CustomTableInputFormat:

    package cn.swordfall.hbaseOnFlink.flink172_hbase212;
    
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.configuration.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    
    /**
     * @Author: Yang JianQiu
     * @Date: 2019/3/19 11:15
     * The flink-hbase_2.12-1.7.2 jar is built against HBase 1.4.3, while this project uses HBase 2.1.2,
     * so the versions do not match and TableInputFormat has to be rewritten as well.
     */
    public abstract class CustomTableInputFormat<T extends Tuple> extends CustomAbstractTableInputFormat<T> {
    
        private static final long serialVersionUID = 1L;
    
        /**
         * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
         * @return The appropriate instance of Scan for this usecase.
         */
        @Override
        protected abstract Scan getScanner();
    
        /**
         * What table is to be read.
         * Per instance of a TableInputFormat derivative only a single tablename is possible.
         * @return The name of the table
         */
        @Override
        protected abstract String getTableName();
    
        /**
         * The output from HBase is always an instance of {@link Result}.
         * This method is to copy the data in the Result instance into the required {@link Tuple}
         * @param r The Result instance from HBase that needs to be converted
         * @return The appropriate instance of {@link Tuple} that contains the needed information.
         */
        protected abstract T mapResultToTuple(Result r);
    
        /**
         * Creates a {@link Scan} object and opens the {@link HTable} connection.
         * These are opened here because they are needed in the createInputSplits
         * which is called before the openInputFormat method.
         * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
         *
         * @param parameters The configuration that is to be used
         * @see Configuration
         */
        @Override
        public void configure(Configuration parameters) {
            table = createTable();
            if (table != null) {
                scan = getScanner();
            }
        }
    
        /**
         * Create an {@link HTable} instance and set it into this format.
         *
         * <p>NOTE: this stripped-down version simply returns null, because the HBase 2.x client no longer
         * exposes a public HTable constructor. Subclasses are expected to override
         * {@link #configure(Configuration)} and open the connection and table themselves
         * (see HBaseInputFormat below).
         */
        private HTable createTable() {
            LOG.info("Initializing HBaseConfiguration");
            //use files found in the classpath
            org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
    
            try {
                return null;
            } catch (Exception e) {
                LOG.error("Error instantiating a new HTable instance", e);
            }
            return null;
        }
    
        @Override
        protected T mapResultToOutType(Result r) {
            return mapResultToTuple(r);
        }
    }

    Extending the custom CustomTableInputFormat to connect to and read from HBase:

    package cn.swordfall.hbaseOnFlink
    
    import java.io.IOException
    
    import cn.swordfall.hbaseOnFlink.flink172_hbase212.CustomTableInputFormat
    import org.apache.flink.api.java.tuple.Tuple2
    import org.apache.flink.addons.hbase.TableInputFormat
    import org.apache.flink.configuration.Configuration
    import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.util.Bytes
    
    import scala.collection.JavaConverters._
    /**
      * @Author: Yang JianQiu
      * @Date: 2019/3/1 1:14
      *
      * Reading from HBase,
      * option 2: implement the TableInputFormat interface.
      */
    class HBaseInputFormat extends CustomTableInputFormat[Tuple2[String, String]]{
    
      // the result Tuple
      val tuple2 = new Tuple2[String, String]
    
      /**
        * Set up the HBase connection.
        * @param parameters
        */
      override def configure(parameters: Configuration): Unit = {
        val tableName: TableName = TableName.valueOf("test")
        val cf1 = "cf1"
        var conn: Connection = null
        val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    
        config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
        config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    
        try {
          conn = ConnectionFactory.createConnection(config)
          table = conn.getTable(tableName).asInstanceOf[HTable]
          scan = new Scan()
          scan.withStartRow(Bytes.toBytes("001"))
          scan.withStopRow(Bytes.toBytes("201"))
          scan.addFamily(Bytes.toBytes(cf1))
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
    
      /**
        * Convert the fetched Result into the output tuple.
        * @param result
        * @return
        */
      override def mapResultToTuple(result: Result): Tuple2[String, String] = {
        val rowKey = Bytes.toString(result.getRow)
        val sb = new StringBuffer()
        for (cell: Cell <- result.listCells().asScala){
          val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
          sb.append(value).append("_")
        }
        val value = sb.replace(sb.length() - 1, sb.length(), "").toString
        tuple2.setField(rowKey, 0)
        tuple2.setField(value, 1)
        tuple2
      }
    
      /**
        * The table name to read from.
        * @return
        */
      override def getTableName: String = "test"
    
    
      /**
        * The Scan to use.
        * @return
        */
      override def getScanner: Scan = {
        scan
      }
    
    }

    Calling HBaseInputFormat (which extends CustomTableInputFormat) from a Flink Streaming job:

    /**
     * Read from HBase,
     * option 2: implement the TableInputFormat interface.
     */
     def readFromHBaseWithTableInputFormat(): Unit ={
       val env = StreamExecutionEnvironment.getExecutionEnvironment
       env.enableCheckpointing(5000)
       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
       env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    
       val dataStream = env.createInput(new HBaseInputFormat)
       dataStream.filter(_.f0.startsWith("10")).print()
       env.execute()
     }

    And the Flink DataSet batch version (print() on a DataSet triggers execution, so no env.execute() call is needed here):

    /**
     * Read HBase data by implementing the TableInputFormat interface.
     */
     def readFromHBaseWithTableInputFormat(): Unit ={
       val env = ExecutionEnvironment.getExecutionEnvironment
    
       val dataStream = env.createInput(new HBaseInputFormat)
       dataStream.filter(_.f1.startsWith("20")).print()
     }

    2.2 Two Ways to Write to HBase from Flink

    For the Flink Streaming write examples the data comes from Kafka: start a single-node Kafka and use kafka-console-producer.sh to write the following records to the topic "test" (see the sketch after the sample data):

    100,hello,20
    101,nice,24
    102,beautiful,26
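    For reference, producing these records from the command line might look roughly like this (a sketch; the --broker-list address matches the bootstrap.servers value used in the code below, and the topic test is assumed to already exist or to be auto-created):

    bin/kafka-console-producer.sh --broker-list 192.168.187.201:9092 --topic test
    >100,hello,20
    >101,nice,24
    >102,beautiful,26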

    2.2.1 Extending RichSinkFunction and overriding its methods:

    package cn.swordfall.hbaseOnFlink
    
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.util.Bytes
    
    /**
      * @Author: Yang JianQiu
      * @Date: 2019/3/1 1:34
      *
      * Writing to HBase,
      * option 1: extend RichSinkFunction and override its methods.
      *
      * Note: Flink processes records one at a time, so we must not flush to HBase after every single record;
      * that would put heavy pressure on HBase and spawn many threads, which can bring the cluster down,
      * so a production job has to control the flush frequency.
      *
      * Solution: keep a counter initialized in the open method and flush, say, every 500 records,
      * or collect records into a list and flush once the list reaches a certain threshold.
      */
    class HBaseWriter extends RichSinkFunction[String]{
    
      var conn: Connection = null
      val scan: Scan = null
      var mutator: BufferedMutator = null
      var count = 0
    
      /**
        * Set up the HBase connection.
        * @param parameters
        */
      override def open(parameters: Configuration): Unit = {
        val config:org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
        config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
        config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
        conn = ConnectionFactory.createConnection(config)
    
        val tableName: TableName = TableName.valueOf("test")
        val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
        //set the write buffer to 1 MB; once it fills up, the buffered data is automatically flushed to HBase
        params.writeBufferSize(1024 * 1024) //buffer size
        mutator = conn.getBufferedMutator(params)
        count = 0
      }
    
      /**
        * Write each incoming record to HBase.
        * @param value
        * @param context
        */
      override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
        val cf1 = "cf1"
        val array: Array[String] = value.split(",")
        val put: Put = new Put(Bytes.toBytes(array(0)))
        put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
        put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
        mutator.mutate(put)
        //flush every 2000 records
        if (count >= 2000){
          mutator.flush()
          count = 0
        }
        count = count + 1
      }
    
      /**
        * Close the connection.
        */
      override def close(): Unit = {
        if (conn != null) conn.close()
      }
    }

    Calling the HBaseWriter class (which extends RichSinkFunction) from a Flink Streaming job:

    /**
      * Write to HBase,
      * option 1: extend RichSinkFunction and override its methods.
      */
     def write2HBaseWithRichSinkFunction(): Unit = {
       val topic = "test"
       val props = new Properties
       props.put("bootstrap.servers", "192.168.187.201:9092")
       props.put("group.id", "kv_flink")
       props.put("enable.auto.commit", "true")
       props.put("auto.commit.interval.ms", "1000")
       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
       val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
       env.enableCheckpointing(5000)
       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
       env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
       val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, props)
       val dataStream: DataStream[String] = env.addSource(myConsumer)
       //write to HBase
       dataStream.addSink(new HBaseWriter)
       env.execute()
     }

    2.2.2 Implementing the OutputFormat interface:

    package cn.swordfall.hbaseOnFlink
    
    import org.apache.flink.api.common.io.OutputFormat
    import org.apache.flink.configuration.Configuration
    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.util.Bytes
    
    /**
      * @Author: Yang JianQiu
      * @Date: 2019/3/1 1:40
      *
      * Writing to HBase,
      * option 2: implement the OutputFormat interface.
      */
    class HBaseOutputFormat extends OutputFormat[String]{
    
      val zkServer = "192.168.187.201"
      val port = "2181"
      var conn: Connection = null
      var mutator: BufferedMutator = null
      var count = 0
    
      /**
        * Configures the output format. This method is always called first on a newly instantiated output format.
        *
        * @param configuration
        */
      override def configure(configuration: Configuration): Unit = {
    
      }
    
      /**
        * Opens a parallel instance of the output format, so the HBase connection, configuration,
        * and table setup are done in this open method.
        *
        * @param i
        * @param i1
        */
      override def open(i: Int, i1: Int): Unit = {
        val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
        config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
        config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
        conn = ConnectionFactory.createConnection(config)
    
        val tableName: TableName = TableName.valueOf("test")
    
        val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
        //set the write buffer to 1 MB; once it fills up, the buffered data is automatically flushed to HBase
        params.writeBufferSize(1024 * 1024) //buffer size
        mutator = conn.getBufferedMutator(params)
        count = 0
      }
    
      /**
        * Writes a record to the data sink, so this is where the HBase write API is called.
        *
        * @param it
        */
      override def writeRecord(it: String): Unit = {
    
        val cf1 = "cf1"
        val array: Array[String] = it.split(",")
        val put: Put = new Put(Bytes.toBytes(array(0)))
        put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
        put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
        mutator.mutate(put)
        //flush every 4 records; when this OutputFormat is driven by a batch job, this threshold must not be
        //larger than the total number of records in the batch, otherwise the buffered data never reaches HBase
        if (count >= 4){
          mutator.flush()
          count = 0
        }
        count = count + 1
      }
    
      /**
        * Close the connection.
        */
      override def close(): Unit = {
        try {
          if (conn != null) conn.close()
        } catch {
          case e: Exception => println(e.getMessage)
        }
      }
    }

    Calling HBaseOutputFormat (which implements OutputFormat) from a Flink Streaming job:

    /**
      * Write to HBase,
      * option 2: implement the OutputFormat interface.
      */
     def write2HBaseWithOutputFormat(): Unit = {
       val topic = "test"
       val props = new Properties
       props.put("bootstrap.servers", "192.168.187.201:9092")
       props.put("group.id", "kv_flink")
       props.put("enable.auto.commit", "true")
       props.put("auto.commit.interval.ms", "1000")
       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
       val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
       env.enableCheckpointing(5000)
       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
       env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
       val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, props)
       val dataStream: DataStream[String] = env.addSource(myConsumer)
       dataStream.writeUsingOutputFormat(new HBaseOutputFormat)
       env.execute()
     }

    And the Flink DataSet batch version:

    /**
      * Write to HBase by implementing the OutputFormat interface.
      */
     def write2HBaseWithOutputFormat(): Unit = {
       val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    
       //define the test data
       val dataSet: DataSet[String] = env.fromElements("103,zhangsan,20", "104,lisi,21", "105,wangwu,22", "106,zhaolilu,23")
       dataSet.output(new HBaseOutputFormat)
       //the job only actually runs when the line below executes; env.execute() is required here because the data is written out through a data sink
       env.execute()
     }

    Note:

      When this OutputFormat is driven by a batch job, make sure the flush threshold in HBaseOutputFormat's writeRecord method is not larger than the total number of records in the batch; otherwise the buffered data is never flushed to HBase. A sketch of a safer close() that avoids this pitfall follows.
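    A more defensive variant (a sketch, not part of the original code) is to flush whatever is still buffered when the format is closed, so the last partial batch is never lost regardless of the threshold chosen in writeRecord:

    /**
      * Sketch of a safer close() for HBaseOutputFormat (the same idea applies to HBaseWriter):
      * closing the BufferedMutator flushes any remaining buffered mutations first.
      */
    override def close(): Unit = {
      try {
        if (mutator != null) {
          mutator.flush()
          mutator.close()
        }
        if (conn != null) conn.close()
      } catch {
        case e: Exception => println(e.getMessage)
      }
    }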

    3. Summary

    [Related articles]

    HBase连接的几种方式(一)java篇 (Several Ways to Connect to HBase, Part 1: Java) — plain Java API reads and writes

    HBase连接的几种方式(二)spark篇 (Several Ways to Connect to HBase, Part 2: Spark) — reading and writing HBase from Spark

    GitHub repository:

    https://github.com/SwordfallYeung/HBaseDemo (the Flink read/write HBase code is provided in both Java and Scala versions)

    [References]

    https://blog.csdn.net/liguohuabigdata/article/details/78588861

     https://blog.csdn.net/aA518189/article/details/86544844

  • Original post: https://www.cnblogs.com/swordfall/p/10527423.html