  • Adding SASL authentication to Kafka

    Kafka version: kafka_2.12-2.3.0

    Start the ZooKeeper instance that ships with Kafka:

    bin/zookeeper-server-start.sh config/zookeeper.properties

    Start Kafka:

    bin/kafka-server-start.sh config/server.properties

    nohup bin/kafka-server-start.sh config/server.properties > logs/server-start.log 2>&1 &
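    The second command starts the broker in the background. Without nohup and the redirect, the log is printed to the console. server-start.log is a log file name I chose myself; it does not exist under logs/ by default.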

    Steps to configure SASL:

    1. Edit config/server.properties

    ############################# Socket Server Settings #############################

    # The address the socket server listens on. It will get the value returned from
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=PLAIN
    sasl.enabled.mechanisms=PLAIN
    listeners=SASL_PLAINTEXT://ip:9092

    # Hostname and port the broker will advertise to producers and consumers. If not set,
    # it uses the value for "listeners" if configured.  Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    advertised.listeners=SASL_PLAINTEXT://ip:9092

    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    All other values are left unchanged.

    2. Create config/kafka_server_jaas.conf

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin"
        user_admin="admin"
        user_alice="alice";
    };
    Create config/kafka_client_jaas.conf (optional; I create it because I consume the data from the console):

    KafkaClient {
            org.apache.kafka.common.security.plain.PlainLoginModule required
            username="admin"
            password="admin";
    };
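
In the KafkaServer section, username and password are the credentials the broker itself presents on inter-broker connections, while each user_<name>="<password>" entry defines an account that clients may log in with. The sketch below only illustrates that lookup rule with a plain map; it is not Kafka's implementation, and the class and method names (PlainUserLookupSketch, authenticate) are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Illustration only: treats user_<name> JAAS options as a credential table. */
final class PlainUserLookupSketch {

    // Prefix of the user_<name>="<password>" entries in the KafkaServer section.
    private static final String USER_PREFIX = "user_";

    /** Returns true when the options contain user_<username> with exactly this password. */
    static boolean authenticate(Map<String, String> jaasOptions, String username, char[] password) {
        String expected = jaasOptions.get(USER_PREFIX + username);
        return expected != null && Arrays.equals(expected.toCharArray(), password);
    }

    public static void main(String[] args) {
        // Same options as the KafkaServer section above.
        Map<String, String> options = new HashMap<>();
        options.put("username", "admin");
        options.put("password", "admin");
        options.put("user_admin", "admin");
        options.put("user_alice", "alice");

        System.out.println(authenticate(options, "alice", "alice".toCharArray())); // true
        System.out.println(authenticate(options, "alice", "wrong".toCharArray())); // false
        System.out.println(authenticate(options, "bob", "bob".toCharArray()));     // false: no user_bob entry
    }
}
```

Under this rule, a client configured with username="admin" and password="admin" is accepted because the server file contains user_admin="admin".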

    3. Edit bin/kafka-server-start.sh

    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/zhufei/software/kafka_2.12-2.3.0/config/kafka_server_jaas.conf"
    fi

    Edit bin/kafka-console-consumer.sh and bin/kafka-console-producer.sh

    if [ "x$KAFKA_OPTS" = "x" ]; then
     export KAFKA_OPTS="-Djava.security.auth.login.config=/home/zhufei/software/kafka_2.12-2.3.0/config/kafka_client_jaas.conf"
    fi

    Edit config/consumer.properties and config/producer.properties

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN

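    The part marked in red in the original post does not actually need to be changed, because I consume the data from the console.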

    4. Start a consumer from the console

    bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.8:9092 --topic test --from-beginning --consumer.config config/consumer.properties

    5. Use a Java application as the producer:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TestDemo {

        public static void main(String[] args) throws Exception {
            // Set as a JVM system property; the value must be the path to the client JAAS file.
            System.setProperty("java.security.auth.login.config", "/home/zhufei/software/kafka_2.12-2.3.0/config/kafka_client_jaas.conf");

            Properties props = new Properties();
            props.put("bootstrap.servers", "ip:9092");
            // props.put("acks", "all");
            props.put("retries", 3);
            props.put("batch.size", 16384);
            // props.put("linger.ms", 1);
            // props.put("buffer.memory", 33554432);
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            // props.put("partitioner.class", HashPartitioner.class.getName());
            // props.put("interceptor.classes", EvenProducerInterceptor.class.getName());

            // Must match the broker listener: SASL/PLAIN over an unencrypted connection.
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");

            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            // for (int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<String, String>("test", null, "hello world 20190909 fox"));
            // close() waits for previously sent records to complete before returning.
            producer.close();
        }

    }
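
As an alternative to pointing the JVM at a JAAS file with System.setProperty, Kafka clients also accept the login module line inline through the sasl.jaas.config property. The following is a minimal sketch of building such client properties with only java.util.Properties. The helper names (InlineJaasConfigSketch, plainJaasConfig, saslClientProperties) are hypothetical, and the string concatenation assumes the credentials contain no quotes or backslashes.

```java
import java.util.Properties;

/** Sketch: client properties that carry the JAAS login line inline instead of in a file. */
final class InlineJaasConfigSketch {

    /**
     * Builds a sasl.jaas.config value for the PLAIN mechanism.
     * Assumption: username and password contain no double quotes or backslashes.
     */
    static String plainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" "
                + "password=\"" + password + "\";";
    }

    /** Returns the SASL-related client settings; serializers and other options are added by the caller. */
    static Properties saslClientProperties(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", plainJaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        Properties props = saslClientProperties("ip:9092", "admin", "admin");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

The resulting Properties object can be passed to new KafkaProducer<>(props) after adding the serializer settings, which keeps the credentials with the rest of the client configuration and makes the kafka_client_jaas.conf file unnecessary for that application.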



    7. Result: messages are sent and consumed normally
    zhufei@SilverRiver:~/software/kafka_2.12-2.3.0$ bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test --from-beginning --consumer.config config/consumer.properties
    hello world 20190909 fox

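    8. The Kafka documentation describes how to use SASL/PLAIN in production, namely by configuring
    listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.example.CustomPlainServerCallbackHandler
    Here listener.name is a fixed prefix and sasl_plaintext is the listener name, taken from the listener_name part of listeners = listener_name://host_name:port and written entirely in lowercase.
    In the remainder, plain is the SASL mechanism in lowercase and sasl.server.callback.handler.class is a fixed suffix.
    com.example.CustomPlainServerCallbackHandler is your own callback handler implementation.
    Package that class as a JAR, put it into Kafka's libs directory, and restart Kafka for it to take effect.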
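
A custom handler of this kind implements Kafka's org.apache.kafka.common.security.auth.AuthenticateCallbackHandler and, for PLAIN, decides whether the supplied user name and password are valid. To keep the example runnable without the Kafka JARs, the sketch below shows only the credential check such a handler might delegate to: an in-memory store that keeps salted PBKDF2 hashes rather than clear-text passwords. Everything here is an assumption for illustration, including the class name HashedCredentialStoreSketch, the iteration count, and the in-memory map, which a real deployment would likely replace with an external source.

```java
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;

/** Sketch of a password check that a custom PLAIN callback handler could call. */
final class HashedCredentialStoreSketch {

    private static final int ITERATIONS = 10_000; // illustrative value, not a recommendation
    private static final int KEY_BITS = 256;

    /** Salt and derived hash for one user; the clear-text password is never stored. */
    private static final class Entry {
        final byte[] salt;
        final byte[] hash;

        Entry(byte[] salt, byte[] hash) {
            this.salt = salt;
            this.hash = hash;
        }
    }

    private final Map<String, Entry> users = new ConcurrentHashMap<>();
    private final SecureRandom random = new SecureRandom();

    /** Registers a user by storing a fresh random salt and the PBKDF2 hash of the password. */
    void addUser(String username, char[] password) {
        byte[] salt = new byte[16];
        random.nextBytes(salt);
        users.put(username, new Entry(salt, derive(password, salt)));
    }

    /** Returns true only for a known user whose password hashes to the stored value. */
    boolean verify(String username, char[] password) {
        Entry entry = users.get(username);
        if (entry == null) {
            return false;
        }
        // MessageDigest.isEqual compares the two hashes in constant time.
        return MessageDigest.isEqual(entry.hash, derive(password, entry.salt));
    }

    private static byte[] derive(char[] password, byte[] salt) {
        PBEKeySpec spec = new PBEKeySpec(password, salt, ITERATIONS, KEY_BITS);
        try {
            return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(spec).getEncoded();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("PBKDF2 is not available on this JVM", e);
        } finally {
            spec.clearPassword(); // clears the spec's internal copy, not the caller's array
        }
    }

    public static void main(String[] args) {
        HashedCredentialStoreSketch store = new HashedCredentialStoreSketch();
        store.addUser("alice", "alice".toCharArray());
        System.out.println(store.verify("alice", "alice".toCharArray())); // true
        System.out.println(store.verify("alice", "wrong".toCharArray())); // false
    }
}
```

In an actual handler, the handle(Callback[]) method would read the user name from the NameCallback and the password from Kafka's PlainAuthenticateCallback, call a check like verify, and report the outcome through the callback's authenticated(boolean) method.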




  • Original article: https://www.cnblogs.com/zf201149/p/11495134.html