  • Kafka Security Authentication with SASL/PLAIN

    ZooKeeper version: zookeeper-3.4.13, installed at /usr/local/zookeeper-3.4.13/

    Kafka version: kafka_2.13-2.6.0.tgz

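    I. ZooKeeper configuration

    Install ZooKeeper

    See: Downloading, installing and starting ZooKeeper

    ZooKeeper cluster setup: a pseudo-distributed cluster on a single machine

    1. Copy the following jars from Kafka's libs directory into ZooKeeper's lib directory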

    kafka-clients-2.6.0.jar
    lz4-java-1.7.1.jar
    slf4j-api-1.7.25.jar
    slf4j-log4j12-1.7.25.jar
    snappy-java-1.1.7.3.jar
    

      

    2. Configure zoo.cfg

    Add the following settings:

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000
    

      

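    3. Write the JAAS file. It can live in the conf folder or, as in the path below, directly under the installation directory:

    /usr/local/zookeeper-3.4.13/zk_server_jaas.conf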

    Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required 
        username="admin" 
        password="admin123456" 
        user_kafka="kafka123456" 
        user_producer="prod123456";
    };
    

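     Two users are defined here, kafka and producer. Each entry declared with the user_ prefix is an account that a client connecting to ZooKeeper can authenticate as; the Kafka broker logs in with the kafka account (see the Client section of kafka_server_jaas.conf in Part II).

       The section also sets username and password, which this setup uses as the account for internal authentication between ZooKeeper nodes.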
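
    To double-check which accounts a JAAS section really declares, the file can be parsed with the JDK's built-in JAAS configuration reader. The following is a minimal sketch, not part of the original setup: the class name JaasUserLister and the temporary sample file are assumptions made for illustration, and the parser only reads the file (it does not load the login module class).

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import java.util.Set;
import java.util.TreeSet;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical helper: lists the user_<name> accounts declared in one JAAS section.
public class JaasUserLister {

    static Set<String> listUsers(Path jaasFile, String section) throws NoSuchAlgorithmException {
        // "JavaLoginConfig" is the JDK's standard parser for JAAS configuration files
        Configuration config =
                Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile.toUri()));
        AppConfigurationEntry[] entries = config.getAppConfigurationEntry(section);
        Set<String> users = new TreeSet<>();
        if (entries == null) {
            return users; // the section does not exist in this file
        }
        for (AppConfigurationEntry entry : entries) {
            for (String option : entry.getOptions().keySet()) {
                if (option.startsWith("user_")) {
                    users.add(option.substring("user_".length()));
                }
            }
        }
        return users;
    }

    public static void main(String[] args) throws Exception {
        // Sample content mirroring the Server section above, written to a temp file
        Path file = Files.createTempFile("zk_server_jaas", ".conf");
        Files.write(file, String.join("\n",
                "Server {",
                "  org.apache.kafka.common.security.plain.PlainLoginModule required",
                "  username=\"admin\"",
                "  password=\"admin123456\"",
                "  user_kafka=\"kafka123456\"",
                "  user_producer=\"prod123456\";",
                "};").getBytes(StandardCharsets.UTF_8));
        System.out.println(listUsers(file, "Server"));
    }
}
```

    Pointing listUsers at the real zk_server_jaas.conf instead of the temporary file gives a quick way to confirm that the kafka account is present before the broker is started.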

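    Start ZooKeeper on each node. The ZooKeeper JVM has to be told where the JAAS file is through -Djava.security.auth.login.config; one common way is to set SERVER_JVMFLAGS in conf/java.env, which the start scripts read.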

    cd /usr/local/zookeeper-3.4.13/bin

    ./zkServer.sh  start

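    Note: if ZooKeeper runs as a cluster, apply the same configuration to every ZooKeeper node.

    II. Kafka configuration

    1. Create kafka_server_jaas.conf in the config directory of the Kafka installation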

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
                username="admin"
                password="admin123456"
                user_admin="admin123456"
                user_producer="prod123456"
                user_consumer="cons123456";
    };
    
    
    
    Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
            username="kafka"
            password="kafka123456";
    };
    

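     The KafkaServer section configures the broker's accounts: username and password are what the broker itself uses for inter-broker connections, and each user_<name> entry defines an account that clients may log in with. The Client section holds the username and password the broker uses to connect to ZooKeeper; they must match the user_kafka entry in ZooKeeper's zk_server_jaas.conf.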
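
    The matching rule described above can be checked mechanically. The sketch below is an illustration only: the class JaasCredentialCheck and its methods are hypothetical names, and it assumes each section contains a single login module. It reads the options of a section with the JDK's JAAS parser and tests whether a username/password pair has a corresponding user_<username> entry.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical helper: verifies that a login's credentials match a user_<name> entry.
public class JaasCredentialCheck {

    // Options of the first login module in the given section (assumes one module per section)
    static Map<String, ?> options(Path jaasFile, String section) throws NoSuchAlgorithmException {
        Configuration config =
                Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile.toUri()));
        AppConfigurationEntry[] entries = config.getAppConfigurationEntry(section);
        if (entries == null || entries.length == 0) {
            throw new IllegalArgumentException("No section named " + section + " in " + jaasFile);
        }
        return entries[0].getOptions();
    }

    // True when the login's password equals the accounts' user_<username> value
    static boolean accepted(Map<String, ?> login, Map<String, ?> accounts) {
        Object user = login.get("username");
        Object password = login.get("password");
        return user != null && password != null && password.equals(accounts.get("user_" + user));
    }

    private static Path write(String... lines) throws IOException {
        Path file = Files.createTempFile("jaas", ".conf");
        Files.write(file, String.join("\n", lines).getBytes(StandardCharsets.UTF_8));
        return file;
    }

    public static void main(String[] args) throws Exception {
        // Sample files mirroring the Client and Server sections from this article
        Path kafkaJaas = write("Client {",
                "  org.apache.kafka.common.security.plain.PlainLoginModule required",
                "  username=\"kafka\" password=\"kafka123456\";",
                "};");
        Path zkJaas = write("Server {",
                "  org.apache.kafka.common.security.plain.PlainLoginModule required",
                "  username=\"admin\" password=\"admin123456\"",
                "  user_kafka=\"kafka123456\" user_producer=\"prod123456\";",
                "};");
        System.out.println(accepted(options(kafkaJaas, "Client"), options(zkJaas, "Server")));
    }
}
```

    The same accepted check can be applied to the KafkaServer section against itself, since the broker's own username and password also need a matching user_ entry (user_admin in the file above).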

      

    2. Modify server.properties

    listeners=SASL_PLAINTEXT://0.0.0.0:9092
    advertised.listeners=SASL_PLAINTEXT://118.xx.xx.101:9092
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    allow.everyone.if.no.acl.found=true
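
    These settings depend on each other: the inter-broker protocol has to be offered by a listener, and the inter-broker SASL mechanism has to be one of the enabled mechanisms. As a rough sketch (the class BrokerSaslSettingsCheck is a hypothetical helper, and it assumes listener names equal their security protocol, i.e. no custom listener.security.protocol.map), the consistency can be checked like this:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

// Hypothetical helper: sanity-checks the SASL-related broker settings against each other.
public class BrokerSaslSettingsCheck {

    // Extracts listener names from a value such as "SASL_PLAINTEXT://0.0.0.0:9092,SSL://:9093"
    static List<String> listenerNames(String listeners) {
        List<String> names = new ArrayList<>();
        for (String listener : listeners.split(",")) {
            int idx = listener.indexOf("://");
            if (idx > 0) {
                names.add(listener.substring(0, idx).trim().toUpperCase(Locale.ROOT));
            }
        }
        return names;
    }

    static List<String> findProblems(Properties props) {
        List<String> problems = new ArrayList<>();
        // Fallbacks are the broker defaults: PLAINTEXT between brokers, GSSAPI as SASL mechanism
        String protocol = props.getProperty("security.inter.broker.protocol", "PLAINTEXT").trim();
        String mechanism = props.getProperty("sasl.mechanism.inter.broker.protocol", "GSSAPI").trim();
        List<String> enabled = Arrays.asList(
                props.getProperty("sasl.enabled.mechanisms", "GSSAPI").trim().split("\\s*,\\s*"));
        List<String> listeners = listenerNames(props.getProperty("listeners", "PLAINTEXT://:9092"));

        if (!listeners.contains(protocol)) {
            problems.add("no listener offers the inter-broker protocol " + protocol);
        }
        if (protocol.startsWith("SASL") && !enabled.contains(mechanism)) {
            problems.add("inter-broker mechanism " + mechanism + " is not in sasl.enabled.mechanisms");
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(String.join("\n",
                "listeners=SASL_PLAINTEXT://0.0.0.0:9092",
                "security.inter.broker.protocol=SASL_PLAINTEXT",
                "sasl.enabled.mechanisms=PLAIN",
                "sasl.mechanism.inter.broker.protocol=PLAIN")));
        // An empty list means the checked settings agree with each other
        System.out.println(findProblems(props));
    }
}
```

    Leaving out sasl.mechanism.inter.broker.protocol, for instance, makes the check report a problem, because the broker would fall back to GSSAPI while only PLAIN is enabled.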
    

      

    3. Modify the startup script

    bin/kafka-server-start.sh

    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M -Djava.security.auth.login.config=/xxx/kafka/config/kafka_server_jaas.conf "
    fi 

    The -Djava.security.auth.login.config option must point to the path of the kafka_server_jaas.conf file created in step 1.
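
    A JVM started with this flag exposes the path as the system property java.security.auth.login.config. A small check such as the following (a sketch only; JaasFlagCheck is a hypothetical class, not part of Kafka) can be run with the same -D option to confirm that the path is set and the file is readable before starting the broker.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: reports whether the configured JAAS path points at a readable file.
public class JaasFlagCheck {

    static String describe(String configuredPath) {
        if (configuredPath == null || configuredPath.trim().isEmpty()) {
            return "java.security.auth.login.config is not set";
        }
        // trim() guards against stray spaces around the path in the startup script
        Path path = Paths.get(configuredPath.trim());
        return Files.isReadable(path)
                ? "JAAS file found: " + path
                : "JAAS file missing or unreadable: " + path;
    }

    public static void main(String[] args) {
        System.out.println(describe(System.getProperty("java.security.auth.login.config")));
    }
}
```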

     4. Start Kafka

    ./kafka-server-start.sh  ../config/server.properties &

    III. Spring Boot integration

    The Spring Boot version is 2.4.10.

    1. Add the dependency

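            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <!-- No explicit version: Spring Boot 2.4.10's dependency management supplies a matching release.
                     A 1.1.x release predates Spring Boot 2.x and the poll(Duration) API used by the consumer below. -->
            </dependency>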
    

      

    2. Create kafka_client_jaas.conf

    The file is stored under E:\study\xxstudy\kafkademo\config\

    KafkaClient {
     org.apache.kafka.common.security.plain.PlainLoginModule required
     username="admin"
     password="admin123456";
    }; 

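    The username and password here must match one of the user_<name> accounts defined in the KafkaServer section of kafka_server_jaas.conf (here admin / admin123456); otherwise the client is not granted access.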
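
    As an alternative to a separate JAAS file, Kafka clients also accept the login module definition inline through the sasl.jaas.config property. The sketch below shows the idea under stated assumptions: the class InlineJaasClientConfig and its method are illustrative names, plain string keys are used so the snippet compiles without the Kafka client library, and the credentials are the sample ones from this article.

```java
import java.util.Properties;

// Hypothetical helper: builds SASL/PLAIN client properties without an external JAAS file.
public class InlineJaasClientConfig {

    static Properties saslPlainProperties(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Same content as the KafkaClient section, expressed as a single property value.
        // Note: this simple concatenation does not escape quotes inside the credentials.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + username + "\" "
                        + "password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        Properties props = saslPlainProperties("118.xx.xx.101:9092", "admin", "admin123456");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

    With this approach the serializer or deserializer settings are added to the same Properties object, and the java.security.auth.login.config system property is no longer needed on the client side.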

    3. Producer

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.config.SaslConfigs;

    public class JaasProducerDemo {

        private static final String TOPIC_NAME = "test5";

        static {
            // Point the JVM at the client JAAS file; backslashes must be escaped in Java string literals
            System.setProperty("java.security.auth.login.config",
                    "E:\\study\\xxstudy\\kafkademo\\config\\kafka_client_jaas.conf");
        }

        public static void main(String[] args) {
            producerSendWithJaas();
        }

        public static void producerSendWithJaas() {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "118.xx.xx.101:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
            properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            // SASL/PLAIN over an unencrypted connection, matching the broker listener
            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

            // try-with-resources closes the producer, which flushes any buffered records first
            try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
                for (int i = 0; i < 100; i++) {
                    String key = "key-" + i;
                    // Message object
                    ProducerRecord<String, String> record =
                            new ProducerRecord<>(TOPIC_NAME, key, "value-" + i);
                    producer.send(record, (recordMetadata, e) -> {
                        if (e != null) {
                            // On failure the metadata carries no usable partition or offset
                            System.err.println("key:" + key + " , send failed: " + e);
                            return;
                        }
                        System.out.println("key:" + key + " , recordMetadata ,partition:" + recordMetadata.partition()
                                + ",offset: " + recordMetadata.offset());
                    });
                }
            }
        }
    }
    

      

    4. Consumer

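    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.config.SaslConfigs;

    public class JaasConsumerDemo {

        private static final String TOPIC_NAME = "test5";

        static {
            // Point the JVM at the client JAAS file; backslashes must be escaped in Java string literals
            System.setProperty("java.security.auth.login.config",
                    "E:\\study\\xxstudy\\kafkademo\\config\\kafka_client_jaas.conf");
        }

        public static void main(String[] args) {
            consumerWithJaas();
        }

        private static void consumerWithJaas() {
            Properties prop = new Properties();
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "118.xx.xx.101:9092");
            prop.put("group.id", "test");
            prop.put("enable.auto.commit", "true");
            prop.put("auto.commit.interval.ms", "1000");
            prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // SASL/PLAIN over an unencrypted connection, matching the broker listener
            prop.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            prop.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
            // Subscribe to one or more topics
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
            // Poll forever; stop the demo by terminating the process
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition - %d, offset - %d, key - %s, value - %s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }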
    

      

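    5. Test

    Run the consumer first, then run the producer.

    Author: Work Hard Work Smart
    Source: http://www.cnblogs.com/linlf03/
    Reposting in any form is welcome; unless the author agrees otherwise, please keep this notice.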

  • Original article: https://www.cnblogs.com/linlf03/p/15355572.html