zoukankan      html  css  js  c++  java
  • Flink连接器-批处理-读写Hbase

    Flink 批处理与 HBase 的读写

    source-hbase

    父类

    父类是模仿 Flink 官方实现写的。

    import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
    import org.apache.flink.api.common.io.RichInputFormat;
    import org.apache.flink.api.common.io.statistics.BaseStatistics;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.io.InputSplitAssigner;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.Pair;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Base class for Flink batch input formats that read rows from an HBase table.
     *
     * <p>{@link #configure(Configuration)} opens the HBase connection/table and asks the subclass
     * to build the {@link Scan}. {@link #createInputSplits(int)} produces at most one split per
     * region that overlaps the scan range, and {@link #open(MyTableInputSplit)} /
     * {@link #nextRecord(Object)} iterate over the rows of a single split.
     *
     * @param <T> type of the records produced by this format
     * @author WeiJiQian
     */
    public abstract class SourceHBaseInputBase<T> extends RichInputFormat<T, MyTableInputSplit> {
        protected static final Logger LOG = LoggerFactory.getLogger(SourceHBaseInputBase.class);

        /** Helper flag to decide whether the input is exhausted or not. */
        protected boolean endReached = false;

        protected transient HTable table = null;
        protected transient Scan scan = null;
        protected transient Connection connection = null;

        /**
         * HBase iterator wrapper. Transient because it is a runtime handle that is only valid
         * between {@link #open(MyTableInputSplit)} and {@link #close()}.
         */
        protected transient ResultScanner resultScanner = null;

        /** Split start key right after {@code open()}, afterwards the key of the last row returned. */
        protected byte[] currentRow;
        /** Number of rows returned for the currently open split. */
        protected long scannedRows;

        protected ParameterTool parameterTool;

        /**
         * Converts one HBase row into the output record type.
         *
         * @param r the row fetched from the scanner, never {@code null}
         * @return the record handed to Flink
         */
        protected abstract T mapResultToOutType(Result r);

        /** Creates the scan and assigns it to {@link #scan}; invoked from {@code configure()}. */
        protected abstract void getScan();

        /**
         * Returns the table to read. Invoked from {@link #getTable()} after
         * {@link #parameterTool} has been assigned.
         *
         * @return the name of the HBase table to scan
         */
        protected abstract TableName getTableName();

        /**
         * Builds the HBase client configuration from the job properties and opens
         * {@link #connection} and {@link #table}.
         *
         * @throws IOException if the connection or the table cannot be opened
         */
        protected void getTable() throws IOException {
            parameterTool = PropertiesUtil.PARAMETER_TOOL;
            final org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
            configuration.set(HBASE_ZOOKEEPER_QUORUM, parameterTool.get(HBASE_ZOOKEEPER_QUORUM));
            configuration.set(HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT, parameterTool.get(HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT));
            configuration.set(HBASE_RPC_TIMEOUT, parameterTool.get(HBASE_RPC_TIMEOUT));
            configuration.set(HBASE_CLIENT_OPERATION_TIMEOUT, parameterTool.get(HBASE_CLIENT_OPERATION_TIMEOUT));
            configuration.set(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, parameterTool.get(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
            connection = ConnectionFactory.createConnection(configuration);
            try {
                table = (HTable) connection.getTable(getTableName());
            } catch (IOException | RuntimeException e) {
                // Do not leak the freshly created connection when the table cannot be obtained.
                try {
                    connection.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                connection = null;
                throw e;
            }
        }

        /**
         * Opens the table and initializes the scan. The {@code parameters} argument is not read.
         *
         * @param parameters Flink configuration (unused)
         */
        @SneakyThrows
        @Override
        public void configure(Configuration parameters) {
            getTable();
            getScan();
        }

        /**
         * Positions the scan on the given split and opens a scanner for it.
         *
         * @param split the split to read
         * @throws IOException if {@code configure()} did not initialize the table or the scan,
         *                     if {@code split} is {@code null}, or if the scanner cannot be opened
         */
        @Override
        public void open(MyTableInputSplit split) throws IOException {
            LOG.debug("open: table is null = {}", table == null);
            if (table == null) {
                throw new IOException("The HBase table has not been opened! " +
                        "This needs to be done in configure().");
            }
            if (scan == null) {
                throw new IOException("Scan has not been initialized! " +
                        "This needs to be done in configure().");
            }
            if (split == null) {
                throw new IOException("Input split is null!");
            }

            logSplitInfo("opening", split);

            // set scan range
            currentRow = split.getStartRow();
            scan.setStartRow(currentRow);
            scan.setStopRow(split.getEndRow());

            resultScanner = table.getScanner(scan);
            endReached = false;
            scannedRows = 0;
        }

        /**
         * Returns the next record of the open split, or {@code null} once the split is exhausted.
         *
         * <p>If fetching fails, the scanner is reopened once at the current position and the
         * fetch is retried; a failure of the retry is propagated.
         *
         * @param reuse ignored; the record comes from {@link #mapResultToOutType(Result)}
         * @return the mapped record, or {@code null} at the end of the split
         * @throws IOException if no scanner is open or the retry fails
         */
        @Override
        public T nextRecord(T reuse) throws IOException {
            if (resultScanner == null) {
                throw new IOException("No table result scanner provided!");
            }
            Result res;
            try {
                res = resultScanner.next();
            } catch (Exception e) {
                resultScanner.close();
                //workaround for timeout on scan
                LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
                // Before the first row has been returned, currentRow is still the split start key
                // and must be included; afterwards it is the last emitted row and must be skipped.
                scan.withStartRow(currentRow, scannedRows == 0);
                resultScanner = table.getScanner(scan);
                res = resultScanner.next();
            }

            if (res != null) {
                scannedRows++;
                currentRow = res.getRow();
                return mapResultToOutType(res);
            }

            endReached = true;
            return null;
        }

        /** Logs id, hosts and key range of a split; empty or missing keys are shown as "-". */
        private void logSplitInfo(String action, MyTableInputSplit split) {
            int splitId = split.getSplitNumber();
            String splitStart = Bytes.toString(split.getStartRow());
            String splitEnd = Bytes.toString(split.getEndRow());
            String splitStartKey = (splitStart == null || splitStart.isEmpty()) ? "-" : splitStart;
            String splitStopKey = (splitEnd == null || splitEnd.isEmpty()) ? "-" : splitEnd;
            String[] hostnames = split.getHostnames();
            LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
        }

        /** @return {@code true} once {@link #nextRecord(Object)} has run past the last row */
        @Override
        public boolean reachedEnd() throws IOException {
            return endReached;
        }

        /** Closes the scanner of the current split; the table and connection stay open. */
        @Override
        public void close() throws IOException {
            LOG.info("Closing split (scanned {} rows)", scannedRows);
            currentRow = null;
            try {
                if (resultScanner != null) {
                    resultScanner.close();
                }
            } finally {
                resultScanner = null;
            }
        }

        /**
         * Releases the table and then the connection it was obtained from. The connection is
         * closed even if closing the table fails.
         *
         * @throws IOException if closing the table or the connection fails
         */
        @Override
        public void closeInputFormat() throws IOException {
            try {
                if (table != null) {
                    table.close();
                }
            } finally {
                table = null;
                try {
                    if (connection != null) {
                        connection.close();
                    }
                } finally {
                    connection = null;
                }
            }
        }

        /**
         * Creates one split for every region that passes
         * {@link #includeRegionInScan(byte[], byte[])} and overlaps the scan's row range. The
         * split range is the intersection of the region range and the scan range.
         *
         * @param minNumSplits only used as the initial capacity of the result list
         * @return the created splits, possibly empty
         * @throws IOException if {@code configure()} did not initialize the table or the scan,
         *                     or if the table reports no regions
         */
        @Override
        public MyTableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
            if (table == null) {
                throw new IOException("The HBase table has not been opened! " +
                        "This needs to be done in configure().");
            }
            if (scan == null) {
                throw new IOException("Scan has not been initialized! " +
                        "This needs to be done in configure().");
            }

            // Reuse a single locator instead of requesting a new one for every lookup.
            final RegionLocator regionLocator = table.getRegionLocator();

            // Get the starting and ending row keys for every region in the currently open table
            final Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
            if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
                throw new IOException("Expecting at least one region.");
            }
            final byte[][] regionStartKeys = keys.getFirst();
            final byte[][] regionEndKeys = keys.getSecond();
            final byte[] startRow = scan.getStartRow();
            final byte[] stopRow = scan.getStopRow();
            final boolean scanWithNoLowerBound = startRow.length == 0;
            final boolean scanWithNoUpperBound = stopRow.length == 0;

            final List<MyTableInputSplit> splits = new ArrayList<>(Math.max(minNumSplits, 0));
            for (int i = 0; i < regionStartKeys.length; i++) {
                final byte[] startKey = regionStartKeys[i];
                final byte[] endKey = regionEndKeys[i];
                // Test if the given region is to be included in the InputSplit while splitting the regions of a table
                if (!includeRegionInScan(startKey, endKey)) {
                    continue;
                }

                // Determine if regions contains keys used by the scan
                final boolean isLastRegion = endKey.length == 0;
                final boolean overlapsScan =
                        (scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0)
                                && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0);
                if (!overlapsScan) {
                    continue;
                }

                // Find the region server on which the region is being served; only looked up for
                // regions that actually become a split.
                final String regionLocation = regionLocator.getRegionLocation(startKey, false).getHostnamePort();
                final String[] hosts = new String[]{regionLocation};

                final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
                final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
                        && !isLastRegion ? endKey : stopRow;
                final int id = splits.size();
                splits.add(new MyTableInputSplit(id, hosts, table.getName().getName(), splitStart, splitStop));
            }
            LOG.info("Created {} splits", splits.size());
            for (MyTableInputSplit split : splits) {
                logSplitInfo("created", split);
            }
            return splits.toArray(new MyTableInputSplit[0]);
        }

        /**
         * Test if the given region is to be included in the scan while splitting the regions of a table.
         *
         * @param startKey Start key of the region
         * @param endKey   End key of the region
         * @return true, if this region needs to be included as part of the input (default).
         */
        protected boolean includeRegionInScan(final byte[] startKey, final byte[] endKey) {
            return true;
        }

        /** Assigns splits through a {@link LocatableInputSplitAssigner} built from the given splits. */
        @Override
        public InputSplitAssigner getInputSplitAssigner(MyTableInputSplit[] inputSplits) {
            return new LocatableInputSplitAssigner(inputSplits);
        }

        /** No statistics are provided; always returns {@code null}. */
        @Override
        public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
            return null;
        }

    }
    
    
    

    子类

    import org.apache.flink.configuration.Configuration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.RowFilter;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import javax.swing.*;
    import java.util.List;
    
    import static org.apache.hadoop.hbase.filter.FilterList.Operator.MUST_PASS_ONE;
    
    /**
     * HBase source that emits one {@link UsersBean} per scanned row of the persona-data table.
     *
     * <p>Only the phone numbers derived from the row key are filled in. The same bean instance
     * is updated and returned for every row.
     *
     * @author WeiJiQian
     */
    public class SourceDaysHbase extends SourceHBaseInputBase<UsersBean> {

        // NOTE(review): stored by the constructor but not read anywhere in this class - confirm
        // whether it was meant to restrict the scan.
        private final List<String> dates;

        /** Single bean that is overwritten and returned for each row. */
        private final UsersBean usersBean = new UsersBean();

        /**
         * @param dates list of date strings kept by this source
         */
        public SourceDaysHbase(List<String> dates) {
            this.dates = dates;
        }

        /** Delegates to the base class without additional work. */
        @Override
        public void configure(Configuration parameters) {
            super.configure(parameters);
        }

        /**
         * Fills the shared bean from the row key of the given result.
         *
         * @param r the scanned row
         * @return the shared bean with the 11-digit and 8-digit phone fields set
         */
        @Override
        protected UsersBean mapResultToOutType(Result r) {
            final String rowKey = Bytes.toString(r.getRow());
            final String phone11 = CustomizeUtils.getPhoneOfPersonaDataRowKey(rowKey);
            usersBean.setPhone11(phone11);

            // The 8-digit form is derived from whatever the bean reports as its 11-digit value.
            final String phone8 = CustomizeUtils.getPhone8(usersBean.getPhone11());
            usersBean.setPhone8(phone8);
            return usersBean;
        }

        /** Creates a scan restricted to the activity-date column of the month/day family. */
        @Override
        protected void getScan() {
            final Scan columnScan = new Scan();
            columnScan.addColumn(HBaseConstant.HBASE_PERSONA_FAMILY_MONTH_DAY, HBaseConstant.HBASE_PERSONA_ACTIVITE_DATE);
            scan = columnScan;
        }

        /** Resolves the table name from the job property for the persona-data table. */
        @Override
        protected TableName getTableName() {
            final String configuredName = parameterTool.get(HBaseConstant.HBASE_TABLE_NAME_PERSONA_DATA);
            return TableName.valueOf(configuredName);
        }
    }
    
    

    sink-hbase

    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.api.common.io.OutputFormat;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.mortbay.util.MultiPartWriter;
    
    import java.io.IOException;
    
    import static com.hecaiyun.common.bean.HBaseConstant.*;
    
    /**
     * @Auther WeiJiQian
     * @描述
     */
    @Slf4j
    public abstract class HBaseOutputFormatBase<T> implements OutputFormat<T> {
    
        protected final String valueString = "1";
        protected String date ;
        protected Table table ;
        protected Connection connection;
        protected BufferedMutatorParams params;
        protected BufferedMutator mutator;
        protected org.apache.hadoop.conf.Configuration configuration;
        protected ParameterTool parameterTool;
    
        
        public abstract TableName getTableName();
        
    
    
        public void configure(Configuration parameters) {
            parameterTool = PropertiesUtil.PARAMETER_TOOL;
            configuration = HBaseConfiguration.create();
            configuration.set(HBASE_ZOOKEEPER_QUORUM, parameterTool.get(HBASE_ZOOKEEPER_QUORUM));
            configuration.set(HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT, parameterTool.get(HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT));
            configuration.set(HBASE_RPC_TIMEOUT, parameterTool.get(HBASE_RPC_TIMEOUT));
            configuration.set(HBASE_CLIENT_OPERATION_TIMEOUT, parameterTool.get(HBASE_CLIENT_OPERATION_TIMEOUT));
            configuration.set(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, parameterTool.get(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
        }
    
        public void open(int taskNumber, int numTasks) throws IOException {
            connection =  ConnectionFactory.createConnection(configuration);
            table = connection.getTable(getTableName());
            params = new BufferedMutatorParams(table.getName());
            //设置缓存的大小 100M
            params.writeBufferSize(parameterTool.getLong(HBASE_WRITEBUFFER_SIZE));
            mutator = connection.getBufferedMutator(params);
            
        }
    
      
        /*
         * @author WeiJiQian
         * @param rowKey
         * @param family
         * @param colum
         * @param value
         * @return org.apache.hadoop.hbase.client.Put
         * 描述  覆盖数据
         */
        public void putData(String rowKey,byte[] family, byte[] colum,String value ) throws IOException {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(family,colum,Bytes.toBytes(value));
            put.setDurability(Durability.SKIP_WAL);
            mutator.mutate(put);
        }
    
        public void close() throws IOException {
            if (mutator != null){
                mutator.flush();
                mutator.close();
            }
            if (table != null){
                table.close();
            }
           if (connection != null){
               connection.close();
           }
    
        }
    }
    
    
  • 相关阅读:
    bzoj 2159 Crash 的文明世界
    bzoj 4241 历史研究
    数论大合集(柿子版)
    [IOI2005]mou
    CSP 2020 T3 函数调用
    线段树 --算法竞赛专题解析(24)
    树状数组 --算法竞赛专题解析(23)
    算法竞赛专题解析(22):数论--同余
    算法竞赛专题解析(21):数论--线性丢番图方程
    算法竞赛专题解析(20):数论--GCD和LCM
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14034860.html
Copyright © 2011-2022 走看看