zoukankan      html  css  js  c++  java
  • springboot kafka SASL_SSL SCRAM-SHA-512 ,外加打包jar读不到.jks文件解决方法


    Spring Boot 整合 Kafka(SASL_SSL + SCRAM-SHA-512),以及打包成 jar 后读不到 .jks 文件的解决方法

    配置文件:
    kafka:
      ssl:
        truststore-location: D:/client_truststore.jks
        truststore-password: asfaf

    配置类:
    import org.apache.commons.io.FileUtils;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.config.SslConfigs;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.system.ApplicationHome;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;

    /**
    * Created
    */
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {

    @Value("${kafka.ssl.truststore-location}")
    private String truststore_location;

    @Value("${kafka.ssl.truststore-password}")
    private String truststore_password;

    /**
    * 配置监听,将消费工厂信息配置进去
    * @return ConcurrentKafkaListenerContainerFactory
    */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer,String> kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<Integer, String>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
    }

    /**
    * 消费
    * 消费工厂
    * @return ConsumerFactory
    */
    @Bean
    public ConsumerFactory<Integer,String> consumerFactory(){
    return new DefaultKafkaConsumerFactory<Integer, String>(consumerConfigs());
    }

    /**
    * 消费配置
    * @return Map
    */
    public Map<String,Object> consumerConfigs(){
    HashMap<String, Object> props = new HashMap<String, Object>();
    props.put("enable.auto.commit", "true");
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer");

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "*****:9092,*****:9092,*****:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "Mygrouname");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("security.protocol", "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");

    //打包成jar后,无法找到文件的解决方法(复制jar包里的文件,到文件目录里)
    ApplicationHome applicationHome = new ApplicationHome(KafkaConsumerConfig.class);
    //项目打包成jar包所在的根路径
    String rootPath = applicationHome.getSource().getParentFile().toString();
    String configFilePath = rootPath + "\client_truststore.jks";
    File configFile = new File(configFilePath);
    if (!configFile.exists()) {
    try {
    //获取类路径下的指定文件流 (项目目录下的: /resource/libs/client_truststore.jks)
    InputStream in = this.getClass().getClassLoader().getResourceAsStream("libs/client_truststore.jks");
    FileUtils.copyInputStreamToFile(Objects.requireNonNull(in, "config.xml文件找不到"), configFile);
    } catch (IOException e) {
    throw new IllegalArgumentException("保存文件证书失败->" + e.getMessage());
    }
    }
    System.out.println("证书文件地址:" + configFilePath);
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, configFilePath);

    //props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststore_location);
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststore_password);
    props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username='帐号' password='密码';"); //注意passwod结尾的分号一定不要漏
    props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    return props;
    }
    }

    读取消息:
    // Message receiver: handles records delivered from the "topics_tst" topic.
    @KafkaListener(topics = {"topics_tst"})
    public void processMessage(ConsumerRecord<?, ?> record) {
        final Object message = record.value();
        if (message == null) {
            // Nothing to do for records that carry no value.
            return;
        }
        // A non-null payload is available in `message`; processing logic goes here.
    }

  • 相关阅读:
    分页存储过程
    C#,单元测试
    telerik reporting报表
    在Linq to sql 和 Entity framework 中使用lambda表达式实现left join
    .NET提供了三种后台输出js的方式:
    转换人民币大小金额
    ASP.Net Post方式获取数据流的一种简单写法
    js数组中两个有相同删除一个
    我的个人博客
    It is the courage
  • 原文地址:https://www.cnblogs.com/suizhikuo/p/13497206.html
Copyright © 2011-2022 走看看