zoukankan      html  css  js  c++  java
  • kafka(2.2.1)(kerberos+LDAP+Sentry)访问使用

    kafka(2.2.1)(kerberos+LDAP+Sentry)访问使用

    一.访问的kafka的一些配置(已集成kerberos )

    由于kafka集成了kerberos 所以需要通过kerberos的认证

    认证方式有两种

    • 1.通过配置文件
    • 2.通过keytab文件

    我们这里采用第一种

    首先先在目录/usr/local/kafka_client下创建两个文件一个是client.properties,一个是jaas.conf

    在client.properties文件里面写入

    security.protocol=SASL_PLAINTEXT
    sasl.kerberos.service.name=kafka
    group.id=testgroup
    

    在jaas.conf写入

    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    renewTicket=true
    serviceName="kafka";
    };
    

    之后在shell命令行执行一下命令来配置环境变量(这样只针对当前进程有效)

    [root@cdh-datanode03 kafka_client]# export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_client/jaas.conf"
    [root@cdh-datanode03 kafka_client]# echo $KAFKA_OPTS
    

    在执行kinit命令登陆kerberos用户

    二.Shell 命令行使用Kafka(已集成sentry)

    1.创建kafka topic

    [root@cdh-datanode03 kafka_client]# kafka-topics --create --zookeeper cdh-master01:2181 --replication-factor 1 --partitions 1 --topic testTopic
    

    2.查看Topic列表

    [root@cdh-datanode03 kafka_client]# kafka-topics --zookeeper cdh-master01:2181 --list
    

    3.删除Topic

    [root@cdh-datanode03 kafka_client]# kafka-topics --delete --zookeeper cdh-master01:2181 --topic testTopic
    

    4.向Topic生产数据(需要权限)

    [root@cdh-datanode03 kafka_client]# kafka-console-producer --broker-list cdh-datanode03:9092,cdh-datanode04:9092 --topic testTopic --producer.config /usr/local/kafka_client/client.properties
    

    5.消费Topic数据(需要权限)

    [root@cdh-datanode03 kafka_client]# kafka-console-consumer --topic testTopic --from-beginning --bootstrap-server cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092 --consumer.config /usr/local/kafka_client/client.properties
    

    此时会报以下错误 表示没有权限向testTopicTopic 写入数据此时我们需要给我们kinit登陆的用户赋予权限

    ERROR internals.ErrorLoggingCallback: Error when sending message to topic testTopic with key: null, value: 3 bytes with error:
    org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [testTopic]
    
    

    我们以fayson用户为例 它属于user组(id+用户名 查看组)

    • 1.我们需要首先创建一个kafka的principle
    • 2.我们给user用户组赋权可以写入数据到testTopic,注意需要使用管理员kafka用户登录Kerberos才能进行操作
    [root@cdh-datanode03 kafka_client]# klist
    Ticket cache: FILE:/tmp/krb5cc_0
    Default principal: kafka@GREE.IO
    
    Valid starting       Expires              Service principal
    09/11/2019 20:47:25  09/12/2019 20:47:25  krbtgt/GREE.IO@GREE.IO
    	renew until 09/18/2019 20:47:25
    
    • 3.创建一个role
    [root@cdh-datanode03 kafka_client]#  kafka-sentry -cr -r kafka_role
    
    • 4.给kafka_role赋予写入testTopic权限
    [root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=write"
    [root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=describe"
    
    • 5.将角色加入到user组下面
    [root@cdh-datanode03 kafka_client]#  kafka-sentry -arg -r kafka_role -g user
    
    • 6.以fayson用户登录(输入密码)
    [root@cdh-datanode03 kafka_client]# kinit fayson
    

    之后以此用户写入testTopic 就不会报权限问题了

    此时我们还需要给 fayson 用户赋予读取testTopic的权限,所以需要给kafka_role赋予读取testtopic的权限

    • 1.我们在上面完成的基础之上需要对kafka_role角色赋予读取testTopic 的权限
    • 2.执行以下命令需要使用kafka 用户
    [root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=read"
    [root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=describe"
    [root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=zhcTestTopic->action=read"
    

    三.代码访问(java)

    需要创建consumer.properties,producer.properties,jaas.conf文件 还要引入krb5.conf文件

    producer.properties文件内容

    bootstrap.servers=cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092
    #实现了Serializer接口的序列化类。用于告诉kafka如何序列化key
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #告诉kafka如何序列化value
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    acks=1
    #访问kerberos的kafka client 配置
    security.protocol=SASL_PLAINTEXT
    sasl.kerberos.service.name=kafka
    

    consumer.properties文件内容

    bootstrap.servers=cdh-datanode04:9092
    group.id=testgroup1
    enable.auto.commit=true
    session.timeout.ms=30000
    auto.offset.reset=earliest
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    security.protocol=SASL_PLAINTEXT
    sasl.kerberos.service.name=kafka
    

    jaas.conf文件内容

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      doNotPrompt=true
      useKeyTab=true
      storeKey=true
      renewTicket=true
      keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
      principal="gree1@GREE.IO";
    };
    
    
    Client {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
      principal="gree1@GREE.IO";
    };
    
    • 1.pom.xml
        <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
              <version>2.2.1-cdh6.3.0</version>
          </dependency>
    
    • 2.producer
    package producer;
    
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.io.IOException;
    import java.util.Properties;
    
    class MyProducer {
        private static final MyProducer Instance = new MyProducer();
    
        private MyProducer() {
        }
    
        public static MyProducer getInstance() {
            return Instance;
        }
    
        public int messageNo = 1;
    
        /**
         * 获得一个Kafka生产者实例
         *
         * @return
         */
        public KafkaProducer Produce() {
            System.setProperty("java.security.auth.login.config",             "D:\cdh\kafka\src\main\kerberos\jaas.conf");
            System.setProperty("java.security.krb5.conf", "D:\cdh\kafka\src\main\kerberos\krb5.conf");
           
            Properties props = new Properties();
            try {
                props.load(this.getClass().getResourceAsStream("/producer.properties"));
            } catch (IOException e) {
                e.printStackTrace();
            }
            KafkaProducer producer = new KafkaProducer(props);
            return producer;
        }
    }
    
    public class ProducerStarter implements Runnable {
        private int threadIndex;
    
        public ProducerStarter(int threadIndex) {
            this.threadIndex = threadIndex;
        }
    
        /**
         * 生产数据
         */
    
        public void run() {
            MyProducer pro = MyProducer.getInstance();
            KafkaProducer prod = pro.Produce();
            String topic = "testTopic";
            int i = 0;
            while (1 == 1) {
                final int index = i++; 
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                prod.send(new ProducerRecord<String, String>(topic,String.valueOf(index), String.valueOf(i)), new Callback() {
    
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            e.printStackTrace();
                        }
                        System.out.println("message send to partition " + recordMetadata.partition() + /*value*/ ": hello word " + index);
                    }
            });
                prod.flush();
                //sleep 1min
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 启动200个线程,生产
         *
         * @param args
         */
        public static void main(String args[]) {
    
            for (int i = 0; i < 1; i++) {
                System.out.println("启动线程:" + i);
                Thread thread = new Thread(new ProducerStarter(i));
                thread.start();
            }
        }
    }
    
    
    • 3.Consumer
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    
    import java.util.Arrays;
    
    public class ConsumerStarter {
        public static void main(String[] args) throws InterruptedException {
    
            KafkaConsumer consumer = Consumer.getInstance().Consume();
            consumer.subscribe(Arrays.asList("testTopic"));
            //消费并打印消费结果
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord record: records) {
                    System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value());
                }
                Thread.sleep(1000);
            }
        }
    }
    
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.io.IOException;
    import java.util.Properties;
    
    /**
     * Created by 260212 on 2018/4/12.
     * Author:HarSenZhao
     * 描述:
     */
    class Consumer {
        private static final Consumer Instance=new Consumer();
        private Consumer(){}
        public static Consumer getInstance(){
            return Instance;
        }
    
        /**
         * 获得一个Kafka消费者
         * kafka-clients版本要高于0.9.0.1,否则会取出为null
         * @return
         */
        public KafkaConsumer Consume (){
            System.setProperty("java.security.auth.login.config",  "D:\cdh\kafka\src\main\kerberos\jaas.conf");
            System.setProperty("java.security.krb5.conf", "D:\cdh\kafka\src\main\kerberos\krb5.conf");
            Properties props=new Properties();
            try {
                props.load(this.getClass().getResourceAsStream("/consumer.properties"));
            } catch (IOException e) {
                e.printStackTrace();
            }
            KafkaConsumer consumerSelf=new KafkaConsumer<String,String>(props);
            return consumerSelf;
        }
    }
    
    
    
  • 相关阅读:
    类继承
    抽象基类 纯虚函数
    虚函数
    Java网络通信
    Java补补补
    刷LeetCode吧
    贝叶斯网络的
    vscode添加vue模板
    vue--项目实例
    Java01
  • 原文地址:https://www.cnblogs.com/HarSenZhao/p/11508687.html
Copyright © 2011-2022 走看看