  • Configuring ACL + SASL authentication for Kafka (Windows version)

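    To make Kafka support ACL authentication, the following settings are required.

    1. Configuration files

     The configuration files are the ZooKeeper configuration file (zookeeper.properties), the client configuration files (mainly consumer and producer), and the Kafka server configuration file (server.properties).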

    1.1 The content of zookeeper.properties is:

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000

    1.2 The content of consumer.properties and producer.properties is:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN

    1.3 The content of the Kafka server configuration file, server.properties, is:

    # Set ip & port
    listeners=SASL_PLAINTEXT://localhost:9092
    advertised.listeners=SASL_PLAINTEXT://localhost:9092
    # Set protocol
    zookeeper.set.acl=true
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
    
    # Add acl
    # When true, a resource that has no ACL is accessible to every user
    allow.everyone.if.no.acl.found=true
    auto.create.topics.enable=false
    delete.topic.enable=true
    advertised.host.name=localhost
    super.users=User:admin
    
    # Add class
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

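    Change localhost above to the target IP address.

    2. Create the JAAS files

    Three JAAS files are needed as well: one each for ZooKeeper, the Kafka server and the Kafka client. The JAAS files live in the config folder under the Kafka directory. Each is described below.

    2.1 The zookeeper_jaas.conf file, used by ZooKeeper. Its content is: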

    Server {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="admin"
       password="password"
       user_admin="password";
    };

    2.2 The Kafka_server_jaas.conf file, used by the Kafka server. Its content is:

    KafkaServer {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="admin"
       password="password"
       user_admin="password"
       user_yd="password";
    };
    
    Client {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="admin"
       password="password";
    };

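    Note that this file contains two sections, KafkaServer and Client; take care not to mistype them.

    The JAAS file used by the Kafka client can be created by following the format above.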
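An added example, not part of the original post: a Python check that parses the two JAAS files above and reports unquoted values and credential mismatches between sections before the services are started. The function names are hypothetical, and the embedded file contents are constructed from the listings on this page.

```python
import re

# Constructed input: the two JAAS files listed on this page, as strings.
ZOOKEEPER_JAAS = '''Server {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin" password="password"
   user_admin="password";
};'''
KAFKA_SERVER_JAAS = '''KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin" password="password"
   user_admin="password" user_yd="password";
};
Client {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin" password="password";
};'''

SECTION = re.compile(r'(\w+)\s*\{(.*?)\};', re.S)
OPTION = re.compile(r'(\w+)\s*=\s*("[^"\n]*"|\S+)')

def parse_jaas(text):
    """Return ({section: {option: value}}, [problems]) for a JAAS file."""
    sections, problems = {}, []
    for name, body in SECTION.findall(text):
        opts = sections.setdefault(name, {})
        for key, val in OPTION.findall(body):
            if len(val) >= 2 and val.startswith('"') and val.endswith('"'):
                opts[key] = val[1:-1]
            else:  # e.g. user_yd=password; or a missing closing quote
                problems.append(f"{name}.{key}: value must be in double quotes")
    return sections, problems

def check(zk_text, broker_text):
    """List mismatches that make ZooKeeper or inter-broker SASL login fail."""
    zk, p1 = parse_jaas(zk_text)
    broker, p2 = parse_jaas(broker_text)
    problems = p1 + p2
    for need, src in (("Server", zk), ("KafkaServer", broker), ("Client", broker)):
        if need not in src:
            problems.append(f"missing section {need}")
    ks, cl, sv = broker.get("KafkaServer", {}), broker.get("Client", {}), zk.get("Server", {})
    # Inter-broker PLAIN login must match one of the broker's own user_<name> entries.
    if ks.get("password") is None or ks.get(f"user_{ks.get('username')}") != ks["password"]:
        problems.append("KafkaServer: username/password has no matching user_<name>")
    # The broker's ZooKeeper login (Client) must match a user_<name> entry in Server.
    if cl.get("password") is None or sv.get(f"user_{cl.get('username')}") != cl["password"]:
        problems.append("Client: credentials not accepted by ZooKeeper Server section")
    return problems
```

Calling `check(ZOOKEEPER_JAAS, KAFKA_SERVER_JAAS)` returns an empty list when the files are consistent; each string in a non-empty result names the section and option to fix.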

    2.4 Set the KAFKA_OPTS environment variable.

    In zookeeper-server-start.bat, kafka-server-start.bat, kafka-console-consumer.bat and kafka-console-producer.bat, add the following below the setlocal line:

    set KAFKA_OPTS=-Djava.security.auth.login.config=../../config/zookeeper_jaas.conf

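    The example above is for zookeeper-server-start.bat; the other .bat files can be done the same way.

    3. Once the configuration above is done, ZooKeeper and Kafka can be started normally.

    4. While configuring ACLs, I kept getting the following error: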

    [2021-10-18 13:12:38,363] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
    [2021-10-18 13:12:38,369] ERROR SASL authentication failed using login context 'Client' with exception: {} (org.apache.zookeeper.client.ZooKeeperSaslClient)
    javax.security.sasl.SaslException: Error in authenticating with a Zookeeper Quorum member: the quorum member's saslToken is null.
            at org.apache.zookeeper.client.ZooKeeperSaslClient.createSaslToken(ZooKeeperSaslClient.java:312)
            at org.apache.zookeeper.client.ZooKeeperSaslClient.respondToServer(ZooKeeperSaslClient.java:275)
            at org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:882)
            at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:103)
            at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:365)
            at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1223)
    [2021-10-18 13:12:38,372] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
    [2021-10-18 13:12:38,382] INFO EventThread shut down for session: 0x1000f723bdc0000 (org.apache.zookeeper.ClientCnxn)
    [2021-10-18 13:12:38,419] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    org.apache.zookeeper.KeeperException$AuthFailedException: KeeperErrorCode = AuthFailed for /consumers
            at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
            at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
            at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:583)
            at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1729)
            at kafka.zk.KafkaZkClient.makeSurePersistentPathExists(KafkaZkClient.scala:1627)
            at kafka.zk.KafkaZkClient.$anonfun$createTopLevelPaths$1(KafkaZkClient.scala:1619)
            at kafka.zk.KafkaZkClient.$anonfun$createTopLevelPaths$1$adapted(KafkaZkClient.scala:1619)
            at scala.collection.immutable.List.foreach(List.scala:431)
            at kafka.zk.KafkaZkClient.createTopLevelPaths(KafkaZkClient.scala:1619)
            at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:457)
            at kafka.server.KafkaServer.startup(KafkaServer.scala:191)
            at kafka.Kafka$.main(Kafka.scala:109)
            at kafka.Kafka.main(Kafka.scala)
    [2021-10-18 13:12:38,421] INFO shutting down (kafka.server.KafkaServer)

    This error occurs because KAFKA_OPTS is not set for ZooKeeper. Adding the following line to zookeeper-server-start.bat fixes it:

    set KAFKA_OPTS=-Djava.security.auth.login.config=../../config/zookeeper_jaas.conf

    Other reference links:

    https://blog.csdn.net/yhdeng11402/article/details/102645947

    https://kafka.apachecn.org/intro.html

  • Original URL: https://www.cnblogs.com/VARForrest/p/15420194.html