zoukankan      html  css  js  c++  java
  • 使用kafka-python客户端进行kafka kerberos认证

        之前说过python confluent kafka客户端做kerberos认证的过程,如果使用kafka python客户端的话同样也可以进行kerberos的认证,具体的认证机制这里不再描述,主要叙述配置认证的过程

        需要的模块有下面这些:

        kafka-python:https://pypi.org/project/kafka-python/

        gssapi:https://pypi.org/project/gssapi/

        decorator:https://pypi.org/project/decorator/

        six:https://pypi.org/project/six/

        kerberos环境

        kafka python开启GSSAPI需要模块gssapi的支持,而gssapi模块需要依赖于decorator模块和six模块,但是安装时不会校验和提示,如果不安装的话kafka python运行是会提示找不到gssapi lib,真正的原因还是因为decorator或者six没有安装,这里要注意. 

        首先安装decorator和six这两个模块.

        然后安装gssapi模块,安装的时候要确保decorator和six模块正常安装并且kerberos需要的开发包正常安装,否则gssapi会编译失败,安装kerberos库可以使用yum命令如下:

    yum install krb5-server krb5-libs krb5-auth-dialog

        然后编译并安装gssapi,这里是gssapi-1.6.1.tar.gz,安装如下:

    tar -xvzf gssapi-1.6.1.tar.gz
    cd gssapi-1.6.1
    python3 setup.py build
    python3 setup.py install
    cd ..

        完成之后要退出源码目录,因为导入模块可能会出现冲突,然后进入python解释器,测试一下模块的安装情况:

    from gssapi.raw.misc import GSSError

        如果导入模块没问题,则说明gssapi安装成功. 

        最后直接安装kafka-python模块即可. 

        然后可以开始测试python脚本认证是否正常,注意执行之前要先kinit保证klist有对应的用户,然后再使用下面的代码调试:

    #!/usr/bin/env python3
    # coding=utf-8
    import time
    
    from kafka import KafkaProducer
    from kafka import KafkaConsumer
    
    def kafka_python_producer_main():
        producer = KafkaProducer(bootstrap_servers='192.168.0.3:9092,192.168.0.4:9092,192.168.0.5:9092',
                                 security_protocol='SASL_PLAINTEXT',
                                 sasl_mechanism='GSSAPI',
                                 sasl_kerberos_service_name='kafka',
                                 sasl_kerberos_domain_name='hadoop.hadoop.com',
                                 sasl_plain_username='kafkaclient')
        producer.send('testTopic', 'kafka python test'.encode('utf-8'))
        producer.flush()
        producer.close()
        print('done')
    
    def kafka_python_consumer_main():
        consumer = KafkaConsumer('testTopic',
                                 bootstrap_servers='192.168.0.3:9092,192.168.0.4:9092,192.168.0.5:9092',
                                 group_id='kafka-test-20191014',
                                 auto_offset_reset='earliest',
                                 security_protocol='SASL_PLAINTEXT',
                                 sasl_mechanism='GSSAPI',
                                 sasl_kerberos_service_name='kafka',
                                 sasl_kerberos_domain_name='hadoop.hadoop.com',
                                 sasl_plain_username='kafkaclient')
        for msg in consumer:
            print(msg.value)
            print(msg.partition)
    
    if __name__ == '__main__':
        kafka_python_producer_main()
        time.sleep(1)
        kafka_python_consumer_main()

        然后执行脚本测试,如果生产和消费消息都正常,说明kafka kerberos认证成功. 

  • 相关阅读:
    一个男人该有的气质
    有没有想过,也许一辈子你都是个小人物
    System.IO.File.WriteAllText("log.txt", "dddd");
    cn_windows_10_enterprise_version_1703_updated_june_2017_x64_dvd_10720588.iso
    Visual Studio 2015 update 3各版本下载地址
    优麒麟 16.04 LTS(长期支持)版本
    干货!最全羽毛球技术动态分解gif图
    添加缓存 绝对时间过期
    C#缓存absoluteExpiration、slidingExpiration两个参数的疑惑
    无法解析依赖项。“Microsoft.Net.Http 2.2.29”与 'Microsoft.Net.Http.zh-Hans
  • 原文地址:https://www.cnblogs.com/freeweb/p/11675648.html
Copyright © 2011-2022 走看看