1、证书准备
#!/bin/bash
# Generate a self-signed CA, a server keystore, and server/client truststores
# for a Kafka SSL listener. Run as root (writes /usr/ca and /etc/hosts).
set -euo pipefail

readonly PASSWORD=pwd123
readonly ADDR=kafka-single
readonly ITEM_NAME=my-kafka
readonly CAROOT_NAME=CARoot

# Step 0: create working directories.
mkdir -p /usr/ca/{root,server,client,trust,py-kfk}
cd /usr/ca/

# Map the broker hostname to loopback if it is not already present.
# (The original awk one-liner was double-quoted, so "$0" and the braces were
# expanded by the shell instead of awk and the check never worked.)
if ! grep -q -- "$ADDR" /etc/hosts; then
  echo "127.0.0.1 $ADDR" >> /etc/hosts
fi

# Step 1: generate the server key pair in a JKS keystore.
# The SAN must be passed with -ext; the original's bare trailing
# "SAN=DNS:..." token is not a keytool option and was rejected.
readonly KEYSTORE_S=/usr/ca/server/server.keystore.jks
keytool -keystore "$KEYSTORE_S" -alias "$ITEM_NAME" -validity 365 \
  -keyalg RSA -genkey -keypass "$PASSWORD" -storepass "$PASSWORD" \
  -dname "CN=$ADDR,OU=sf,O=sf,L=shenzheng,S=GuangDong,C=cn" \
  -ext "SAN=DNS:$ADDR"

# Step 2: generate the CA root certificate and private key.
readonly CA_KEY=/usr/ca/root/ca-key
readonly CA_CERT=/usr/ca/root/ca-cert
openssl req -new -x509 -keyout "$CA_KEY" -out "$CA_CERT" -days 365 \
  -passout "pass:$PASSWORD" \
  -subj "/C=cn/ST=GuangDong/L=shenzheng/O=sf/OU=sf/CN=$ADDR"

# Step 3: import the CA cert into truststores trusted by server and client.
# -noprompt avoids the interactive "Trust this certificate?" question,
# which would hang a non-interactive run.
readonly TRUSTSTORE_S=/usr/ca/trust/server.truststore.jks
readonly TRUSTSTORE_C=/usr/ca/trust/client.truststore.jks
keytool -keystore "$TRUSTSTORE_S" -alias "$CAROOT_NAME" -import -noprompt \
  -file "$CA_CERT" -storepass "$PASSWORD"
keytool -keystore "$TRUSTSTORE_C" -alias "$CAROOT_NAME" -import -noprompt \
  -file "$CA_CERT" -storepass "$PASSWORD"

# Step 4: export a certificate signing request for the server key.
readonly SERVER_CERT_FILE=/usr/ca/server/server-cert-file
keytool -keystore "$KEYSTORE_S" -alias "$ITEM_NAME" -certreq \
  -file "$SERVER_CERT_FILE" -storepass "$PASSWORD"

# Step 5: sign the server certificate with the CA key.
readonly SERVER_CERT_SIGNED=/usr/ca/server/server-cert-signed
openssl x509 -req -CA "$CA_CERT" -CAkey "$CA_KEY" -in "$SERVER_CERT_FILE" \
  -out "$SERVER_CERT_SIGNED" -days 365 -CAcreateserial -passin "pass:$PASSWORD"

# Step 6: import the CA root cert into the server keystore.
keytool -keystore "$KEYSTORE_S" -alias "$CAROOT_NAME" -import -noprompt \
  -file "$CA_CERT" -storepass "$PASSWORD"

# Step 7: import the signed server certificate into the server keystore.
keytool -keystore "$KEYSTORE_S" -alias "$ITEM_NAME" -import -noprompt \
  -file "$SERVER_CERT_SIGNED" -storepass "$PASSWORD"
2. 导出 python-kafka 所需证书
#!/bin/bash
# Export PEM-format certificates from the server JKS keystore for use by
# kafka-python clients.
# NOTE: the original snippet referenced $KEYSTORE_S/$ITEM_NAME/$PASSWORD/
# $CAROOT_NAME defined only in the separate cert-preparation script, so run
# standalone every expansion was empty; the values are now defined here.
set -euo pipefail

readonly PASSWORD=pwd123
readonly ITEM_NAME=my-kafka
readonly CAROOT_NAME=CARoot
readonly KEYSTORE_S=/usr/ca/server/server.keystore.jks

# To inspect the keys and certificates stored in the keystore:
#   keytool -list -rfc -keystore "$KEYSTORE_S"

