  • Spring Boot integration with a password-protected (SASL) Kafka cluster

    Step 1: Add Kafka support to pom.xml

        <dependencies>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </dependency>
        </dependencies>
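
    Note (an addition, not in the original post): when the project inherits from the Spring Boot starter parent or imports the Spring Boot BOM, neither artifact needs an explicit <version>; spring-kafka also pulls in kafka-clients transitively, so declaring kafka-clients separately mainly serves to make the client dependency explicit.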

    Step 2: Add the configuration file (application.properties)

    #kafka
    spring.kafka.inner.bootstrap-servers=172.17.0.2:9095,172.17.0.12:9095,172.17.0.13:9095
    spring.kafka.inner.security-protocol=SASL_PLAINTEXT
    spring.kafka.inner.sasl-mechanism=PLAIN
    #=============== producer =======================
    # Number of retries when a write fails. When the leader goes down, a replica takes over as the new leader,
    # and writes may fail during that transition. With retries=0 the producer never resends; with retries > 0
    # it resends once the replica has fully become the leader, so the message is not lost.
    spring.kafka.producer.retries=0
    # Producer JAAS config (SASL username/password)
    spring.kafka.producer.sasl-jaas-config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafkapswd";
    # Batch size in bytes: the producer accumulates records and sends them as one batch
    spring.kafka.producer.batch-size=16384
    # Total memory (bytes) the producer may use to buffer records waiting to be sent
    spring.kafka.producer.buffer-memory=33554432
    spring.kafka.producer.linger-ms=5
    #=============== consumer =======================
    # Default consumer group id --> consumers in the same group never read the same message; group.id names the group
    spring.kafka.consumer.group-id=kafkaGroup
    # Consumer JAAS config (SASL username/password)
    spring.kafka.consumer.sasl-jaas-config=org.apache.kafka.common.security.plain.PlainLoginModule required username="mooc" password="moocpswd";
    # Where to start when there is no committed offset: 'earliest' reads from the beginning of the log,
    # 'latest' reads only new records. 'earliest' is usually what you want here.
    spring.kafka.consumer.auto-offset-reset=earliest
    # enable.auto.commit=true --> offsets are committed automatically
    spring.kafka.consumer.enable-auto-commit=true
    # If 'enable.auto.commit' is true, how often (in milliseconds) offsets are committed to Kafka; the default is 5000
    spring.kafka.consumer.auto-commit-interval=100
    spring.kafka.consumer.max-poll-records=1000
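
    For reference, here is a minimal standalone sketch (an addition, not from the original post) of what the SASL properties above map onto when set directly on a raw kafka-clients producer; the broker addresses and credentials are copied from the values above, and the topic name "test-topic" is an illustrative assumption. The KafkaConfiguration class in the next step wires the same keys through Spring.

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SaslSmokeTest {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.0.2:9095,172.17.0.12:9095,172.17.0.13:9095");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // Same SASL_PLAINTEXT / PLAIN settings as the spring.kafka.inner.* and producer properties above
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG,
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                            + "username=\"kafka\" password=\"kafkapswd\";");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // "test-topic" is an assumed topic name used only for this smoke test
                producer.send(new ProducerRecord<>("test-topic", "key", "hello"));
                producer.flush();
            }
        }
    }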



    Step 3: Create the configuration class
    package com.xxx.service.config;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.util.StringUtils;

    import java.util.HashMap;
    import java.util.Map;

    @Configuration
    public class KafkaConfiguration {

    @Value("${spring.kafka.inner.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.inner.security-protocol}")
    private String kafkaSecurityProtocol;
    @Value("${spring.kafka.inner.sasl-mechanism}")
    private String kafkaSASLMechanism;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.sasl-jaas-config}")
    private String kafkaConsumerSASLJaasConfig;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.producer.retries}")
    private String producerRetries;
    @Value("${spring.kafka.producer.sasl-jaas-config}")
    private String kafkaProducerSASLJaasConfig;
    @Value("${spring.kafka.producer.batch-size}")
    private String producerBatchSize;
    @Value("${spring.kafka.producer.linger-ms}")
    private String producerLingerMs;
    @Value("${spring.kafka.producer.buffer-memory}")
    private String bufferMemory;


        @Bean
        public KafkaListenerContainerFactory<?> batchFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
            factory.setBatchListener(true); // enable batch listening
            return factory;
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> config = new HashMap<>();
            // Kafka broker addresses
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            // consumer group id
            config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
            config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // Only apply the SASL settings when protocol, mechanism and JAAS config are all present
            if (!StringUtils.isEmpty(kafkaSecurityProtocol) && !StringUtils.isEmpty(kafkaSASLMechanism)
                    && !StringUtils.isEmpty(kafkaConsumerSASLJaasConfig)) {
                config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
                config.put(SaslConfigs.SASL_MECHANISM, kafkaSASLMechanism);
                config.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaConsumerSASLJaasConfig);
            }
            return config;
        }

        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> properties = new HashMap<>();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            // retries: 0 disables the retry mechanism
            properties.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
            // batch size in bytes, default 16384
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize);
            // linger: delay sends by 5 ms so records can be batched, reducing the number of requests; default 0
            properties.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs);
            // total memory (bytes) available to buffer records waiting to be sent; default 33554432
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // Only apply the SASL settings when protocol, mechanism and JAAS config are all present
            if (!StringUtils.isEmpty(kafkaSecurityProtocol) && !StringUtils.isEmpty(kafkaSASLMechanism)
                    && !StringUtils.isEmpty(kafkaProducerSASLJaasConfig)) {
                properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
                properties.put(SaslConfigs.SASL_MECHANISM, kafkaSASLMechanism);
                properties.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaProducerSASLJaasConfig);
            }
            return new DefaultKafkaProducerFactory<>(properties);
        }

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
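
    Optional tweak (a hedged sketch, not from the original post): if the topic has more than one partition, the batchFactory bean above can also run several listener threads in parallel via setConcurrency, a standard ConcurrentKafkaListenerContainerFactory method. The value 3 below is only an illustrative assumption matching the three brokers configured earlier; this variant would replace the bean above.

        @Bean
        public KafkaListenerContainerFactory<?> batchFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
            factory.setBatchListener(true);
            // Assumption: up to 3 listener threads; effective parallelism is capped by the topic's partition count
            factory.setConcurrency(3);
            return factory;
        }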


    Step 4: Usage

    // Inject the KafkaTemplate bean defined in the configuration class above
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping(value = "put_kafka")
    public String put_kafka(String topic, String message) {
        kafkaTemplate.send(topic, message);
        /*
         * Send directly with the send methods:
         *
         * kafkaTemplate.send(topic, message);
         * kafkaTemplate.send(topic, key, message);
         * kafkaTemplate.sendDefault(message);
         * kafkaTemplate.sendDefault(key, message);
         */
        return "ok";
    }
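
    send() is asynchronous, so the controller above returns before the broker has acknowledged the record. Below is a hedged sketch (not in the original post) of reacting to the send result, assuming Spring Kafka 2.x where send() returns a ListenableFuture<SendResult>; in Spring Kafka 3.x it returns a CompletableFuture instead.

    // requires: org.springframework.kafka.support.SendResult, org.springframework.util.concurrent.ListenableFuture
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
    future.addCallback(
            result -> System.out.println("Sent to " + result.getRecordMetadata().topic()
                    + "-" + result.getRecordMetadata().partition()
                    + " @ offset " + result.getRecordMetadata().offset()),
            ex -> System.err.println("Send failed: " + ex.getMessage()));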




    // Batch consumption: containerFactory must reference the "batchFactory" bean defined in the configuration class above
    @KafkaListener(topics = "topic", containerFactory = "batchFactory")
    public void onMessage(List<ConsumerRecord<?, ?>> records) {
        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            System.out.println("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                System.out.println("Received payload: " + message);
            }
        }
    }
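
    A hedged alternative (not in the original post): with the same batchFactory, Spring Kafka can also deliver just the deserialized payloads as a List<String>, which is often enough when the record metadata is not needed. This would replace the record-based listener above (or use a different group id) rather than run alongside it.

    @KafkaListener(topics = "topic", containerFactory = "batchFactory")
    public void onPayloads(List<String> messages) {
        // each element is one record value, already deserialized by the StringDeserializer configured above
        for (String message : messages) {
            System.out.println("Received payload: " + message);
        }
    }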

