第一步:pom.xml配置文件添加kafka支持
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
第二步:添加配置文件
#kafka
spring.kafka.inner.bootstrap-servers=172.17.0.2:9095,172.17.0.12:9095,172.17.0.13:9095
spring.kafka.inner.security-protocol=SASL_PLAINTEXT
spring.kafka.inner.sasl-mechanism=PLAIN
#=============== producer =======================
# 写入失败时的重试次数。当leader节点失效时,一个replica节点会接替成为leader节点,此时可能出现写入失败;
# 当retries为0时,producer不会重发。当retries大于0时会重发,此时replica节点已完全成为leader节点,不会产生消息丢失。
spring.kafka.producer.retries=0
# 生产者jaas配置账号密码
spring.kafka.producer.sasl-jaas-config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafkapswd";
# 每个批次的大小上限(单位:字节,不是消息条数),producer积累到该大小后一次发送
spring.kafka.producer.batch-size=16384
# producer可用于缓存待发送消息的总内存大小(单位:字节),默认为33554432;是否发送由batch-size和linger-ms决定,而不是由该值触发
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.linger-ms=5
#=============== consumer =======================
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠group.id设置组名
spring.kafka.consumer.group-id=kafkaGroup
# 消费者jaas配置账号密码
spring.kafka.consumer.sasl-jaas-config=org.apache.kafka.common.security.plain.PlainLoginModule required username="mooc" password="moocpswd";
# 可选值为earliest、latest、none(smallest和largest是旧版consumer的取值)。当没有已提交的offset时,earliest从最早的消息开始读取,latest从最新的消息开始读取。一般情况下我们都是设置earliest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=true
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=1000
第三步:创建配置class
package com.xxx.service.config;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that builds the Kafka consumer and producer beans from the
 * {@code spring.kafka.*} properties.
 *
 * <p>Exposes a batch listener container factory ({@link #batchFactory()}), the raw
 * consumer property map ({@link #consumerConfigs()}), a producer factory
 * ({@link #producerFactory()}) and a {@link KafkaTemplate} ({@link #kafkaTemplate()}).
 * SASL settings are applied to both sides only when the security protocol, the SASL
 * mechanism and the side-specific JAAS configuration all contain text.
 */
@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.inner.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.inner.security-protocol}")
    private String kafkaSecurityProtocol;

    @Value("${spring.kafka.inner.sasl-mechanism}")
    private String kafkaSASLMechanism;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.sasl-jaas-config}")
    private String kafkaConsumerSASLJaasConfig;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    // Optional: left empty when the property is absent, in which case
    // enable.auto.commit is not put into the consumer configuration at all.
    @Value("${spring.kafka.consumer.enable-auto-commit:}")
    private String enableAutoCommit;

    // Optional: falls back to 100 ms, the value that used to be hard-coded here.
    @Value("${spring.kafka.consumer.auto-commit-interval:100}")
    private String autoCommitInterval;

    @Value("${spring.kafka.producer.retries}")
    private String producerRetries;

    @Value("${spring.kafka.producer.sasl-jaas-config}")
    private String kafkaProducerSASLJaasConfig;

    @Value("${spring.kafka.producer.batch-size}")
    private String producerBatchSize;

    @Value("${spring.kafka.producer.linger-ms}")
    private String producerLingerMs;

    @Value("${spring.kafka.producer.buffer-memory}")
    private String bufferMemory;

    /**
     * Creates the listener container factory referenced by
     * {@code containerFactory = "batchFactory"}.
     *
     * @return a container factory backed by {@link #consumerConfigs()} with batch
     *         listening enabled
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setBatchListener(true); // enable batch listening
        return factory;
    }

    /**
     * Assembles the Kafka consumer properties.
     *
     * @return a mutable map of consumer configuration keys to values
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> config = new HashMap<>();
        // Kafka broker addresses
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // consumer group id
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        // Honour the configured auto-commit settings instead of a hard-coded interval.
        if (StringUtils.hasText(enableAutoCommit)) {
            config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit.trim());
        }
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        applySaslSettings(config, kafkaConsumerSASLJaasConfig);
        return config;
    }

    /**
     * Creates the producer factory used by {@link #kafkaTemplate()}.
     *
     * @return a producer factory with String key and value serializers
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // number of retries; 0 disables the retry mechanism
        properties.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
        // batch size in bytes
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize);
        // delay before a batch is sent, so that fewer, larger requests are issued
        properties.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs);
        // total bytes the producer may use to buffer records waiting to be sent
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        applySaslSettings(properties, kafkaProducerSASLJaasConfig);
        return new DefaultKafkaProducerFactory<>(properties);
    }

    /**
     * Creates the template used to send String messages.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Adds the security protocol, SASL mechanism and JAAS configuration to
     * {@code target}, but only when all three values contain text.
     *
     * @param target     client property map to extend
     * @param jaasConfig JAAS configuration of the client being configured
     */
    private void applySaslSettings(Map<String, Object> target, String jaasConfig) {
        boolean saslConfigured = StringUtils.hasText(kafkaSecurityProtocol)
                && StringUtils.hasText(kafkaSASLMechanism)
                && StringUtils.hasText(jaasConfig);
        if (!saslConfigured) {
            return;
        }
        target.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
        target.put(SaslConfigs.SASL_MECHANISM, kafkaSASLMechanism);
        target.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
    }
}
第四步:使用
// Inject the KafkaTemplate bean that was set up in the configuration class.
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
 * Sends {@code message} to the Kafka topic {@code topic} through the injected
 * {@code kafkaTemplate}.
 *
 * <p>The value returned by {@code send} is discarded, so the reply does not reflect
 * the outcome of the send.
 *
 * @param topic   name of the destination topic
 * @param message payload to send
 * @return the literal string {@code "ok"}
 */
@RequestMapping("put_kafka")
public String put_kafka(String topic, String message) {
    // Other send variants available on the template:
    //   kafkaTemplate.send(topic, message);
    //   kafkaTemplate.send(topic, key, message);
    //   kafkaTemplate.sendDefault(message);
    //   kafkaTemplate.sendDefault(key, message);
    kafkaTemplate.send(topic, message);
    return "ok";
}
/**
 * Receives records in batches from the topic named "topic" and prints them.
 *
 * <p>Batch delivery requires a container factory; this listener uses the
 * "batchFactory" bean declared in the Kafka configuration class. Every record is
 * printed, and its value is printed as well when it is not {@code null}.
 *
 * @param records the batch of records handed to the listener
 */
@KafkaListener(topics = "topic", containerFactory = "batchFactory")
public void onMessage(List<ConsumerRecord<?, ?>> records) {
    for (ConsumerRecord<?, ?> record : records) {
        System.out.println("Received: " + record);
        Object message = record.value();
        // A record may carry a null value; only print real payloads.
        if (message != null) {
            // The former "{}" was a logger placeholder that println printed literally.
            System.out.println("接受数据:" + message);
        }
    }
}