  • Kafka Security Authentication with SASL/PLAIN

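    ZooKeeper version: zookeeper-3.4.13, installed at /usr/local/zookeeper-3.4.13/

    Kafka version: kafka_2.13-2.6.0.tgz

    I. ZooKeeper Configuration

    Install ZooKeeper

    See: "ZooKeeper download, installation, and startup"

    "ZooKeeper cluster setup: pseudo-distributed cluster on a single machine"

    1. Copy the following jars from Kafka's lib directory into ZooKeeper's lib directory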

    kafka-clients-2.6.0.jar
    lz4-java-1.7.1.jar
    slf4j-api-1.7.25.jar
    slf4j-log4j12-1.7.25.jar
    snappy-java-1.1.7.3.jar
    

      

    2. Configure zoo.cfg

    Add the following settings:

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000
    

      

    3. Write the JAAS file and place it under the conf folder

    /usr/local/zookeeper-3.4.13/zk_server_jaas.conf

    Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required 
        username="admin" 
        password="admin123456" 
        user_kafka="kafka123456" 
        user_producer="prod123456";
    };
    

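     The user_ entries define two accounts, kafka and producer. Clients that connect to ZooKeeper authenticate against these entries; in this setup the Kafka broker logs in as kafka through the Client section of its own JAAS file (see section II).

       The other two properties, username and password, configure the credentials used for internal authentication between ZooKeeper nodes.

       ZooKeeper reads this file only if its JVM is started with -Djava.security.auth.login.config pointing at it, for example through the SERVER_JVMFLAGS environment variable.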
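
    The user_<name> convention can be illustrated with a small sketch. It models, in simplified form, how a PLAIN credential check can consult the login module options: the supplied username is looked up under the key user_<username> and the stored value is compared with the supplied password. The class PlainUserLookup and its method are hypothetical names written for this illustration; this is not ZooKeeper's or Kafka's actual implementation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: a simplified model of the user_<name> lookup. */
class PlainUserLookup {

    /** Returns true when the options contain user_<username> with a matching password. */
    static boolean authenticate(Map<String, String> options, String username, String password) {
        if (username == null || password == null) {
            return false;
        }
        String expected = options.get("user_" + username);
        if (expected == null) {
            return false; // no such account is defined
        }
        // Constant-time comparison avoids leaking how many characters matched.
        return MessageDigest.isEqual(
                expected.getBytes(StandardCharsets.UTF_8),
                password.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // The same options as the Server section of zk_server_jaas.conf.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("username", "admin");
        options.put("password", "admin123456");
        options.put("user_kafka", "kafka123456");
        options.put("user_producer", "prod123456");

        System.out.println(authenticate(options, "kafka", "kafka123456")); // true
        System.out.println(authenticate(options, "kafka", "wrong"));       // false
        // false: the Server section has no user_admin entry
        System.out.println(authenticate(options, "admin", "admin123456"));
    }
}
```

    In this model the plain username and password options do not create an account by themselves; only the user_ keys are consulted when a client logs in.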

    Start ZooKeeper on each node

    cd /usr/local/zookeeper-3.4.13/bin

    ./zkServer.sh  start

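    Note: if ZooKeeper runs as a cluster, apply the same configuration to every ZooKeeper node.

    II. Kafka Configuration

    1. Create kafka_server_jaas.conf in the config directory of the Kafka installation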

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
                username="admin"
                password="admin123456"
                user_admin="admin123456"
                user_producer="prod123456"
                user_consumer="cons123456";
    };
    
    
    
    Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
            username="kafka"
            password="kafka123456";
    };
    

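     The KafkaServer section holds the Kafka accounts and passwords: username and password are the credentials the broker itself uses for inter-broker connections, and each user_<name> entry defines an account that clients can log in with. The Client section holds the username and password the broker uses to connect to ZooKeeper; they must match the user_kafka entry in the zk_server_jaas.conf file configured earlier.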
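
    The requirement that the Client credentials match user_kafka on the ZooKeeper side can be checked mechanically. The sketch below parses JAAS text with the JDK's built-in JavaLoginConfig parser and compares the two sections. The class JaasCrossCheck and its methods are hypothetical helpers written for this illustration, and the check only looks at the first login module of each section.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

/** Illustrative only: cross-checks the broker's Client section against ZooKeeper's Server section. */
class JaasCrossCheck {

    /** Parses JAAS text and returns the options of the first login module in the named section. */
    static Map<String, ?> sectionOptions(String jaasText, String section) {
        Path file = null;
        try {
            // The JDK parser reads from a URI, so the text goes through a temporary file.
            file = Files.createTempFile("jaas-check", ".conf");
            Files.write(file, jaasText.getBytes(StandardCharsets.UTF_8));
            Configuration config =
                    Configuration.getInstance("JavaLoginConfig", new URIParameter(file.toUri()));
            AppConfigurationEntry[] entries = config.getAppConfigurationEntry(section);
            if (entries == null || entries.length == 0) {
                throw new IllegalArgumentException("section not found: " + section);
            }
            return entries[0].getOptions();
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new IllegalStateException("could not parse JAAS text", e);
        } finally {
            if (file != null) {
                try {
                    Files.deleteIfExists(file);
                } catch (IOException ignored) {
                    // best-effort cleanup of the temporary file
                }
            }
        }
    }

    /** True when Client username/password equal the matching user_<name> entry in Server. */
    static boolean clientMatchesServer(String brokerJaasText, String zkJaasText) {
        Map<String, ?> client = sectionOptions(brokerJaasText, "Client");
        Map<String, ?> server = sectionOptions(zkJaasText, "Server");
        Object user = client.get("username");
        Object password = client.get("password");
        return user != null && password != null && password.equals(server.get("user_" + user));
    }

    public static void main(String[] args) {
        String module = "org.apache.kafka.common.security.plain.PlainLoginModule required";
        String broker = String.join("\n",
                "Client {", module, "username=\"kafka\"", "password=\"kafka123456\";", "};");
        String zk = String.join("\n",
                "Server {", module, "username=\"admin\"", "password=\"admin123456\"",
                "user_kafka=\"kafka123456\"", "user_producer=\"prod123456\";", "};");
        System.out.println("Client matches Server: " + clientMatchesServer(broker, zk));
    }
}
```

    Parsing does not load the login module class, so the check can run without the Kafka jars on the classpath. In practice the two texts would be read from kafka_server_jaas.conf and zk_server_jaas.conf.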

      

    2. Modify server.properties

    listeners=SASL_PLAINTEXT://0.0.0.0:9092
    advertised.listeners=SASL_PLAINTEXT://118.xx.xx.101:9092
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    allow.everyone.if.no.acl.found=true
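
    These settings depend on one another: the inter-broker protocol needs a matching listener, and the inter-broker mechanism must be one of the enabled mechanisms. A minimal sketch of such a consistency check follows. The class SaslBrokerConfigCheck is a hypothetical helper, and it assumes that listener names equal protocol names (that is, no listener.security.protocol.map is configured).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Illustrative only: sanity checks for the SASL settings in server.properties. */
class SaslBrokerConfigCheck {

    /** Returns human-readable problems; an empty list means the checks passed. */
    static List<String> findProblems(Properties props) {
        List<String> problems = new ArrayList<>();

        // Kafka's documented defaults are used when a key is absent.
        List<String> listeners = splitCsv(props.getProperty("listeners", ""));
        String interBrokerProtocol = props.getProperty("security.inter.broker.protocol", "PLAINTEXT");
        List<String> enabledMechanisms = splitCsv(props.getProperty("sasl.enabled.mechanisms", "GSSAPI"));
        String interBrokerMechanism = props.getProperty("sasl.mechanism.inter.broker.protocol", "GSSAPI");

        // Assumption: listener names equal protocol names.
        boolean hasMatchingListener = false;
        for (String listener : listeners) {
            if (listener.startsWith(interBrokerProtocol + "://")) {
                hasMatchingListener = true;
            }
        }
        if (!hasMatchingListener) {
            problems.add("no listener uses the inter-broker protocol " + interBrokerProtocol);
        }
        if (interBrokerProtocol.startsWith("SASL_") && !enabledMechanisms.contains(interBrokerMechanism)) {
            problems.add("inter-broker mechanism " + interBrokerMechanism
                    + " is not in sasl.enabled.mechanisms " + enabledMechanisms);
        }
        return problems;
    }

    /** Splits a comma-separated value, trimming entries and dropping empty ones. */
    private static List<String> splitCsv(String value) {
        List<String> parts = new ArrayList<>();
        for (String part : value.split(",")) {
            if (!part.trim().isEmpty()) {
                parts.add(part.trim());
            }
        }
        return parts;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("listeners", "SASL_PLAINTEXT://0.0.0.0:9092");
        props.setProperty("security.inter.broker.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.enabled.mechanisms", "PLAIN");
        props.setProperty("sasl.mechanism.inter.broker.protocol", "PLAIN");
        System.out.println(findProblems(props)); // prints []
    }
}
```

    The check is deliberately narrow; it does not validate the authorizer settings or the advertised listeners.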
    

      

    3. Modify the startup script

    bin/kafka-server-start.sh

    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M -Djava.security.auth.login.config=/xxx/kafka/config/kafka_server_jaas.conf "
    fi 

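    Set -Djava.security.auth.login.config to the path of kafka_server_jaas.conf. Because the flag is added inside the if block, it takes effect only when KAFKA_HEAP_OPTS is not already set in the environment; exporting the flag through KAFKA_OPTS is a common alternative.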
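
    A wrong path in this flag usually shows up only as a login failure at startup. The following sketch shows one way a JVM could report what the property resolves to; JaasPathCheck is a hypothetical helper written for this illustration and is not part of Kafka.

```java
import java.nio.file.Files;
import java.nio.file.Paths;

/** Illustrative only: reports whether the JAAS system property points at a readable file. */
class JaasPathCheck {

    static final String PROPERTY = "java.security.auth.login.config";

    /** Describes the state of the given JAAS path (null means the property is unset). */
    static String describe(String configuredPath) {
        if (configuredPath == null || configuredPath.trim().isEmpty()) {
            return PROPERTY + " is not set";
        }
        if (!Files.isReadable(Paths.get(configuredPath))) {
            return "JAAS file is not readable: " + configuredPath;
        }
        return "JAAS file found: " + configuredPath;
    }

    public static void main(String[] args) {
        // Run with the same -D flag as the broker to see what the JVM actually receives.
        System.out.println(describe(System.getProperty(PROPERTY)));
    }
}
```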

     4. Start Kafka

    ./kafka-server-start.sh  ../config/server.properties &

    III. Spring Boot Integration

    The Spring Boot version is 2.4.10.

    1. Add the dependency

            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
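                <!-- No version element: assuming the project inherits from spring-boot-starter-parent 2.4.10, its dependency management selects a matching spring-kafka release. 1.1.7.RELEASE targets the 0.10.x Kafka clients and does not match Spring Boot 2.4.x. -->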
            </dependency>
    

      

    2. Create kafka_client_jaas.conf

    The file is stored under E:\study\xxstudy\kafkademo\config\

    KafkaClient {
     org.apache.kafka.common.security.plain.PlainLoginModule required
     username="admin"
     password="admin123456";
    }; 

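    The username and password here must match an account defined in the KafkaServer section of kafka_server_jaas.conf (here user_admin="admin123456"); otherwise the client is not granted access.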
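
    As an alternative to a separate JAAS file, Kafka clients also accept the login entry inline through the sasl.jaas.config property. The sketch below builds such client properties using plain string keys so that it runs without the Kafka jars. InlineJaasClientConfig and its methods are hypothetical names, and the escaping of quotes and backslashes is an assumption about how quoted values are parsed.

```java
import java.util.Properties;

/** Illustrative only: builds SASL/PLAIN client properties with an inline JAAS entry. */
class InlineJaasClientConfig {

    /** Formats a PlainLoginModule entry in the form expected by sasl.jaas.config. */
    static String plainJaasEntry(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + escape(username) + "\""
                + " password=\"" + escape(password) + "\";";
    }

    /** Assumption: backslash escapes are honoured inside quoted values. */
    private static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Returns the security-related properties shared by producers and consumers. */
    static Properties saslPlainProperties(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", plainJaasEntry(username, password));
        return props;
    }

    public static void main(String[] args) {
        Properties props = saslPlainProperties("118.xx.xx.101:9092", "admin", "admin123456");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

    With this approach the java.security.auth.login.config system property is not needed for the client, which helps when several clients with different credentials run in the same JVM.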

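    3. Producer

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.config.SaslConfigs;

    public class JaasProducerDemo {

        private final static String  TOPIC_NAME = "test5";

        static {
            // Point the JVM at the client JAAS file; backslashes must be escaped in a Java string literal
            System.setProperty("java.security.auth.login.config", "E:\\study\\xxstudy\\kafkademo\\config\\kafka_client_jaas.conf");
        }

        public static void main(String[] args) throws  Exception {
            producerSendWithJaas();
        }

        public static void producerSendWithJaas(){
            Properties  properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"118.xx.xx.101:9092");
            properties.put(ProducerConfig.ACKS_CONFIG,"all");
            properties.put(ProducerConfig.RETRIES_CONFIG,"0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
            properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            // Authenticate with SASL/PLAIN over an unencrypted connection
            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
            properties.put(SaslConfigs.SASL_MECHANISM,"PLAIN");

            Producer<String,String> producer = new KafkaProducer<>(properties);
            // Build and send 100 messages
            for(int i = 0; i< 100; i++) {
                String key = "key-" + i;
                ProducerRecord<String,String> record =
                        new ProducerRecord<>(TOPIC_NAME, key,"value-" + i);
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            // The send failed, for example because authentication was rejected
                            System.err.println("key:" + key + " , send failed: " + e);
                            return;
                        }
                        System.out.println("key:" + key + " , recordMetadata ,partition:" + recordMetadata.partition()
                                +",offset: " + recordMetadata.offset());
                    }
                });
            }

            // Close the producer; this waits for pending sends to complete
            producer.close();
        }

    }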
    

      

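    4. Consumer

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.config.SaslConfigs;

    public class JaasConsumerDemo {

        private final static String  TOPIC_NAME = "test5";

        static {
            // Point the JVM at the client JAAS file; backslashes must be escaped in a Java string literal
            System.setProperty("java.security.auth.login.config", "E:\\study\\xxstudy\\kafkademo\\config\\kafka_client_jaas.conf");
        }

        public static void main(String[] args) {
            consumerWithJaas();
        }

        private static  void consumerWithJaas(){
            Properties prop = new Properties();
            // Use ConsumerConfig (not ProducerConfig) for consumer settings
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"118.xx.xx.101:9092");
            prop.put("group.id","test");
            prop.put("enable.auto.commit","true");
            prop.put("auto.commit.interval.ms","1000");
            prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            // Authenticate with SASL/PLAIN over an unencrypted connection
            prop.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
            prop.put(SaslConfigs.SASL_MECHANISM,"PLAIN");

            KafkaConsumer<String,String>  consumer = new KafkaConsumer<>(prop);
            // Subscribe to one or more topics
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
            // Poll forever; stop the demo by terminating the process
            while (true){
                ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
                for( ConsumerRecord<String,String> record: records){
                    System.out.printf("partition - %d, offset - %d, key - %s, value - %s%n",
                            record.partition(),record.offset(), record.key(), record.value());
                }
            }
        }

    }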
    

      

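    5. Test

    Run the consumer first, then run the producer.

    Author: Work Hard Work Smart
    Source: http://www.cnblogs.com/linlf03/
    Reposting in any form is welcome; unless the author agrees otherwise, please keep this notice.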

  • Original article: https://www.cnblogs.com/linlf03/p/15355572.html