zoukankan      html  css  js  c++  java
  • Java Api Consumer 连接启用Kerberos认证的Kafka

    java程序连接到一个需要Kerberos认证的kafka集群上,消费生产者生产的信息,kafka版本是2.10-0.10.0.1;

    Java程序以maven构建,(怎么构建maven工程,可去问下度娘:“maven工程入门示例”)

    先上pom.xml文件

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.ht</groupId>
      <artifactId>kafkaTest</artifactId>
      <version>1.0</version>
      <dependencies>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>0.10.0.1</version>
        </dependency>
      </dependencies>
       <build>
             <plugins>
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-compiler-plugin</artifactId>
                     <configuration>
                         <source>1.7</source>
                         <target>1.7</target>
                     </configuration>
                 </plugin>
             </plugins>
        </build>
    </project>


    然后是Jave代码,先上图,一一解释图中标识:

    注释:

        1:可以将所需的配置文件加载到程序;(参见:度娘--“JDK 运行参数 JAVA -Dxxx与System.setProperty()的关系”)

        2:新版本的Producter和Consumer都可以直接连接brocker,不用再配置zookeeper的相关信息,所以这里是要连接的kafka的主机ip和端口号

        3:设置的topic的组Id

        4:设置偏移量

        5:设置认证配置

        6:设置所要读取的主题Topic

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    public class ConsumerTest {
        public static void main(String[] args) {
            
    //        System.setProperty("java.security.auth.login.config", "/home/kafka/kafka_client_jaas.conf"); 
    //        System.setProperty("java.security.krb5.conf", "/home/kafka/krb5.conf"); 
            // 环境变量添加,需要输入配置文件的路径System.out.println("===================配置文件地址"+fsPath+"\conf\cons_client_jaas.conf"); 
            Properties props = new Properties(); 
            props.put("bootstrap.servers", "192.168.132.130:9092"); 
            props.put("group.id", "group-1111"); 
            props.put("enable.auto.commit", "false"); 
            props.put("auto.commit.interval.ms", "1000"); 
            props.put("auto.offset.reset", "earliest"); 
            props.put("session.timeout.ms", "30000"); 
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     
            props.put("security.protocol", "SASL_PLAINTEXT"); 
            props.put("sasl.mechanism", "GSSAPI"); 
            props.put("sasl.kerberos.service.name", "kafka");
            
            KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props); 
            kafkaConsumer.subscribe(Arrays.asList("cust_info"));
            while (true) { 
                ConsumerRecords<String, String> records = kafkaConsumer.poll(1); 
                 for (ConsumerRecord<String, String> record : records)
                     System.out.println("Partition: " + record.partition() + " Offset: " + record.offset() + " Value: " + record.value() + " ThreadID: " + Thread.currentThread().getId());
                    
            }
        }
    }

    以上就是所有配置,将工程通过导出为Runnable JAR file 导出为jar文件

    直接运行   java -jar jar包名.jar  即可;

    如果程序里没有设置1相关的配置文件,也可以运行下列命令:

    java -Djava.security.auth.login.config=/home/kafka/kafka_client_jaas.conf   -Djava.security.krb5.conf=/home/kafka/krb5.conf  -jar  jar包名.jar 

  • 相关阅读:
    字符串匹配——KMP算法(C++)
    数论——Fibonacci数列(C++)
    数据结构——线段树之二(C++)
    数据结构——线段树之一(C++)
    最后的最后
    开始的开始
    10.25模拟 保留道路
    10.25模拟 列车调度
    10.25模拟 三角形
    洛谷 P1093 奖学金
  • 原文地址:https://www.cnblogs.com/felixzh/p/9526126.html
Copyright © 2011-2022 走看看