Consuming Kafka with Spark and writing to HBase in a Kerberos environment

I. Prepare the environment: create the Kafka topic and HBase table

1. Create a Kafka topic in a Kerberos environment

1.1 Kafka uses the PLAINTEXT protocol by default, so in a Kerberos environment the communication protocol has to be changed. Add the following to ${KAFKA_HOME}/config/producer.properties and ${KAFKA_HOME}/config/consumer.properties:

    security.protocol=SASL_PLAINTEXT

1.2 Before running the Kafka command-line tools, add the KAFKA_OPTS variable to the environment; otherwise Kafka cannot use the keytab:

    export KAFKA_OPTS="$KAFKA_OPTS -Djava.security.auth.login.config=/usr/ndp/current/kafka_broker/conf/kafka_jaas.conf"

The contents of kafka_jaas.conf are as follows:

    cat /usr/ndp/current/kafka_broker/conf/kafka_jaas.conf
    
    KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/kafka.service.keytab"
    storeKey=true
    useTicketCache=false
    serviceName="kafka"
    principal="kafka/hzadg-mammut-platform3.server.163.org@BDMS.163.COM";
    };
    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    renewTicket=true
    serviceName="kafka";
    };
    Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/kafka.service.keytab"
    storeKey=true
    useTicketCache=false
    serviceName="zookeeper"
    principal="kafka/hzadg-mammut-platform3.server.163.org@BDMS.163.COM";
    };
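
The command-line tools pick up this file through the java.security.auth.login.config system property set by the KAFKA_OPTS export above. A JVM client of your own can point at the same file before it constructs any Kafka producer or consumer; a minimal sketch (the path is the one used above, adjust it to your installation):

    // Sketch: in-process equivalent of the KAFKA_OPTS export above.
    // Must run before the first Kafka client is created, because the JAAS
    // configuration is read when the login module initializes.
    System.setProperty("java.security.auth.login.config",
        "/usr/ndp/current/kafka_broker/conf/kafka_jaas.conf");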

1.3 Create a new topic:

    bin/kafka-topics.sh --create --zookeeper hzadg-mammut-platform2.server.163.org:2181,hzadg-mammut-platform3.server.163.org:2181 --replication-factor 1 --partitions 1 --topic spark-test
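
The shell script talks to ZooKeeper directly. If your client libraries are Kafka 0.11 or newer (an assumption; the cluster in this article may be older), the topic can also be created programmatically through the AdminClient API, reusing the SASL_PLAINTEXT setting from step 1.1 and the JAAS configuration from step 1.2. A sketch, with the broker address taken from the producer command below:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    
    public class CreateTopicSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            "hzadg-mammut-platform2.server.163.org:6667");
        props.put("security.protocol", "SASL_PLAINTEXT"); // same setting as in 1.1
        try (AdminClient admin = AdminClient.create(props)) {
          // one partition, replication factor 1, matching the shell command above
          admin.createTopics(Collections.singleton(new NewTopic("spark-test", 1, (short) 1)))
               .all().get();
        }
      }
    }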

1.4 Start a console producer:

    bin/kafka-console-producer.sh  --broker-list hzadg-mammut-platform2.server.163.org:6667,hzadg-mammut-platform3.server.163.org:6667,hzadg-mammut-platform4.server.163.org:6667 --topic spark-test --producer.config ./config/producer.properties

1.5 Test with a console consumer:

    bin/kafka-console-consumer.sh --zookeeper hzadg-mammut-platform2.server.163.org:2181,hzadg-mammut-platform3.server.163.org:2181 --bootstrap-server hzadg-mammut-platform2.server.163.org:6667 --topic spark-test --from-beginning --new-consumer  --consumer.config ./config/consumer.properties

2. Create the HBase table

2.1 kinit as the hbase principal first; otherwise the HBase table cannot be created:

    kinit -kt /etc/security/keytabs/hbase.service.keytab hbase/hzadg-mammut-platform2.server.163.org@BDMS.163.COM
     ./bin/hbase shell
    > create 'recsys_logs', 'f'
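
The same table can also be created from Java. Below is a minimal sketch using the legacy HBase client API (the same generation of API the program in section II uses); it assumes the process has already obtained the hbase Kerberos ticket via kinit and has hbase-site.xml on its classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    
    public class CreateHBaseTableSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (HBaseAdmin admin = new HBaseAdmin(conf)) {
          if (!admin.tableExists("recsys_logs")) {
            // column family 'f', matching the shell command above
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("recsys_logs"));
            desc.addFamily(new HColumnDescriptor("f"));
            admin.createTable(desc);
          }
        }
      }
    }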

II. Write the Spark code

Write a simple Spark Java program that consumes messages from Kafka and writes the record count of each batch to HBase. For details, see the project:

    https://github.com/LiShuMing/spark-demos

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package com.netease.spark.streaming.hbase;
    
    import com.netease.spark.utils.Consts;
    import com.netease.spark.utils.JConfig;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Arrays;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    
    public class JavaKafkaToHBaseKerberos {
      private final static Logger LOGGER = LoggerFactory.getLogger(JavaKafkaToHBaseKerberos.class);
    
      private static HConnection connection = null;
      private static HTableInterface table = null;
    
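      // Lazily create one shared HBase connection and table handle (legacy HConnection API).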
      public static void openHBase(String tablename) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        synchronized (HConnection.class) {
          if (connection == null)
            connection = HConnectionManager.createConnection(conf);
        }
    
        synchronized (HTableInterface.class) {
          if (table == null) {
            table = connection.getTable(tablename);  // use the table name passed in from the config
          }
        }
      }
    
      public static void closeHBase() {
        if (table != null)
          try {
            table.close();
          } catch (IOException e) {
            LOGGER.error("关闭 table 出错", e);
          }
        if (connection != null)
          try {
            connection.close();
          } catch (IOException e) {
            LOGGER.error("关闭 connection 出错", e);
          }
      }
    
      public static void main(String[] args) throws Exception {
        String hbaseTable = JConfig.getInstance().getProperty(Consts.HBASE_TABLE);
        String kafkaBrokers = JConfig.getInstance().getProperty(Consts.KAFKA_BROKERS);
        String kafkaTopics = JConfig.getInstance().getProperty(Consts.KAFKA_TOPICS);
        String kafkaGroup = JConfig.getInstance().getProperty(Consts.KAFKA_GROUP);
    
        // open hbase
        try {
          openHBase(hbaseTable);
        } catch (IOException e) {
          LOGGER.error("建立HBase 连接失败", e);
          System.exit(-1);
        }
    
        SparkConf conf = new SparkConf().setAppName("JavaKafkaToHBase");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
    
        Set<String> topicsSet = new HashSet<>(Arrays.asList(kafkaTopics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", kafkaBrokers);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", kafkaGroup);
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false);
        // required in a Kerberos environment
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
    
        // Create direct kafka stream with brokers and topics
        final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams)
            );
    
        JavaDStream<String> lines = stream.map(new Function<ConsumerRecord<String, String>, String>() {
          private static final long serialVersionUID = -1801798365843350169L;
    
          @Override
          public String call(ConsumerRecord<String, String> record) {
            return record.value();
          }
        }).filter(new Function<String, Boolean>() {
          private static final long serialVersionUID = 7786877762996470593L;
    
          @Override
          public Boolean call(String msg) throws Exception {
            return msg.length() > 0;
          }
        });
    
        JavaDStream<Long> nums = lines.count();
    
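        // For each batch, write the record count to HBase, row-keyed by the current timestamp.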
        nums.foreachRDD(new VoidFunction<JavaRDD<Long>>() {
          private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
    
          @Override
          public void call(JavaRDD<Long> rdd) throws Exception {
            Long num = rdd.take(1).get(0);
            String ts = sdf.format(new Date());
            Put put = new Put(Bytes.toBytes(ts));
            put.add(Bytes.toBytes("f"), Bytes.toBytes("nums"), Bytes.toBytes(num));
            table.put(put);
          }
        });
    
        ssc.start();
        ssc.awaitTermination();
        closeHBase();
      }
    }
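
Note that HConnection, HConnectionManager and HTableInterface belong to the old HBase client API (deprecated in HBase 1.x and removed in 2.x). If you build against a newer client, the equivalent write might look like the following sketch (same table and column family as above; this is not part of the original project):

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class ModernHBaseWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("recsys_logs"))) {
          // same row-key format as the streaming program above
          String rowKey = new SimpleDateFormat("yyyyMMdd HH:mm:ss").format(new Date());
          Put put = new Put(Bytes.toBytes(rowKey));
          // addColumn replaces the deprecated Put.add used in the program above
          put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("nums"), Bytes.toBytes(1L));
          table.put(put);
        }
      }
    }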

III. Build and run on YARN

3.1 Switch to the project directory and build the project:

    mvn clean package

3.2 Run the Spark job

• Because the executors need to access Kafka, the Kerberos credentials authorized for Kafka (the JAAS configuration and keytab) have to be shipped to the executors;
• Because HDFS in this cluster is also Kerberized, spark.yarn.keytab/spark.yarn.principal must point to a keytab/principal that can access HDFS and HBase.

Run the following from the project directory:

    /usr/ndp/current/spark2_client/bin/spark-submit \
    --files ./kafka_client_jaas.conf,./kafka.service.keytab \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
    --driver-java-options "-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
    --conf spark.yarn.keytab=/etc/security/keytabs/hbase.service.keytab \
    --conf spark.yarn.principal=hbase/hzadg-mammut-platform1.server.163.org@BDMS.163.COM \
    --class com.netease.spark.streaming.hbase.JavaKafkaToHBaseKerberos \
    --master yarn \
    --deploy-mode client \
    ./target/spark-demo-0.1.0-jar-with-dependencies.jar

The contents of kafka_client_jaas.conf are as follows. Note that keyTab is the relative path ./kafka.service.keytab: --files ships the keytab (and the JAAS file) into each container's working directory, so the relative path resolves there.

    cat kafka_client_jaas.conf
    
    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    renewTicket=true
    keyTab="./kafka.service.keytab"
    storeKey=true
    useTicketCache=false
    serviceName="kafka"
    principal="kafka/hzadg-mammut-platform1.server.163.org@BDMS.163.COM";
    };

3.3 Results

Original article: https://www.cnblogs.com/felixzh/p/10558674.html