flume消费需要kerberos认证的kafka集群
环境准备:
kerberos认证需要有三个认证相关文件:
jaas.conf krb5.conf .keytab密钥文件(能实现密钥文件,如果没有该密钥文件,jaas.conf文件中需要指明认证的用户名及密码)
flume版本:
需要保持和认证的kafka集群的版本一致,至少不能低于kafka集群。
查看flume source kafka接口的版本:
运行flume程序后,日志有输出kafka的版本:
认证步骤:
1: flume配置文件:
agent.sources = s1
agent.sinks = k1
agent.channels = c1
agent.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.s1.kafka.bootstrap.servers = 192.168.133.137:9092 # 消费的kafka集群地址
agent.sources.s1.kafka.topics = test-log # topic名称
agent.sources.s1.kafka.consumer.group.id = test
agent.sources.s1.kafka.consumer.security.protocol = SASL_PLAINTEXT
agent.sources.s1.kafka.consumer.sasl.mechanism = GSSAPI
agent.sources.s1.kafka.consumer.sasl.kerberos.service.name = kafka
agent.sources.s1.interceptors=i1
agent.sources.s1.interceptors.i1.type = static
agent.sources.s1.interceptors.i1.key=key
agent.sources.s1.interceptors.i1.value={"agent_ip":"192.168.133.130"}
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 5000
agent.sinks.k1.type = thrift
agent.sinks.k1.connect-timeout = 1000
agent.sinks.k1.request-timeout = 1000
agent.sinks.k1.hostname = 192.168.133.137
agent.sinks.k1.port = 5330
agent.sinks.k1.connect.timeout = 0
agent.sinks.k1.request.timeout = 0
2、修改flume-en.sh文件
把krb5.onf和jass.conf文件的路径写入JAVA_OPTS变量中。
3、修改 jass.conf文件中密钥的路径:
4、确认认证server域名的映射。