  • Accessing Kerberos-authenticated HBase from Spark via the newAPIHadoopRDD interface

    There are plenty of examples online for reading HBase data through the newAPIHadoopRDD interface, but because this environment is hardened with Kerberos, Spark has to talk to a Kerberos-authenticated HBase, and reference material for that is scarce. A few things need special attention when accessing HBase in this setup, so this post briefly records the final implementation and the pitfalls hit along the way. The blog post 有kerberos认证hbase在spark环境下的使用 (using Kerberos-authenticated HBase from Spark) was a great help!

    Environment and version information

    CDH 6.2.1 big data cluster (with YARN, Spark, HDFS, and other components)

    Project pom file

    First, note that Scala does not need to be installed: for local-mode runs it is enough to declare the Scala runtime as a dependency in the pom. Since the application ultimately runs on the cluster, and the lib directory of CDH Spark already contains scala, spark-core, spark-sql and the other related dependencies, those are declared with provided scope in the pom, i.e. they are only needed at compile time.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.css.bigdata</groupId>
        <artifactId>data-compare</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <java.version>1.8</java.version>
            <version.hbase>2.1.0-cdh6.2.1</version.hbase>
            <version.hadoop>3.0.0-cdh6.2.1</version.hadoop>
            <maven.compiler.source>1.8</maven.compiler.source>
            <version.scala>2.11</version.scala>
            <version.scala.library>2.11.12</version.scala.library>
            <version.spark>2.4.0-cdh6.2.1</version.spark>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${version.scala.library}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${version.scala}</artifactId>
                <version>${version.spark}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${version.scala}</artifactId>
                <version>${version.spark}</version>
                <scope>provided</scope>
            </dependency>
    
            <!--HBase -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>${version.hbase}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>${version.hbase}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-mapreduce</artifactId>
                <version>${version.hbase}</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
    
                <!-- Copy the resource files out into a separate conf directory -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>copy-resources</id>
                            <phase>package</phase>
                            <goals>
                                <goal>copy-resources</goal>
                            </goals>
                            <configuration>
                                <resources>
                                    <resource>
                                        <directory>src/main/resources</directory>
                                    </resource>
                                </resources>
                                <outputDirectory>${project.build.directory}/conf</outputDirectory>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifestEntries>
                                <Class-Path>../conf/</Class-Path>
                            </manifestEntries>
                            <manifest>
                                <addClasspath>true</addClasspath>
                                <classpathPrefix>../lib/</classpathPrefix>
                                <mainClass>com.css.bigdata.dataCompare.HBaseCompare</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
    
                <plugin>
                    <!-- This plugin copies the dependency jars into target/lib after compilation and keeps them out of the application jar -->
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-dependency-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>copy-dependencies</id>
                            <phase>prepare-package</phase>
                            <goals>
                                <goal>copy-dependencies</goal>
                            </goals>
                            <configuration>
                                <outputDirectory>${project.build.directory}/lib</outputDirectory>
                                <overWriteReleases>false</overWriteReleases>
                                <overWriteSnapshots>false</overWriteSnapshots>
                                <overWriteIfNewer>true</overWriteIfNewer>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    
    

    The HBaseUtil class

    package com.css.bigdata.dataCompare.hbase;
    
    import com.css.bigdata.dataCompare.Constant;
    import com.css.bigdata.dataCompare.util.KerberosCheckUtil;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.security.User;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.io.IOException;
    public class HBaseUtil {
    
        public static Logger logger = LoggerFactory.getLogger(HBaseUtil.class);
        public static Configuration getHbaseConfiguration(String cluster){
            Configuration hbaseConf = HBaseConfiguration.create();
            // tweak a few settings
            String hbaseIp = cluster;
            hbaseConf.set("hbase.zookeeper.quorum", hbaseIp + ":2181");
            hbaseConf.set("hbase.master", hbaseIp+":60000");
            // avoid timeouts
            hbaseConf.set("hbase.rpc.timeout", "10000");//10s
            hbaseConf.set("hbase.client.retries.number", "2");
            hbaseConf.set("hbase.client.operation.timeout", "10000");
            return hbaseConf;
        }
    
        public static void kerberosLogin(Configuration hbConf){
            // kerberos settings
            hbConf.set("hadoop.security.authentication", "Kerberos");
            hbConf.set("hbase.security.authentication", "kerberos");
            hbConf.set("hbase.master.kerberos.principal", "hbase/_HOST@CVBG.COM");
            hbConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@CVBG.COM");
            System.setProperty("javax.security.auth.useSubjectCredOnly", "false");
            System.setProperty("java.security.krb5.conf", KerberosCheckUtil.getKrb5Conf());
            try{
                UserGroupInformation.setConfiguration(hbConf);
                if (UserGroupInformation.isLoginKeytabBased() && UserGroupInformation.getLoginUser().getUserName().equals(KerberosCheckUtil.principal)) {
                    logger.info("hbase:" + hbConf.get("hbase.master")+ ",user [{}] is login already!",KerberosCheckUtil.principal);
                }else {
                    UserGroupInformation.loginUserFromKeytab(KerberosCheckUtil.principal, KerberosCheckUtil.getKeyTabFile());
                    logger.info("hbase:" + hbConf.get("hbase.master") + ",user [{}] login successed!",KerberosCheckUtil.principal);
                }
            }catch (IOException e){
                e.printStackTrace();
                logger.error("kerbose登录报错," + KerberosCheckUtil.getKeyTabFile());
                System.exit(1);
            }
        }
        public static User getAuthenticatedUser(){
            User loginedUser = null;
            try {
                logger.info("=====put the logined userinfomation to user====");
                loginedUser = User.create(UserGroupInformation.getLoginUser());
            } catch (IOException e) {
                logger.error("===fialed put the logined userinfomation to user===",e);
            }
            return loginedUser;
        }
    }
    
    

    The KerberosCheckUtil class

    package com.css.bigdata.dataCompare.util;
    
    public class KerberosCheckUtil {
        // Kerberos principal
        public static String principal = "dw_hbkal@CVBG.COM";
        // keytab file
        public static String keyTabFileName = "dw_hbkal.tab";
        // krb5 configuration file
        public static String krb5Conf = "krb5.conf";

        public static String getKeyTabFile() {
            String runPath = KerberosCheckUtil.class.getResource("/").getPath();
            return runPath + keyTabFileName;
            //return "file:///root/przhang/dw_hbkal.keytab";
        }

        public static String getKrb5Conf() {
            String runPath = KerberosCheckUtil.class.getResource("/").getPath();
            return runPath + krb5Conf;
            //return "file:///root/przhang/krb5.conf";
        }
    
    }
    

    The KerberosTableInputFormat class

    This class is a direct copy of org.apache.hadoop.hbase.mapreduce.TableInputFormat with two modifications: 1. the setConf method performs the Kerberos login and obtains the authenticated user; 2. wherever an HBase connection is created, that authenticated user is passed into the connection, and the resulting connection can then be used to read from and write to HBase.

    package com.css.bigdata.dataCompare.hbase;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.Locale;
    
    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.security.User;
    import org.apache.yetus.audience.InterfaceAudience;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.StringUtils;
    
    /**
     * Convert HBase tabular data into a format that is consumable by Map/Reduce.
     */
    @InterfaceAudience.Public
    public class KerberosTableInputFormat extends TableInputFormatBase
            implements Configurable {
    
        @SuppressWarnings("hiding")
        private static final Logger LOG = LoggerFactory.getLogger(KerberosTableInputFormat.class);
    
        /** Job parameter that specifies the input table. */
        public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
        /**
         * If specified, use start keys of this table to split.
         * This is useful when you are preparing data for bulkload.
         */
        private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
        /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
         * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
         */
        public static final String SCAN = "hbase.mapreduce.scan";
        /** Scan start row */
        public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
        /** Scan stop row */
        public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
        /** Column Family to Scan */
        public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
        /** Space delimited list of columns and column families to scan. */
        public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
        /** The timestamp used to filter columns with a specific timestamp. */
        public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
        /** The starting timestamp used to filter columns with a specific range of versions. */
        public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
        /** The ending timestamp used to filter columns with a specific range of versions. */
        public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
        /** The maximum number of version to return. */
        public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
        /** Set to false to disable server-side caching of blocks for this scan. */
        public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
        /** The number of rows for caching that will be passed to scanners. */
        public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
        /** Set the maximum number of values to return for each call to next(). */
        public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
        /** Specify if we have to shuffle the map tasks. */
        public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
    
        /** The configuration. */
        private Configuration conf = null;
    
        /** The kerberos authenticated user*/
        private User user;
    
        /**
         * Returns the current configuration.
         *
         * @return The current configuration.
         * @see org.apache.hadoop.conf.Configurable#getConf()
         */
        @Override
        public Configuration getConf() {
            return conf;
        }
    
        /**
         * Sets the configuration. This is used to set the details for the table to
         * be scanned.
         *
         * @param configuration  The configuration to set.
         * @see org.apache.hadoop.conf.Configurable#setConf(
         *   org.apache.hadoop.conf.Configuration)
         */
        @Override
        @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
                justification="Intentional")
        public void setConf(Configuration configuration) {
            this.conf = configuration;
            // ========= perform the Kerberos login before any HBase connection is created =========
    
            HBaseUtil.kerberosLogin(conf);
            user = HBaseUtil.getAuthenticatedUser();
            Scan scan = null;
    
            if (conf.get(SCAN) != null) {
                try {
                    scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
                } catch (IOException e) {
                    LOG.error("An error occurred.", e);
                }
            } else {
                try {
                    scan = createScanFromConfiguration(conf);
                } catch (Exception e) {
                    LOG.error(StringUtils.stringifyException(e));
                }
            }
    
            setScan(scan);
        }
    
        /**
         * Sets up a {@link Scan} instance, applying settings from the configuration property
         * constants defined in {@code TableInputFormat}.  This allows specifying things such as:
         * <ul>
         *   <li>start and stop rows</li>
         *   <li>column qualifiers or families</li>
         *   <li>timestamps or timerange</li>
         *   <li>scanner caching and batch size</li>
         * </ul>
         */
        public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
            Scan scan = new Scan();
    
            if (conf.get(SCAN_ROW_START) != null) {
                scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
            }
    
            if (conf.get(SCAN_ROW_STOP) != null) {
                scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
            }
    
            if (conf.get(SCAN_COLUMNS) != null) {
                addColumns(scan, conf.get(SCAN_COLUMNS));
            }
    
            for (String columnFamily : conf.getTrimmedStrings(SCAN_COLUMN_FAMILY)) {
                scan.addFamily(Bytes.toBytes(columnFamily));
            }
    
            if (conf.get(SCAN_TIMESTAMP) != null) {
                scan.setTimestamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
            }
    
            if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
                scan.setTimeRange(
                        Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
                        Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
            }
    
            if (conf.get(SCAN_MAXVERSIONS) != null) {
                scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
            }
    
            if (conf.get(SCAN_CACHEDROWS) != null) {
                scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
            }
    
            if (conf.get(SCAN_BATCHSIZE) != null) {
                scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
            }
    
            // false by default, full table scans generate too much BC churn
            scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
    
            return scan;
        }
    
        @Override
        protected void initialize(JobContext context) throws IOException {
            // Do we have to worry about mis-matches between the Configuration from setConf and the one
            // in this context?
            TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
            try {
                // ==================== pass the authenticated user into the connection ====================
                initializeTable(ConnectionFactory.createConnection(new Configuration(conf),user), tableName);
            } catch (Exception e) {
                LOG.error(StringUtils.stringifyException(e));
            }
        }
    
        /**
         * Parses a combined family and qualifier and adds either both or just the
         * family in case there is no qualifier. This assumes the older colon
         * divided notation, e.g. "family:qualifier".
         *
         * @param scan The Scan to update.
         * @param familyAndQualifier family and qualifier
         * @throws IllegalArgumentException When familyAndQualifier is invalid.
         */
        private static void addColumn(Scan scan, byte[] familyAndQualifier) {
            byte [][] fq = CellUtil.parseColumn(familyAndQualifier);
            if (fq.length == 1) {
                scan.addFamily(fq[0]);
            } else if (fq.length == 2) {
                scan.addColumn(fq[0], fq[1]);
            } else {
                throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
            }
        }
    
        /**
         * Adds an array of columns specified using old format, family:qualifier.
         * <p>
         * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])}for any families in the
         * input.
         *
         * @param scan The Scan to update.
         * @param columns array of columns, formatted as <code>family:qualifier</code>
         * @see Scan#addColumn(byte[], byte[])
         */
        public static void addColumns(Scan scan, byte [][] columns) {
            for (byte[] column : columns) {
                addColumn(scan, column);
            }
        }
    
        /**
         * Calculates the splits that will serve as input for the map tasks. The
         * number of splits matches the number of regions in a table. Splits are shuffled if
         * required.
         * @param context  The current job context.
         * @return The list of input splits.
         * @throws IOException When creating the list of splits fails.
         * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
         *   org.apache.hadoop.mapreduce.JobContext)
         */
        @Override
        public List<InputSplit> getSplits(JobContext context) throws IOException {
            List<InputSplit> splits = super.getSplits(context);
            if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase(Locale.ROOT))) {
                Collections.shuffle(splits);
            }
            return splits;
        }
    
        /**
         * Convenience method to parse a string representation of an array of column specifiers.
         *
         * @param scan The Scan to update.
         * @param columns  The columns to parse.
         */
        private static void addColumns(Scan scan, String columns) {
            String[] cols = columns.split(" ");
            for (String col : cols) {
                addColumn(scan, Bytes.toBytes(col));
            }
        }
    
        @Override
        protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
            if (conf.get(SPLIT_TABLE) != null) {
                TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
                // ==================== pass the authenticated user into the connection ====================
                try (Connection conn = ConnectionFactory.createConnection(getConf(),user)) {
                    try (RegionLocator rl = conn.getRegionLocator(splitTableName)) {
                        return rl.getStartEndKeys();
                    }
                }
            }
    
            return super.getStartEndKeys();
        }
    
        /**
         * Sets split table in map-reduce job.
         */
        public static void configureSplitTable(Job job, TableName tableName) {
            job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
        }
    }
    

    Main program example class

    package com.css.bigdata.dataCompare;
    import com.css.bigdata.dataCompare.hbase.HBaseUtil;
    import com.css.bigdata.dataCompare.hbase.KerberosTableInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class HBaseCompare {
    
        private static Configuration getKerberosLoginConf(String cluster){
            Configuration conf = HBaseUtil.getHbaseConfiguration(cluster);
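            // Note: the Kerberos login is deliberately no longer performed here on the driver;
            // it happens in KerberosTableInputFormat.setConf(), which also runs on the executors (see pitfall 1 below).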
            //HBaseUtil.kerberosLogin(conf);
            return conf;
        }
    
        // fetch the HBase table data and convert it
    
        private static JavaPairRDD<String, Map<String,String>> getTableDataRDD(Configuration hconf,String tableName, JavaSparkContext sc) throws IOException {
            hconf.set(KerberosTableInputFormat.INPUT_TABLE,tableName);
            // set up the scan
            String scanToString = TableMapReduceUtil.convertScanToString(new Scan());
            hconf.set(KerberosTableInputFormat.SCAN, scanToString);
            // turn the HBase rows into an RDD
    
            JavaPairRDD<ImmutableBytesWritable, Result> dataRDD = sc.newAPIHadoopRDD(hconf,KerberosTableInputFormat.class,ImmutableBytesWritable.class,Result.class);
            // HBase Result objects are not serializable, so convert each row into a plain map
            JavaPairRDD<String, Map<String,String>> dataRowsRDD = dataRDD.mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Map<String,String>>() {
                @Override
                public Tuple2<String, Map<String,String>> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
                    Result result =  immutableBytesWritableResultTuple2._2;
                    HashMap<String,String> resultMap = new HashMap<String, String>();
                    for(Cell cell : result.rawCells()) {
                        resultMap.put(new String(CellUtil.cloneQualifier(cell)).toLowerCase(), new String(CellUtil.cloneValue(cell)));
                    }
                    return new Tuple2<>(Bytes.toString(result.getRow()),resultMap);
                }
            });
            return dataRowsRDD;
        }
    
        public static void main(String[] args) {
            String ip = args[0];
            String table = args[1];
    
            //SparkSession session = SparkSession.builder().appName("hbase example").master("local").getOrCreate();
            SparkSession session = SparkSession.builder().appName("hbase example").getOrCreate();
            JavaSparkContext sc = JavaSparkContext.fromSparkContext(session.sparkContext());
            Configuration srcConf = getKerberosLoginConf(ip);
            try{
                JavaPairRDD<String, Map<String,String>> srcRowsRDD = getTableDataRDD(srcConf,table,sc);
                // use the data
                //...
            } catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    

    Packaging and submitting to the YARN cluster

    For packaging, the dependencies are placed in the lib directory, the Kerberos configuration file krb5.conf and the Kerberos login keytab file dw_hbkal.tab go into conf, and the application itself is built as a jar and placed in the bin directory; the job is then submitted in yarn-client mode. A sketch of the resulting layout is shown below.
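
    A plausible on-disk layout after packaging and deployment (an assumption pieced together from the pom plugins and the relative paths in the spark-submit command below; in particular, the kerberos/ subdirectory under conf is not created by the build and would be arranged by hand):

    data-compare/
    ├── bin/
    │   └── data-compare-1.0-SNAPSHOT.jar      <- spark-submit is run from here
    ├── conf/
    │   └── kerberos/
    │       ├── dw_hbkal.keytab
    │       └── krb5.conf
    └── lib/
        ├── hbase-client-2.1.0-cdh6.2.1.jar
        ├── hbase-server-2.1.0-cdh6.2.1.jar
        ├── hbase-mapreduce-2.1.0-cdh6.2.1.jar
        └── ... (the other copied dependencies)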

    spark-submit --keytab ../conf/kerberos/dw_hbkal.keytab --principal dw_hbkal@CVBG.COM --files ../conf/kerberos/dw_hbkal.keytab,../conf/kerberos/krb5.conf --master yarn --jars ../lib/hbase-client-2.1.0-cdh6.2.1.jar,../lib/hbase-server-2.1.0-cdh6.2.1.jar,../lib/hbase-mapreduce-2.1.0-cdh6.2.1.jar --class com.css.bigdata.dataCompare.HBaseCompare data-compare-1.0-SNAPSHOT.jar 172.xxx.xxx.xxx testtable
    

    Pitfalls encountered

    1. At first the custom KerberosTableInputFormat class was not used: the Kerberos login was performed in the getKerberosLoginConf method of the main class HBaseCompare. This worked fine when running in local mode from IDEA, but when the job was submitted to the YARN cluster it failed, with errors saying the executors could not reach the HBase cluster. After a long search it finally clicked that the non-RDD code in the main class executes on the driver, so effectively only the driver had authenticated, while the executors had never logged in. The blog post 有kerberos认证hbase在spark环境下的使用 then pointed the way: the KerberosTableInputFormat class was written and the Kerberos login moved into it. The failing driver-only pattern is shown in the first sketch after this list.

    2. With the above solved, the next thought was that since the application jar is distributed to every executor node, dw_hbkal.keytab and krb5.conf could simply be bundled into the jar, with KerberosCheckUtil returning their paths. After submitting, however, the program kept complaining that the files could not be found... The next attempt was to place a copy of the two files on every node of the cluster and have KerberosCheckUtil return their absolute paths, but at runtime the program still reported that the files could not be found.

    3. A look at the spark-submit options turned up the --files parameter, which distributes the listed files to every executor. So the Kerberos files were left out of the jar and passed via --files at submit time instead, and the program finally ran correctly. In hindsight: the tasks run inside YARN containers whose actual working paths are not known in advance, so hard-coded absolute paths or files baked into the jar cannot be found at Kerberos login time, whereas with spark-submit --files Spark itself takes care of shipping the files and making them readable by the tasks. That at least seems like a reasonable way to understand it. A sketch of how such distributed files can be located at runtime follows as the second code block after this list.
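
    For reference, here is a minimal sketch of the driver-only pattern that pitfall 1 describes (a reconstruction rather than the original code of this project; the class name DriverOnlyLoginExample is made up). The login runs once in main() on the driver, while the record readers created by the stock TableInputFormat are opened inside executor JVMs that never authenticated:

    package com.css.bigdata.dataCompare;

    import com.css.bigdata.dataCompare.hbase.HBaseUtil;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;

    public class DriverOnlyLoginExample {
        public static void main(String[] args) {
            SparkSession session = SparkSession.builder().appName("driver-only kerberos login").getOrCreate();
            JavaSparkContext sc = JavaSparkContext.fromSparkContext(session.sparkContext());

            Configuration conf = HBaseUtil.getHbaseConfiguration(args[0]);
            conf.set(TableInputFormat.INPUT_TABLE, args[1]);

            // Kerberos login on the driver only (what getKerberosLoginConf originally did):
            // enough for local mode, but never propagated to the YARN executors.
            HBaseUtil.kerberosLogin(conf);

            // The scanners behind this RDD are opened inside executor JVMs, which hold no ticket here,
            // so on YARN the executors fail to reach the HBase cluster.
            JavaPairRDD<ImmutableBytesWritable, Result> rdd = sc.newAPIHadoopRDD(
                    conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
            System.out.println("row count: " + rdd.count());
        }
    }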

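    As a footnote to pitfall 3: Spark copies every --files entry to each container and exposes it through org.apache.spark.SparkFiles; the files are also linked into the YARN container's working directory. Below is a hedged sketch, not code from the project, of a helper that resolves such files at runtime; the class name KerberosFileLocator is made up, and the file name dw_hbkal.keytab is taken from the spark-submit line above:

    package com.css.bigdata.dataCompare.util;

    import org.apache.spark.SparkFiles;

    public class KerberosFileLocator {

        // Resolves a file shipped with spark-submit --files (or SparkContext.addFile)
        // to the absolute path of its local copy on the current driver/executor.
        public static String resolve(String fileName) {
            return SparkFiles.get(fileName);
        }

        public static String getKeyTabFile() {
            return resolve("dw_hbkal.keytab");
        }

        public static String getKrb5Conf() {
            return resolve("krb5.conf");
        }
    }

    Inside a YARN container a bare relative name (for example new File("krb5.conf")) usually works as well, because the localized files are symlinked into the container's working directory.
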
  • Original article: https://www.cnblogs.com/darange/p/14087427.html