zoukankan      html  css  js  c++  java
  • spring boot 集成 Hadoop3.2.0 + HBase2.3.0+ Spark3.0.0

    引入包:

            <!-- Spring HBase 依赖 -->
            <!--==================hadoop ===================-->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>javax.servlet</groupId>
                        <artifactId>servlet-api</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${hadoop.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>javax.servlet</groupId>
                        <artifactId>servlet-api</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${hadoop.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>javax.servlet</groupId>
                        <artifactId>servlet-api</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- mapreduce 核心jar包  -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>${hadoop.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
    
            <!--==================HBase ===================-->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>${hbase.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>${hbase.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-common</artifactId>
                <version>${hbase.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-mapreduce</artifactId>
                <version>${hbase.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-annotations -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-annotations</artifactId>
                <version>${hbase.version}</version>
            </dependency>
    
            <!--spark-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>${spark.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.logging.log4j</groupId>
                        <artifactId>log4j-api</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>ch.qos.logback</groupId>
                        <artifactId>logback-classic</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>${spark.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.logging.log4j</groupId>
                        <artifactId>log4j-api</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.logging.log4j</groupId>
                        <artifactId>log4j-to-slf4j</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>ch.qos.logback</groupId>
                        <artifactId>logback-classic</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>

    新建一个 HBaseBean 类

    /**
     * Value object describing a single HBase cell: the row key, the column
     * coordinates (family + qualifier), the timestamp, the cell type and the
     * cell value decoded as a string.
     */
    public class HBaseBean {

        /** Row key of the record. */
        private String rowKey;

        /** Column family name. */
        private String columnFamily;

        /** Column qualifier (the field name inside the family). */
        private String columnQualifier;

        /** Cell timestamp in milliseconds. */
        private Long timeStamp;

        /** Cell type as reported by HBase (e.g. Put). */
        private String type;

        /** Cell value decoded as a string. */
        private String value;

        public HBaseBean() {
        }

        public String getRowKey() {
            return this.rowKey;
        }

        public void setRowKey(String rowKey) {
            this.rowKey = rowKey;
        }

        public String getColumnFamily() {
            return this.columnFamily;
        }

        public void setColumnFamily(String columnFamily) {
            this.columnFamily = columnFamily;
        }

        public String getColumnQualifier() {
            return this.columnQualifier;
        }

        public void setColumnQualifier(String columnQualifier) {
            this.columnQualifier = columnQualifier;
        }

        public Long getTimeStamp() {
            return this.timeStamp;
        }

        public void setTimeStamp(Long timeStamp) {
            this.timeStamp = timeStamp;
        }

        public String getType() {
            return this.type;
        }

        public void setType(String type) {
            this.type = type;
        }

        public String getValue() {
            return this.value;
        }

        public void setValue(String value) {
            this.value = value;
        }
    }

    新建一个  HBaseConfig 类

    /**
     * HBase配置类
     * HBaseConfiguration.create() 会在 CLASSPATH 下查找 hbase-site.xml
     * @author child
     * @date 2020-7-14 12:11:18
     * https://hbase.apache.org/book.html#faq 官网的
     * http://c.biancheng.net/view/6523.html hbase用法参考
     * */
    @Configuration
    public class HBaseConfig {

        /**
         * One shared, heavyweight HBase {@link Connection} for every bean built
         * here. Connection is thread-safe; Admin/Table handles derived from it
         * are cheap to create. The original code opened a separate connection
         * per bean and, on failure, silently returned a broken bean
         * (HbaseTemplate wrapping null / a null Admin).
         */
        private volatile Connection connection;

        /** Lazily creates the shared connection (double-checked locking). */
        private Connection connection() throws IOException {
            if (connection == null) {
                synchronized (this) {
                    if (connection == null) {
                        // HBaseConfiguration.create() picks up hbase-site.xml from the CLASSPATH
                        connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
                    }
                }
            }
            return connection;
        }

        /**
         * HbaseTemplate bean. Fails fast at context startup instead of
         * swallowing the IOException and wrapping a null connection.
         */
        @Bean
        public HbaseTemplate hbaseTemplate() {
            try {
                return new HbaseTemplate(connection());
            } catch (IOException e) {
                throw new IllegalStateException("Unable to create HBase connection", e);
            }
        }

        /**
         * Admin bean derived from the same shared connection. Fails fast
         * instead of returning a null bean on IOException.
         */
        @Bean
        public Admin admin() {
            try {
                return connection().getAdmin();
            } catch (IOException e) {
                throw new IllegalStateException("Unable to obtain HBase Admin", e);
            }
        }
    }

    新建一个  HbaseTemplate 封装一些常用的方法

    import com.culturalCenter.dataCenter.Utils.HBaseUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.NamespaceDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.Assert;
    import org.springframework.util.CollectionUtils;
    import scala.Serializable;
    
    import javax.annotation.PostConstruct;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    
    /**
     * 对hbase 的 DDL、DML操作或者使用HBaseUtils
     *
     * @Author wulincheng
     * @Date 2020-7-13 15:14:08
     * @Version 1.0
     */
    public class HbaseTemplate{

        private Logger log = LoggerFactory.getLogger(this.getClass());

        /**
         * Shared HBase connection; heavyweight and thread-safe. Ownership stays
         * with the caller that supplied it — this class never closes it.
         */
        private Connection connection;

        public HbaseTemplate() {
        }

        public HbaseTemplate(Connection connection) {
            setConnection(connection);
        }

        public Connection getConnection() {
            return connection;
        }

        /**
         * Admin handles are lightweight: create one per operation and close it
         * promptly (callers below use try-with-resources).
         */
        private Admin getAdmin() throws IOException {
            return connection.getAdmin();
        }

        public void setConnection(Connection connection) {
            this.connection = connection;
        }

        /**
         * 获取 {@link Table}
         * The caller is responsible for closing the returned handle.
         *
         * @param tableName 表名称
         * @return table handle bound to this connection
         * @throws IOException if the handle cannot be created
         */
        public Table getTable(String tableName) throws IOException {
            return connection.getTable(TableName.valueOf(tableName));
        }

        /**
         * 创建命名空间
         *
         * @param nameSpace 命名空间名称
         */
        public void createNameSpace(String nameSpace) {
            Assert.hasLength(nameSpace, "命名空间不能为空");
            NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(nameSpace).build();
            try (Admin admin = getAdmin()) {
                admin.createNamespace(namespaceDescriptor);
            } catch (IOException e) {
                log.error("创建命名空间 [{}] 失败", nameSpace, e);
            }
        }

        /**
         * 删除命名空间
         *
         * @param nameSpace 命名空间名称
         */
        public void deleteNameSpace(String nameSpace) {
            Assert.hasLength(nameSpace, "命名空间不能为空");
            try (Admin admin = getAdmin()) {
                admin.deleteNamespace(nameSpace);
            } catch (IOException e) {
                log.error("删除命名空间 [{}] 失败", nameSpace, e);
            }
        }

        /**
         * 创建表
         *
         * @param tableName 表名称
         * @param CF        表中的列族
         */
        public void createTable(String tableName, String... CF) {
            Assert.hasLength(tableName, "表名不能为空");
            Assert.notEmpty(CF, "列族不能为空");
            TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
            List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(CF.length);
            for (String columnFamily : CF) {
                columnFamilyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)).build());
            }
            TableDescriptor tableDescriptor = tableDescriptorBuilder.setColumnFamilies(columnFamilyDescriptors).build();
            try (Admin admin = getAdmin()) {
                admin.createTable(tableDescriptor);
            } catch (IOException e) {
                log.error("创建 table => {} 失败", tableName, e);
            }
        }

        /**
         * 禁用表
         *
         * @param tableName 表名称
         */
        public void disableTable(String tableName) {
            Assert.hasLength(tableName, "表名不能为空");
            try (Admin admin = getAdmin()) {
                admin.disableTable(TableName.valueOf(tableName));
            } catch (IOException e) {
                log.error("禁用 table => {} 失败", tableName, e);
            }
        }

        /**
         * 删除表
         *
         * @param tableName 表名称
         */
        public void deleteTable(String tableName) {
            Assert.hasLength(tableName, "表名不能为空");
            try (Admin admin = getAdmin()) {
                // 禁用表之后才能删除表
                disableTable(tableName);
                // 删除表
                admin.deleteTable(TableName.valueOf(tableName));
            } catch (IOException e) {
                log.error("删除 table => {} 失败", tableName, e);
            }
        }

        /**
         * 列出指定命名空间下的所有表
         * Returns an empty list (never null) when the lookup fails.
         *
         * @param nameSpace 命名空间名称
         * @return {@link TableName}
         */
        public List<TableName> listTable(String nameSpace) {
            Assert.hasLength(nameSpace, "命名空间不能为空");
            try (Admin admin = getAdmin()) {
                TableName[] tableNames = admin.listTableNamesByNamespace(nameSpace);
                return (List<TableName>) CollectionUtils.arrayToList(tableNames);
            } catch (IOException e) {
                log.error("获取命名空间 [{}] 下的所有表失败", nameSpace, e);
            }
            // was: return null — a null collection forces every caller to null-check
            return new ArrayList<>();
        }

        /**
         * 列出 default 命名空间下的所有表
         *
         * @return {@link TableName}
         */
        public List<TableName> listTableByDefault() {
            return listTable("default");
        }

        /**
         * 扫描表 (full scan — use with care on large tables)
         *
         * @param tableName 表名
         * @return one map per row, see {@code HBaseUtils.getRowData}
         */
        public List<Map<String, String>> scanTable(String tableName) {
            Assert.hasLength(tableName, "表名不能为空");
            return getResult(tableName, new Scan());
        }

        /**
         * 插入数据
         * columns 和 values 必须一一对应 (columns.get(i) -> values.get(i)).
         *
         * @param tableName 表名
         * @param rowKey rowKey
         * @param columnFamily 列族
         * @param columns 列
         * @param values 值
         * @return true/false
         */
        public boolean putData(String tableName, String rowKey, String columnFamily, List<String> columns, List<String> values) {
            // was: getAdmin().getConnection().getTable(...) — leaked both the Admin
            // and the Table; go straight to the connection and auto-close the table
            try (Table table = getTable(tableName)) {
                Put put = new Put(Bytes.toBytes(rowKey));
                for (int i = 0; i < columns.size(); i++) {
                    put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columns.get(i)), Bytes.toBytes(values.get(i)));
                }
                table.put(put);
                return true;
            } catch (IOException e) {
                log.error("添加数据失败, rowKey => [{}]", rowKey, e);
                return false;
            }
        }

        /**
         * 向表中添加数据
         *
         * @param tableName 表名
         * @param puts 待写入的 Put 列表
         */
        public void putRowData(String tableName, Put... puts) {
            Assert.hasLength(tableName, "表名不能为空");
            Assert.notEmpty(puts, "添加数据不能为空");
            try (Table table = getTable(tableName)) {
                List<Put> putList = (List<Put>) CollectionUtils.arrayToList(puts);
                table.put(putList);
            } catch (IOException e) {
                log.error("添加数据失败", e);
            }
        }

        /**
         * 向表中添加数据
         *
         * @param tableName 表名
         * @param puts 待写入的 Put 列表
         */
        public void putRowData(String tableName, List<Put> puts) {
            Assert.hasLength(tableName, "表名不能为空");
            Assert.notEmpty(puts, "添加数据不能为空");
            try (Table table = getTable(tableName)) {
                table.put(puts);
            } catch (IOException e) {
                log.error("添加数据失败", e);
            }
        }

        /**
         * 删除数据
         *
         * @param tableName 表名
         * @param rowKeys 待删除的 rowKey 列表
         */
        public void deleteRowData(String tableName, String... rowKeys) {
            Assert.hasLength(tableName, "表名不能为空");
            Assert.notEmpty(rowKeys, "rowKey 列表不能为空");
            try (Table table = getTable(tableName)) {
                List<Delete> deleteList = new ArrayList<>(rowKeys.length);
                for (String rowKey : rowKeys) {
                    deleteList.add(new Delete(Bytes.toBytes(rowKey)));
                }
                table.delete(deleteList);
            } catch (IOException e) {
                log.error("删除数据失败, rowKey => [{}]", rowKeys, e);
            }
        }

        /**
         * 获取单行数据
         *
         * @param tableName 表名
         * @param rowKey    rowKey
         * @return column qualifier -> value map, or null on IOException
         */
        public Map<String, String> getRowData(String tableName, String rowKey) {
            Assert.hasLength(tableName, "表名不能为空");
            Assert.hasLength(rowKey, "rowKey 不能为空");
            Get get = new Get(Bytes.toBytes(rowKey));
            Map<String, String> rowData = null;
            try (Table table = getTable(tableName)) {
                rowData = HBaseUtils.getRowData(table.get(get));
            } catch (IOException e) {
                log.error("查询 rowKey 为 [{}] 的数据时出错", rowKey, e);
            }
            return rowData;
        }

        /**
         * 获取单行数据
         *
         * @param tableName 表名
         * @param rowKey    rowKey
         * @return bean for the row, or null on IOException
         */
        public HBaseBean getRowData_1(String tableName, String rowKey) {
            Assert.hasLength(tableName, "表名不能为空");
            Assert.hasLength(rowKey, "rowKey 不能为空");
            HBaseBean hBaseBean = null;
            Get get = new Get(Bytes.toBytes(rowKey));
            try (Table table = getTable(tableName)) {
                hBaseBean = HBaseUtils.toHBaseBean(table.get(get));
            } catch (IOException e) {
                log.error("查询 rowKey 为 [{}] 的数据时出错", rowKey, e);
            }
            return hBaseBean;
        }

        /**
         * 获取某个Rowkey 的一个列族所有的数据
         *
         * @param tableName    表名称
         * @param rowKey       rowkey
         * @param columnFamily 列族
         * @return bean for the row, or null on IOException
         */
        public HBaseBean getRowAndFamily(String tableName, String rowKey, String columnFamily) {
            Assert.hasLength(tableName, "表名不能为空");
            Assert.hasLength(rowKey, "rowKey 不能为空");
            HBaseBean hBaseBean = null;
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addFamily(Bytes.toBytes(columnFamily));
            try (Table table = getTable(tableName)) {
                hBaseBean = HBaseUtils.toHBaseBean(table.get(get));
            } catch (IOException e) {
                log.error("查询 rowKey 为 [{}] 的数据时出错", rowKey, e);
            }
            return hBaseBean;
        }

        /**
         * rowKey 范围查找 [startRowKey, stopRowKey)
         *
         * @param tableName 表名
         * @param startRowKey inclusive start row
         * @param stopRowKey exclusive stop row
         * @return beans for the matched cells, or null on IOException
         */
        public List<HBaseBean> scanStartAndStopRow(String tableName, String startRowKey, String stopRowKey) {
            Assert.hasLength(tableName, "表名不能为空");
            Assert.hasLength(startRowKey, "startRowKey 不能为空");
            Assert.hasLength(stopRowKey, "stopRowKey 不能为空");
            List<HBaseBean> hBaseBeans = null;
            try (Table table = getTable(tableName)) {
                Scan scan = new Scan();
                scan.withStartRow(Bytes.toBytes(startRowKey));
                scan.withStopRow(Bytes.toBytes(stopRowKey));
                // ResultScanner holds server-side resources — must be closed
                try (ResultScanner scanner = table.getScanner(scan)) {
                    hBaseBeans = HBaseUtils.toHBaseBeans(scanner.iterator());
                }
            } catch (IOException e) {
                log.error("查询表数据出错", e);
            }
            return hBaseBeans;
        }

        /**
         * 获取所有数据
         * Collects every row matched by the scan into a list of maps.
         */
        private List<Map<String, String>> getResult(String tableName, Scan scan) {
            // was: result = null followed by result.add(...) — guaranteed NPE
            List<Map<String, String>> result = new ArrayList<>();
            try (Table table = getTable(tableName);
                 ResultScanner scanner = table.getScanner(scan)) {
                for (Result rs : scanner) {
                    result.add(HBaseUtils.getRowData(rs));
                }
            } catch (IOException e) {
                log.error("查询表数据出错", e);
            }
            return result;
        }
    }

    新建一个 HBaseUtils 帮助类(也可以把这些方法直接整合进 HbaseTemplate)

    import com.culturalCenter.dataCenter.globalconfig.hbasee.HBaseBean;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.util.StringUtils;
    
    import java.io.IOException;
    import java.util.*;
    
    /**
     * Helper methods for converting HBase {@link Result}s into beans/maps and
     * for common admin checks.
     */
    @Service
    public class HBaseUtils {

        @Autowired
        private Admin hbaseAdmin;

        /**
         * 判断表是否存在
         *
         * @param tableName 表名
         * @return true/false (false is also returned when the check itself fails)
         */
        public boolean isExists(String tableName) {
            boolean tableExists = false;
            try {
                tableExists = hbaseAdmin.tableExists(TableName.valueOf(tableName));
            } catch (IOException e) {
                e.printStackTrace();
            }
            return tableExists;
        }

        /**
         * 分区【10, 20, 30】 -> ( ,10] (10,20] (20,30] (30, )
         * Duplicates are removed and keys are sorted in HBase byte order.
         *
         * @param keys 分区集合[10, 20, 30]
         * @return byte二维数组
         */
        private byte[][] getSplitKeys(List<String> keys) {
            TreeSet<byte[]> rows = new TreeSet<>(Bytes.BYTES_COMPARATOR);
            for (String key : keys) {
                rows.add(Bytes.toBytes(key));
            }
            // size by the de-duplicated set; sizing by keys.size() would leave
            // null entries at the tail when the input contains duplicates
            byte[][] splitKeys = new byte[rows.size()][];
            int i = 0;
            for (byte[] row : rows) {
                splitKeys[i++] = row;
            }
            return splitKeys;
        }

        /** Copies one cell's coordinates, timestamp, type and value into the bean. */
        private static void fillBean(HBaseBean bean, Cell cell) {
            bean.setRowKey(Bytes.toString(CellUtil.cloneRow(cell)));
            bean.setColumnFamily(Bytes.toString(CellUtil.cloneFamily(cell)));
            bean.setColumnQualifier(Bytes.toString(CellUtil.cloneQualifier(cell)));
            bean.setTimeStamp(cell.getTimestamp());
            bean.setType(cell.getType().toString());
            bean.setValue(Bytes.toString(CellUtil.cloneValue(cell)));
        }

        /**
         * Maps a single-row result to one bean.
         * NOTE: one bean holds one cell — if the result contains several cells,
         * the last one wins (preserved from the original behaviour).
         */
        public static HBaseBean toHBaseBean(Result rs) {
            HBaseBean hBaseBean = new HBaseBean();
            List<Cell> cells = rs.listCells();
            // listCells() returns null for an empty Result — guard against NPE
            if (cells != null) {
                for (Cell cell : cells) {
                    fillBean(hBaseBean, cell);
                }
            }
            return hBaseBean;
        }

        /** Maps every cell of one result to its own bean. */
        public static List<HBaseBean> toHBaseBeans(Result rs) {
            List<HBaseBean> hBaseBeans = new ArrayList<>();
            List<Cell> cells = rs.listCells();
            // listCells() returns null for an empty Result — guard against NPE
            if (cells != null) {
                for (Cell cell : cells) {
                    HBaseBean hBaseBean = new HBaseBean();
                    fillBean(hBaseBean, cell);
                    hBaseBeans.add(hBaseBean);
                }
            }
            return hBaseBeans;
        }

        /** Maps every cell of every result produced by the iterator. */
        public static List<HBaseBean> toHBaseBeans(Iterator<Result> resultIterator) {
            List<HBaseBean> hBaseBeans = new ArrayList<>();
            while (resultIterator.hasNext()) {
                hBaseBeans.addAll(toHBaseBeans(resultIterator.next()));
            }
            return hBaseBeans;
        }

        /**
         * 获取单行结果
         * Flattens one row into qualifier -> value pairs plus a "rowKey" entry.
         *
         * @return the row as a map (only "rowKey" when the result is empty)
         */
        public static Map<String, String> getRowData(Result rs) {
            Map<String, String> column = new HashMap<>();
            column.put("rowKey", Bytes.toString(rs.getRow()));
            List<Cell> cells = rs.listCells();
            // listCells() returns null for an empty Result — guard against NPE
            if (cells != null) {
                for (Cell cell : cells) {
                    String columnName = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String columnValue = Bytes.toString(CellUtil.cloneValue(cell));
                    column.put(columnName, columnValue);
                }
            }
            return column;
        }

        /** Applies optional start/stop rows to the scan (blank values are skipped). */
        public static void setStartAndStop(String startRow, String stopRow, Scan scan) {
            if (!StringUtils.isEmpty(startRow)) {
                scan.withStartRow(Bytes.toBytes(startRow));
            }
            if (!StringUtils.isEmpty(stopRow)) {
                scan.withStopRow(Bytes.toBytes(stopRow));
            }
        }
    }

    新建一个类配置spark 

    SparkContextBean
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import scala.Serializable;
    
    /**
     * Spark configuration bound from "spark.*" properties. Exposes a SparkConf
     * and a JavaSparkContext unless the application already defines them.
     */
    @Configuration
    @ConfigurationProperties(prefix="spark")
    public class SparkContextBean  implements Serializable {

        /** Spark installation directory. */
        private String sparkHome = "";

        /** Application name. */
        private String appName = "";

        /** Master URL. */
        private String master = "";

        /**
         * SparkConf assembled from the bound properties; Kryo serialization is
         * enabled explicitly.
         */
        @Bean
        @ConditionalOnMissingBean(SparkConf.class)
        public SparkConf sparkConf() throws Exception {
            SparkConf conf = new SparkConf();
            conf.setSparkHome(sparkHome);
            conf.setAppName(appName);
            conf.setMaster(master);
            conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            return conf;
        }

        /** JavaSparkContext built on top of {@link #sparkConf()}. */
        @Bean
        @ConditionalOnMissingBean(JavaSparkContext.class)
        public JavaSparkContext javaSparkContext() throws Exception {
            return new JavaSparkContext(sparkConf());
        }

        public String getSparkHome() {
            return this.sparkHome;
        }

        public void setSparkHome(String sparkHome) {
            this.sparkHome = sparkHome;
        }

        public String getAppName() {
            return this.appName;
        }

        public void setAppName(String appName) {
            this.appName = appName;
        }

        public String getMaster() {
            return this.master;
        }

        public void setMaster(String master) {
            this.master = master;
        }
    }

    然后把服务上配置好的hbase-site.xml文件复制到 resources 根目录下就可以了

  • 相关阅读:
    华为全联接大会2019,共创智能新高度
    CTDC2019首席技术官领袖峰会,AI赋能 智享5G
    2019全球体验设计峰会:体验赋能商业,创造更好体验
    全球闪存峰会旨在深化技术创新,增进闪存产业链上下游
    PyCon 2019火热来袭,与大数据、人工智能等专家一起探讨Python语言
    PHPConChina 2019 PHP开发者大会将于8月在上海举办!
    2019腾讯Live开发者大会(TLC),引领技术新趋势
    2019 HTML5深度应用开发实践
    2019年5G物联网关键技术与应用培训,了解5G网络发展现状及进展
    2019第二届企业云服务大会 -- 企业智变,云化未来
  • 原文地址:https://www.cnblogs.com/Mr-lin66/p/13386681.html
Copyright © 2011-2022 走看看