  • Spark consuming Kafka and writing to HBase in a Kerberos environment

    I. Preparing the environment: create the Kafka topic and HBase table

    1. Create a Kafka topic in the Kerberos environment

    1.1 Kafka uses the PLAINTEXT protocol by default; under Kerberos the communication protocol must be changed. Add the following line to both ${KAFKA_HOME}/config/producer.properties and ${KAFKA_HOME}/config/consumer.properties:

    security.protocol=SASL_PLAINTEXT
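
    Depending on the distribution, the Kerberos service name may also need to be set in the same two files when it is not supplied through JAAS. This extra line is an assumption for completeness, not part of the original setup:

    sasl.kerberos.service.name=kafka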

    1.2 Before running any Kafka command, add the KAFKA_OPTS variable to the environment; otherwise Kafka cannot use the keytab:

    export KAFKA_OPTS="$KAFKA_OPTS -Djava.security.auth.login.config=/usr/ndp/current/kafka_broker/conf/kafka_jaas.conf"

    The kafka_jaas.conf file defines three login contexts: KafkaServer for the broker itself, KafkaClient for client tools such as the console producer/consumer, and Client for the broker's connection to ZooKeeper. Its contents are as follows:

    cat /usr/ndp/current/kafka_broker/conf/kafka_jaas.conf
    
    KafkaServer {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        keyTab="/etc/security/keytabs/kafka.service.keytab"
        storeKey=true
        useTicketCache=false
        serviceName="kafka"
        principal="kafka/hzadg-mammut-platform3.server.163.org@BDMS.163.COM";
    };
    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useTicketCache=true
        renewTicket=true
        serviceName="kafka";
    };
    Client {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        keyTab="/etc/security/keytabs/kafka.service.keytab"
        storeKey=true
        useTicketCache=false
        serviceName="zookeeper"
        principal="kafka/hzadg-mammut-platform3.server.163.org@BDMS.163.COM";
    };
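
    Because the KafkaClient context relies on the ticket cache (useTicketCache=true), a valid ticket is needed before running any client tool. One way, reusing the kafka service keytab and principal from the file above (adjust the principal to the local host):

    kinit -kt /etc/security/keytabs/kafka.service.keytab kafka/hzadg-mammut-platform3.server.163.org@BDMS.163.COM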

    1.3 Create a new topic:

    bin/kafka-topics.sh --create --zookeeper hzadg-mammut-platform2.server.163.org:2181,hzadg-mammut-platform3.server.163.org:2181 --replication-factor 1 --partitions 1 --topic spark-test
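
    To confirm the topic was created, it can be described against the same ZooKeeper quorum (a quick verification step, not in the original write-up):

    bin/kafka-topics.sh --describe --zookeeper hzadg-mammut-platform2.server.163.org:2181,hzadg-mammut-platform3.server.163.org:2181 --topic spark-test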

    1.4 Start a console producer:

    bin/kafka-console-producer.sh  --broker-list hzadg-mammut-platform2.server.163.org:6667,hzadg-mammut-platform3.server.163.org:6667,hzadg-mammut-platform4.server.163.org:6667 --topic spark-test --producer.config ./config/producer.properties
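
    The console producer reads lines from stdin; a few test messages can also be piped in non-interactively with the same flags (an optional convenience, not in the original):

    echo "hello-spark-test" | bin/kafka-console-producer.sh --broker-list hzadg-mammut-platform2.server.163.org:6667,hzadg-mammut-platform3.server.163.org:6667,hzadg-mammut-platform4.server.163.org:6667 --topic spark-test --producer.config ./config/producer.properties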

    1.5 Test with a console consumer:

    bin/kafka-console-consumer.sh --bootstrap-server hzadg-mammut-platform2.server.163.org:6667 --topic spark-test --from-beginning --new-consumer --consumer.config ./config/consumer.properties

    2. Create the HBase table

    2.1 kinit as the hbase principal first; otherwise the table cannot be created:

    kinit -kt /etc/security/keytabs/hbase.service.keytab hbase/hzadg-mammut-platform2.server.163.org@BDMS.163.COM
    ./bin/hbase shell
    > create 'recsys_logs', 'f'
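
    The new table can be checked from the same shell before moving on (a quick sanity check, not part of the original steps):

    > describe 'recsys_logs'
    > scan 'recsys_logs'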

    II. Writing the Spark code

    Write a simple Spark Java program that consumes messages from Kafka and, for each batch, writes the record count to HBase. The full project is available at:

    https://github.com/LiShuMing/spark-demos

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package com.netease.spark.streaming.hbase;
    
    import com.netease.spark.utils.Consts;
    import com.netease.spark.utils.JConfig;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Arrays;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    
    public class JavaKafkaToHBaseKerberos {
      private final static Logger LOGGER = LoggerFactory.getLogger(JavaKafkaToHBaseKerberos.class);
    
      private static HConnection connection = null;
      private static HTableInterface table = null;
    
      public static void openHBase(String tablename) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        synchronized (HConnection.class) {
          if (connection == null)
            connection = HConnectionManager.createConnection(conf);
        }
    
        synchronized (HTableInterface.class) {
          if (table == null) {
            table = connection.getTable(tablename);  // use the method parameter rather than a hard-coded name
          }
        }
      }
    
      public static void closeHBase() {
        if (table != null)
          try {
            table.close();
          } catch (IOException e) {
            LOGGER.error("failed to close table", e);
          }
        if (connection != null)
          try {
            connection.close();
          } catch (IOException e) {
            LOGGER.error("failed to close connection", e);
          }
      }
    
      public static void main(String[] args) throws Exception {
        String hbaseTable = JConfig.getInstance().getProperty(Consts.HBASE_TABLE);
        String kafkaBrokers = JConfig.getInstance().getProperty(Consts.KAFKA_BROKERS);
        String kafkaTopics = JConfig.getInstance().getProperty(Consts.KAFKA_TOPICS);
        String kafkaGroup = JConfig.getInstance().getProperty(Consts.KAFKA_GROUP);
    
        // open hbase
        try {
          openHBase(hbaseTable);
        } catch (IOException e) {
          LOGGER.error("failed to open HBase connection", e);
          System.exit(-1);
        }
    
        SparkConf conf = new SparkConf().setAppName("JavaKafkaToHBase");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
    
        Set<String> topicsSet = new HashSet<>(Arrays.asList(kafkaTopics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", kafkaBrokers);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", kafkaGroup);
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false);
        // this setting must be added in a Kerberos environment
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
    
        // Create direct kafka stream with brokers and topics
        final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams)  // Subscribe accepts the Set directly
            );
    
        JavaDStream<String> lines = stream.map(new Function<ConsumerRecord<String, String>, String>() {
          private static final long serialVersionUID = -1801798365843350169L;
    
          @Override
          public String call(ConsumerRecord<String, String> record) {
            return record.value();
          }
        }).filter(new Function<String, Boolean>() {
          private static final long serialVersionUID = 7786877762996470593L;
    
          @Override
          public Boolean call(String msg) throws Exception {
            return msg.length() > 0;
          }
        });
    
        JavaDStream<Long> nums = lines.count();
    
        nums.foreachRDD(new VoidFunction<JavaRDD<Long>>() {
          private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
    
          @Override
          public void call(JavaRDD<Long> rdd) throws Exception {
            Long num = rdd.take(1).get(0);  // count() emits exactly one element per batch
            String ts = sdf.format(new Date());
            Put put = new Put(Bytes.toBytes(ts));
            put.add(Bytes.toBytes("f"), Bytes.toBytes("nums"), Bytes.toBytes(num));
            table.put(put);
          }
        });
    
        ssc.start();
        ssc.awaitTermination();
        closeHBase();
      }
    }
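
    The program loads its four settings through JConfig; the real property keys are defined in the Consts class of the linked repository. A hypothetical properties file illustrating the values it needs (the key names below are placeholders, not the repo's actual constants):

    # key names are illustrative -- see Consts in the repo for the real ones
    hbase.table=recsys_logs
    kafka.brokers=hzadg-mammut-platform2.server.163.org:6667,hzadg-mammut-platform3.server.163.org:6667
    kafka.topics=spark-test
    kafka.group=spark-test-group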

    III. Building and running on YARN

    3.1 From the project root, build the project:

    mvn clean package
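
    If the build succeeds, the shaded jar referenced by the submit command below should be present (a quick check; the artifact name is taken from that command):

    ls target/spark-demo-0.1.0-jar-with-dependencies.jar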

    3.2 Run the job on YARN

    • Because the executors consume from Kafka, the Kafka-authorized Kerberos credentials (the JAAS config and keytab) must be shipped down to the executors;
    • Because HDFS in this cluster is also Kerberized, spark.yarn.keytab/spark.yarn.principal must point to a keytab that can access HDFS/HBase.

    Run the following from the project directory:

    /usr/ndp/current/spark2_client/bin/spark-submit \
    --files ./kafka_client_jaas.conf,./kafka.service.keytab \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
    --driver-java-options "-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
    --conf spark.yarn.keytab=/etc/security/keytabs/hbase.service.keytab \
    --conf spark.yarn.principal=hbase/hzadg-mammut-platform1.server.163.org@BDMS.163.COM \
    --class com.netease.spark.streaming.hbase.JavaKafkaToHBaseKerberos \
    --master yarn \
    --deploy-mode client \
    ./target/spark-demo-0.1.0-jar-with-dependencies.jar

    The kafka_client_jaas.conf file is shown below. The keyTab path is relative because --files places the keytab in each container's working directory:

    cat kafka_client_jaas.conf
    
    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        renewTicket=true
        keyTab="./kafka.service.keytab"
        storeKey=true
        useTicketCache=false
        serviceName="kafka"
        principal="kafka/hzadg-mammut-platform1.server.163.org@BDMS.163.COM";
    };
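
    Since both files are shipped with --files, they must exist in the project directory before launching. The keytab can be copied from its standard location (assuming the submitting user can read it):

    cp /etc/security/keytabs/kafka.service.keytab .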

    3.3 Checking the results
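
    Once the job is running and messages are being produced to spark-test, the per-batch counts it writes can be inspected from the HBase shell (a verification sketch, not from the original post):

    ./bin/hbase shell
    > scan 'recsys_logs'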
