如果要想在java客户端进行Kerberos认证,则一定需要有一个与之匹配的Kerberos配置文件存在。现在在D盘上建立一个客户端的访问程序文件:kafka_client_jaas.conf
vim d:/kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="bob"
password="bob-pwd";
};
如果要想在Java程序里面配置Kerberos认证处理操作,则需要将上面配置文件的路劲引入到项目之中:
static {
// 表示系统环境属性
System.setProperty("java.security.auth.login.config",
"d:/kafka_client_jaas.conf");
}
生产者程序代码——KerberosSendMessageProducer.java
package cn.lynch.mykafka.producer;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
public class KerberosSendMessageProducer {
public static final String SERVERS = "203.195.205.63:9092";
public static final String TOPIC = "lynch-topic";
static {
System.setProperty("java.security.auth.login.config",
"d:/kafka_client_jaas.conf"); // 表示系统环境属性
}
public static void main(String[] args) {
// 如果要想进行Kafka消息发送需要使用Properties定义一些环境属性
Properties props = new Properties();
props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); // 定义Kafka服务地址
// Kafka之中是以key和value的形式进行消息的发送处理, 所以为了保证Kafka的性能,专门提供有统一类型
// props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") ;
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) ;
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()) ;
long start = System.currentTimeMillis() ;
// 定义消息发送者对象,依靠此对象可以进行消息的传递
Producer<String,Integer> producer = new KafkaProducer<String,Integer>(props) ;
for (int x = 0 ; x < 100 ; x ++) {
producer.send(new ProducerRecord<String,Integer>(TOPIC,"mldn-" + x,x)) ;
}
long end = System.currentTimeMillis() ;
System.out.println("*** 消息发送完成:" + (end - start));
producer.close();
}
}
消费端程序代码——KerberosReceiveMessageConsumer
package cn.lynch.mykafka.consumer;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class KerberosReceiveMessageConsumer {
public static final String SERVERS = "203.195.205.63:9092";
public static final String TOPIC = "lynch-topic";
static {
System.setProperty("java.security.auth.login.config",
"d:/kafka_client_jaas.conf"); // 表示系统环境属性
}
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
// 定义消息消费者的连接服务器地址
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
// 消息消费者一定要设置反序列化的程序类,与消息生产者完全对应
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
IntegerDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-5");
// 定义消费者处理对象
Consumer<String, Integer> consumer = new KafkaConsumer<String, Integer>(
props);
consumer.subscribe(Arrays.asList(TOPIC)); // 设置消费者读取的主题名称
boolean flag = true; // 消费者需要一直进行数据的读取处理操作
while (flag) { // 一直读取消息
ConsumerRecords<String, Integer> allRecorders = consumer.poll(200);
for (ConsumerRecord<String, Integer> record : allRecorders) {
System.out.println(
"key = " + record.key() + "、value = " + record.value());
}
}
consumer.close();
}
}
如果使用的是SSL认证,发现认证失败之后实际上不会立刻断掉链接,因为SSL是基于jvm的认证处理操作,而Kerberos认证处理操作的性能一定要比ssl更好,所以新时代的kafka处理基本上都以sasl认证为主,sasl认证就是Kerberos认证。