# Export the CA-signed server certificate and the CA root certificate
# in PEM (RFC 1421) format.
readonly SERVER_CERT_SIGNED_PEM_FORMAT=/usr/ca/py-kfk/server_signed.pem
readonly CAROOT_PEM_FORMAT=/usr/ca/py-kfk/CARoot.pem
keytool -exportcert -alias "$ITEM_NAME" -keystore "$KEYSTORE_S" \
  -storepass "$PASSWORD" -rfc -file "$SERVER_CERT_SIGNED_PEM_FORMAT"
keytool -exportcert -alias "$CAROOT_NAME" -keystore "$KEYSTORE_S" \
  -storepass "$PASSWORD" -rfc -file "$CAROOT_PEM_FORMAT"
3. 修改kafka服务器配置
修改 server.properties 配置:

listeners=PLAINTEXT://localhost:9092,SASL_SSL://kafka-single:9093
ssl.keystore.location=/usr/ca/server/server.keystore.jks
ssl.keystore.password=pwd123
ssl.key.password=pwd123
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=pwd123
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.secure.random.implementation=SHA1PRNG
security.inter.broker.protocol=SSL      #只使用SSL

#使用SASL/PLAIN + TLS 时:
#security.inter.broker.protocol=SASL_SSL
#sasl.mechanism.inter.broker.protocol=PLAIN
#sasl.enabled.mechanisms=PLAIN
4. 启动 zookeeper
~/kafka # bin/zookeeper-server-start.sh config/zookeeper.properties &
5. 启动 broker
~/kafka # bin/kafka-server-start.sh config/server.properties
6. python-kafka 连接服务器,若使用 PLAIN 文本
sasl_mechanism="PLAIN" 即可,SCRAM-SHA-512 需服务器做相应配置
# -*- coding: utf-8 -*-
"""Send a test message to a SASL_SSL (SCRAM-SHA-512) secured Kafka broker.

Fixes over the original:
  * Python 2 ``print`` statements replaced by Python 3 ``print()`` calls.
  * ``producer.send()`` was passed a ``str``; kafka-python requires the
    payload to be bytes when no value_serializer is configured.
"""
from kafka import KafkaConsumer, KafkaProducer
import kafka
import ssl
import logging
import time
import json


bootstrap_servers = 'kafka-single:9093'
topic = 'sqtest'

# PEM files exported from the server JKS keystore with keytool.
# NOTE(review): ssl_certfile is defined but never used below -- the client
# authenticates via SASL credentials, not a client certificate.
ssl_certfile = "/usr/ca/py-kfk/certificate.pem"
ssl_cafile = "/usr/ca/py-kfk/CARoot.pem"

# Certificate verification is disabled (CERT_NONE + no hostname check), so
# the CA file is loaded but the broker certificate is NOT actually
# validated. Tighten this for anything beyond local testing.
# check_hostname is set before verify_mode, per the ssl module docs.
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
context.load_verify_locations(ssl_cafile)

# Use "PLAIN" here instead if the broker is configured for SASL/PLAIN.
sasl_mechanism = "SCRAM-SHA-512"

username = "user"
pwd = "pwd"

producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    acks='all',                     # wait for full ISR acknowledgement
    retries=1,
    api_version=(0, 11, 0, 3),
    ssl_check_hostname=False,
    security_protocol="SASL_SSL",
    ssl_context=context,
    sasl_mechanism=sasl_mechanism,
    sasl_plain_username=username,
    sasl_plain_password=pwd,
)
print(producer.bootstrap_connected())
print("send message")
for i in range(1):
    # Payload must be bytes; passing a str raises an AssertionError
    # inside kafka-python.
    producer.send(topic, b'localhost test')
producer.flush()
producer.close()
#########################################
7. SASL/PLAIN 用户名密码配置
配置 Kafka Broker
在 Kafka broker 的 config 目录中, 添加一个类似于下面的适当修改过的 JAAS 文件. 在这个例子中, 让我们将它命名为
kafka_server_jaas.conf:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_encrypt="pwd123";
};
#将其路径作为JVM参数,下面行也可直接加入启动脚本中去 bin/kafka-server-start.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=$HOME/kafka/config/kafka_server_jaas.conf"
(注意:双引号内的 ~ 不会被 shell 展开,需改用 $HOME 或写绝对路径)
前台启动broker (启动前记得修改 server.properties 使其支持 SASL/PLAIN + TLS)
~/kafka # bin/kafka-server-start.sh config/server.properties
8. 创建 SCRAM 用户凭证(测试数据上报服务器前,先为用户配置 SCRAM 凭证)
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice