Kafka安装参考:Kafka安装(一)
一、Kafka整合
1、创建SpringBoot项目
引入spring-kafka依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
完整pom如下:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 7 <parent> 8 <groupId>org.springframework.boot</groupId> 9 <artifactId>spring-boot-starter-parent</artifactId> 10 <version>2.2.5.RELEASE</version> 11 </parent> 12 13 <groupId>org.example</groupId> 14 <artifactId>test-springboot-kafka</artifactId> 15 <version>1.0-SNAPSHOT</version> 16 17 <properties> 18 <maven.compiler.source>8</maven.compiler.source> 19 <maven.compiler.target>8</maven.compiler.target> 20 </properties> 21 22 <dependencies> 23 <dependency> 24 <groupId>org.springframework.boot</groupId> 25 <artifactId>spring-boot-starter-web</artifactId> 26 </dependency> 27 28 <dependency> 29 <groupId>org.springframework.kafka</groupId> 30 <artifactId>spring-kafka</artifactId> 31 </dependency> 32 </dependencies> 33 34 35 </project>
2、application.yml配置kafka连接
1 spring: 2 kafka: 3 bootstrap-servers: 172.101.203.33:9092 4 producer: 5 # 发生错误后,消息重发的次数。 6 retries: 0 7 #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 8 batch-size: 16384 9 # 设置生产者内存缓冲区的大小。 10 buffer-memory: 33554432 11 # 键的序列化方式 12 key-serializer: org.apache.kafka.common.serialization.StringSerializer 13 # 值的序列化方式 14 value-serializer: org.apache.kafka.common.serialization.StringSerializer 15 # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 16 # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 17 # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 18 acks: 1 19 consumer: 20 # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D 21 auto-commit-interval: 1S 22 # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: 23 # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) 24 # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录 25 auto-offset-reset: earliest 26 # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 27 enable-auto-commit: false 28 # 键的反序列化方式 29 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer 30 # 值的反序列化方式 31 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer 32 listener: 33 # 在侦听器容器中运行的线程数。 34 concurrency: 5 35 #listner负责ack,每调用一次,就立即commit 36 ack-mode: manual_immediate 37 missing-topics-fatal: false
3、kafka生产者
1 @RestController 2 public class KafkaController { 3 4 @Autowired 5 private KafkaTemplate<String, Object> kafkaTemplate; 6 7 //自定义topic 8 public static final String TOPIC_TEST = "topic-test"; 9 10 @RequestMapping("/sendMsg") 11 public void sendMsg(String msg) throws JsonProcessingException { 12 13 System.out.println("准备发送消息为:" + msg); 14 //发送消息 15 ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, msg); 16 future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { 17 @Override 18 public void onFailure(Throwable throwable) { 19 //发送失败的处理 20 System.out.println(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage()); 21 } 22 23 @Override 24 public void onSuccess(SendResult<String, Object> stringObjectSendResult) { 25 //成功的处理 26 System.out.println(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString()); 27 } 28 }); 29 } 30 }
4、kafka消费者
1 @Component 2 public class KafkaConsumer { 3 4 //自定义topic 5 public static final String TOPIC_TEST = "topic-test"; 6 7 // 8 public static final String TOPIC_GROUP1 = "topic-group1"; 9 10 // 11 public static final String TOPIC_GROUP2 = "topic-group2"; 12 13 14 @KafkaListener(topics = TOPIC_TEST, groupId = TOPIC_GROUP1) 15 public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { 16 17 Optional message = Optional.ofNullable(record.value()); 18 if (message.isPresent()) { 19 Object msg = message.get(); 20 System.out.println("topic_test 消费了: Topic:" + topic + ",Message:" + msg); 21 ack.acknowledge(); 22 } 23 } 24 25 @KafkaListener(topics = TOPIC_TEST, groupId = TOPIC_GROUP2) 26 public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { 27 28 Optional message = Optional.ofNullable(record.value()); 29 if (message.isPresent()) { 30 Object msg = message.get(); 31 System.out.println("topic_test1 消费了: Topic:" + topic + ",Message:" + msg); 32 ack.acknowledge(); 33 } 34 } 35 }
5、启动类
1 @SpringBootApplication 2 public class Application { 3 public static void main(String[] args) { 4 SpringApplication.run(Application.class); 5 } 6 }
6、测试运行
1、运行SpringBoot项目
2、使用地址:http://localhost:8080/sendMsg?msg=abc,访问服务器
3、结果如下:
准备发送消息为:abc topic-test - 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=topic-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=abc, timestamp=null), recordMetadata=topic-test-0@0] topic_test 消费了: Topic:topic-test,Message:abc topic_test1 消费了: Topic:topic-test,Message:abc
二、Kafka自动配置原理
1 @Configuration(proxyBeanMethods = false) 2 @ConditionalOnClass(KafkaTemplate.class) 3 @EnableConfigurationProperties(KafkaProperties.class) 4 @Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class }) 5 public class KafkaAutoConfiguration { 6 7 private final KafkaProperties properties; 8 9 public KafkaAutoConfiguration(KafkaProperties properties) { 10 this.properties = properties; 11 } 12 13 @Bean 14 @ConditionalOnMissingBean(KafkaTemplate.class) 15 public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, 16 ProducerListener<Object, Object> kafkaProducerListener, 17 ObjectProvider<RecordMessageConverter> messageConverter) { 18 KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory); 19 messageConverter.ifUnique(kafkaTemplate::setMessageConverter); 20 kafkaTemplate.setProducerListener(kafkaProducerListener); 21 kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic()); 22 return kafkaTemplate; 23 } 24 25 @Bean 26 @ConditionalOnMissingBean(ProducerListener.class) 27 public ProducerListener<Object, Object> kafkaProducerListener() { 28 return new LoggingProducerListener<>(); 29 } 30 31 @Bean 32 @ConditionalOnMissingBean(ConsumerFactory.class) 33 public ConsumerFactory<?, ?> kafkaConsumerFactory() { 34 return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties()); 35 } 36 37 @Bean 38 @ConditionalOnMissingBean(ProducerFactory.class) 39 public ProducerFactory<?, ?> kafkaProducerFactory() { 40 DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>( 41 this.properties.buildProducerProperties()); 42 String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix(); 43 if (transactionIdPrefix != null) { 44 factory.setTransactionIdPrefix(transactionIdPrefix); 45 } 46 return factory; 47 } 48 49 @Bean 50 @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix") 51 @ConditionalOnMissingBean 52 public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) { 53 return new KafkaTransactionManager<>(producerFactory); 54 } 55 56 @Bean 57 @ConditionalOnProperty(name = "spring.kafka.jaas.enabled") 58 @ConditionalOnMissingBean 59 public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException { 60 KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer(); 61 Jaas jaasProperties = this.properties.getJaas(); 62 if (jaasProperties.getControlFlag() != null) { 63 jaas.setControlFlag(jaasProperties.getControlFlag()); 64 } 65 if (jaasProperties.getLoginModule() != null) { 66 jaas.setLoginModule(jaasProperties.getLoginModule()); 67 } 68 jaas.setOptions(jaasProperties.getOptions()); 69 return jaas; 70 } 71 72 @Bean 73 @ConditionalOnMissingBean 74 public KafkaAdmin kafkaAdmin() { 75 KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties()); 76 kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast()); 77 return kafkaAdmin; 78 } 79 80 }
参考:https://www.jianshu.com/p/6ce5d9a96113?utm_campaign=hugo