zoukankan      html  css  js  c++  java
  • Kafka Kerberos客户端访问

    如果要想在java客户端进行Kerberos认证,则一定需要有一个与之匹配的Kerberos配置文件存在。现在在D盘上建立一个客户端的访问程序文件:kafka_client_jaas.conf

    vim d:/kafka_client_jaas.conf

    KafkaClient {  
            org.apache.kafka.common.security.plain.PlainLoginModule required  
            username="bob"  
            password="bob-pwd";  
    }; 

    如果要想在Java程序里面配置Kerberos认证处理操作,则需要将上面配置文件的路劲引入到项目之中:

    static {
            // 表示系统环境属性
            System.setProperty("java.security.auth.login.config",
                    "d:/kafka_client_jaas.conf");    
    }

    生产者程序代码——KerberosSendMessageProducer.java

    package cn.lynch.mykafka.producer;
    
    import java.util.Properties;
    
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    public class KerberosSendMessageProducer {
        public static final String SERVERS = "203.195.205.63:9092";
        public static final String TOPIC = "lynch-topic";
        static {
            System.setProperty("java.security.auth.login.config",
                    "d:/kafka_client_jaas.conf");    // 表示系统环境属性
        }
        public static void main(String[] args) {
            // 如果要想进行Kafka消息发送需要使用Properties定义一些环境属性
            Properties props = new Properties();
            props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); // 定义Kafka服务地址
            // Kafka之中是以key和value的形式进行消息的发送处理, 所以为了保证Kafka的性能,专门提供有统一类型
            // props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") ;
            props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) ;
            props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()) ;
            long start = System.currentTimeMillis() ;
            // 定义消息发送者对象,依靠此对象可以进行消息的传递
            Producer<String,Integer> producer = new KafkaProducer<String,Integer>(props) ;
            for (int x = 0 ; x < 100 ; x ++) {
                producer.send(new ProducerRecord<String,Integer>(TOPIC,"mldn-" + x,x)) ;
            }
            long end = System.currentTimeMillis() ;
            System.out.println("*** 消息发送完成:" + (end - start));
            producer.close(); 
        }
    
    }

    消费端程序代码——KerberosReceiveMessageConsumer

    package cn.lynch.mykafka.consumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    public class KerberosReceiveMessageConsumer {
        public static final String SERVERS = "203.195.205.63:9092";
        public static final String TOPIC = "lynch-topic";
        static {
            System.setProperty("java.security.auth.login.config",
                    "d:/kafka_client_jaas.conf");    // 表示系统环境属性
        }
        
        public static void main(String[] args) {
    
            Properties props = new Properties();
            
            props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    
            
            // 定义消息消费者的连接服务器地址
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
            // 消息消费者一定要设置反序列化的程序类,与消息生产者完全对应
            props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    IntegerDeserializer.class.getName());
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-5");
            // 定义消费者处理对象
            Consumer<String, Integer> consumer = new KafkaConsumer<String, Integer>(
                    props);
            consumer.subscribe(Arrays.asList(TOPIC)); // 设置消费者读取的主题名称
            boolean flag = true; // 消费者需要一直进行数据的读取处理操作
            while (flag) { // 一直读取消息
                ConsumerRecords<String, Integer> allRecorders = consumer.poll(200);
                for (ConsumerRecord<String, Integer> record : allRecorders) {
                    System.out.println(
                            "key = " + record.key() + "、value = " + record.value());
                }
            }
            consumer.close();
        }
    }

    如果使用的是SSL认证,发现认证失败之后实际上不会立刻断掉链接,因为SSL是基于jvm的认证处理操作,而Kerberos认证处理操作的性能一定要比ssl更好,所以新时代的kafka处理基本上都以sasl认证为主,sasl认证就是Kerberos认证。

  • 相关阅读:
    Bit,Byte,Word,Dword,Qword
    One good turn deserves another
    IHttpModule & IHttpHandler
    畅想:哈夫曼树的应用
    The Controls collection cannot be modified because the control contains code blocks
    Talk O/RM (DAL) too ...
    实现对象集合枚举接口
    [ZT]实现创造生命的古老梦想——合成生物学的发展走向
    笔记本基础知识篇之DVI接口详解
    Analysis Services: write back
  • 原文地址:https://www.cnblogs.com/linjiqin/p/13200356.html
Copyright © 2011-2022 走看看