zoukankan      html  css  js  c++  java
  • Kafka的安全认证机制SASL/PLAINTEXT

    一.背景

    kafka提供了多种安全认证机制,主要分为SSL和SASL2大类。其中SASL/PLAIN是基于账号密码的认证方式,比较常用。最近做了个kafka的鉴权,发现官网上讲的不是很清楚,网上各种博客倒是很多,但是良莠不齐,巨多坑。经过一天的研究,终于搞定了,特在此记录下。

    二.环境

    操作系统:linux
    kafka版本:kafka_2.12-0.11.0.1
    zookeeper版本:zookeeper-3.5.1-alpha

    三.认证步骤

    3.1.Zookeeper配置和启动

    1.为zookeeper添加SASL支持,在配置文件zoo.cfg添加

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000
    

    2.新建zk_server_jaas.conf文件,为Zookeeper添加账号认证信息
    这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在/home路径下。zk_server_jaas.conf文件的内容如下

    Server {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="cluster"
        password="clusterpasswd"
        user_kafka="kafkapasswd";
    };
    

    username和paasword是zk集群之间的认证密码。
    user_kafka="kafkapasswd"定义了一个用户"kafka",密码是"kafkapasswd",本次测试用户是kafka broker。
    3.导入kafka的相关jar
    由上一步可发现,认证方式使用的是Kafka的认证类org.apache.kafka.common.security.plain.PlainLoginModule。因此zk需要依赖几个jar包。
    在/home下新建zk_sasl_dependency目录,从kafka/lib目录下复制以下几个jar包到该目录下。根据kafka版本不同,几个jar包的版本可能不一样

    kafka-clients-0.11.0.1.jar
    lz4-1.3.0.jar
    slf4j-api-1.7.25.jar
    slf4j-log4j12-1.7.25.jar
    snappy-java-1.1.2.6.jar
    

    4.修改zkEnv.sh
    在zkEnv.sh添加

    for i in /home/zk_sasl_dependency/*.jar; 
    do 
        CLASSPATH="$i:$CLASSPATH"
    done
    SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/home/zk_server_jaas.conf "
    

    关于这一步,网上的配置五花八门,但是原理都是jar包导入和认证信息配置。
    在zk启动的时候导入/home/zk_sasl_dependency/的jar包,SERVER_JVMFLAGS配置jvm参数,导入zk的sasl认证信息。
    5.启动zk服务端
    执行./zkServer.sh start启动zk。如果启动异常查看日志排查问题。

    3.2kafka配置和启动

    1.新建kafka_server_jaas.conf,为kafka添加认证信息

    KafkaServer {
     org.apache.kafka.common.security.plain.PlainLoginModule required
     username="cluster"
     password="cluster"
     user_cluster=“clusterpasswd”
     user_kafka="kafkapasswd" ;
    };
    Client{
     org.apache.kafka.common.security.plain.PlainLoginModule required  
     username="kafka"  
     password="kafkapasswd";  
    };
    

    KafkaServer,第一行指定了认证方法为PLAIN,usernam和password是kafka的多个broker之间进行认证的账号密码。
    user_kafka="kafkapasswd"设置了用户kafka,密码为kafkapswd,用于客户端的生产者和消费者连接认证。
    网上的说法是 Client,是kafka作为用户使用zk的认证信息,这里的username和password一定要和zk_server_jaas.conf的配置对的上。
    但是我试验发现 user_cluster=“clusterpasswd”才是真正进行认证的信息,这个Client好像一点用没有,删掉也可以正常启动server,kafka服务也是正常的,费解啊!

    2.在kafka的配置文件开启SASL认证
    在server.properties添加如下信息

    listeners=SASL_PLAINTEXT://(IP):9092
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=PLAIN 
    sasl.enabled.mechanisms=PLAIN
    allow.everyone.if.no.acl.found=true
    

    3.在server启动脚本JVM参数
    我是直接在

    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    

    添加了认证信息,修改后为

    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka_server_jaas.conf"
    

    4.启动kafka服务端

    ./kafka-server-start.sh ../config/server.properties
    

    kafka服务端正常启动后,应该会有类似下面这行的日志信息,说明认证功能开启成功

    Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint((IP),9092,ListenerName(SASL_PLAINTEXT),SASL_PLAINTEXT) (kafka.utils.ZkUtils)
    

    3.3kafka的SASL认证功能认证和使用

    1.使用kafka脚本认证

    我们使用kafka自带的脚本进行认证。
    1.新建kafka_client_jaas.conf,为客户端添加认证信息
    在/home下新建kafka_client_jaas.conf,添加以下信息

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="kafka"
      password="kafkapasswd";
    };
    

    2.修改客户端配置信息
    修改producer.properties和consumer.properties,添加认证机制

    security.protocol=SASL_PLAINTEXT 
    sasl.mechanism=PLAIN 
    

    3.修改客户端启动脚本
    修改kafka-console-producer.sh,配置认证文件kafka_client_jaas.conf,将

    export KAFKA_HEAP_OPTS="-Xmx512M"
    ```修改为
    

    export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka_client_jaas.conf"

    kafka-console-consumer.sh的修改类似。
    4.客户端启动并认证
    启动consumer
    

    ./bin/kafka-console-consumer.sh --bootstrap-server (IP):9092 --topic test --from-beginning --consumer.config config/consumer.properties

    启动producer
    

    ./bin/kafka-console-producer.sh --broker-list (IP):9092 --topic test --producer.config configoducer.properties

    producer端发送消息,consumer端成功接收到消息。
    ##2.Java客户端认证
    ```java
    package com.zte.sdn.oscp.jms.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.junit.Test;
    
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaTest {
    
        @Test
        public void testProduct() throws Exception {
            System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");
    
            Properties props = new Properties();
            props.put("bootstrap.servers", "IP:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            while (true){
    			long startTime = System.currentTimeMillis();
    			for (int i = 0; i < 100; i++) {
    				producer.send(new ProducerRecord<>("kafkatest", Integer.toString(i), Integer.toString(i)));
    			}
    			System.out.println(System.currentTimeMillis()-startTime);
    			Thread.sleep(5000);
    		}
        }
    
        @Test
        public void testConsumer() throws Exception {
            System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");
    
            Properties props = new Properties();
            props.put("bootstrap.servers", "(IP):9092");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("group.id", "kafka_test_group");
            props.put("session.timeout.ms", "6000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("kafkatest"));
            while (true) {
                long startTime = System.currentTimeMillis();
                ConsumerRecords<String, String> records = consumer.poll(1000);
                System.out.println(System.currentTimeMillis() - startTime);
                System.out.println("recieve message number is " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",
                            record.offset(),
                            record.key(),
                            record.value(),
                            record.partition());
                }
            }
        }
    }
    

    3.4客户端认证时延问题

    认证时发现生产者和消费者和kafka的broker建立连接都有一定时延。在生产者的日志发现时延主要发生在

    2018-12-17 10:55:46[DEBUG][kafka-producer-network-thread | producer-1]-NetworkClient.java: 762 - Initiating connection to node (IP):9092 (id: 0 rack: null)
    2018-12-17 10:55:50[DEBUG][kafka-producer-network-thread | producer-1]-SaslClientAuthenticator.java: 209 - Set SASL client state to SEND_HANDSHAKE_REQUEST
    

    难道客户端连接服务端时,认证时间需要这么长??

  • 相关阅读:
    理解javascript中的Array类型
    解决EF 4.0 中数据缓存机制
    vim学习之旅01-文本搜索并高亮显示
    Quartz.Net 学习之路02 初探Quartz.Net
    Quartz.Net 学习之路01 安装Quartz.Net
    EasyUI这个框架用了好久了,总结一下遇到的问题和解决方法
    记录剪切板
    如何将Unicode字符转换成简体字
    ass字幕转换成文本文件
    Change WORDS
  • 原文地址:https://www.cnblogs.com/ilovena/p/10123516.html
Copyright © 2011-2022 走看看