  • Integrating Kafka with Spring Boot: Serialization and Deserialization

    Maven dependencies:

      <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
          <version>2.2.14.RELEASE</version>
      </dependency>
      <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>2.11.1</version>
      </dependency>

    Configure serialization in the yml file:

    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: myGroup
        producer:
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

    Deserialization:

        @Bean
        public RecordMessageConverter converter() {
            ObjectMapper mapper = new ObjectMapper();
            // JSON fields use snake_case; map them to camelCase Java properties
            mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
            return new StringJsonMessageConverter(mapper);
        }
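To make the naming strategy concrete: with SNAKE_CASE, a Java property such as `createTime` is matched against the JSON field `create_time`. The sketch below approximates that name mapping in plain Java for simple names. It is an illustration only, not Jackson's implementation, and the class name `SnakeCaseSketch` is made up for this example; names with consecutive capital letters may be handled differently by Jackson.

```java
// Illustration only: a simplified approximation of the camelCase -> snake_case
// name mapping. This is NOT Jackson's actual implementation.
final class SnakeCaseSketch {

    /** Converts a simple camelCase property name, e.g. "createTime", to "create_time". */
    static String toSnakeCase(String propertyName) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < propertyName.length(); i++) {
            char c = propertyName.charAt(i);
            if (Character.isUpperCase(c)) {
                // Put an underscore before an upper-case letter, except at the start.
                if (i > 0) {
                    out.append('_');
                }
                out.append(Character.toLowerCase(c));
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // The property names here are hypothetical examples.
        System.out.println(toSnakeCase("taskId"));
        System.out.println(toSnakeCase("createTime"));
    }
}
```

So a JSON message containing `task_id` would bind to a `taskId` property without any per-field annotations.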

    Create a topic:

        @Bean
        public NewTopic topic() {
            return new NewTopic("my-test-topic", 1, (short) 1);
        }

    Test the producer and consumer:

    
    
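        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.kafka.annotation.KafkaListener;
        import org.springframework.kafka.core.KafkaTemplate;
        import org.springframework.stereotype.Component;

        // MysqlSourceEvent and TaskData are the application's own payload classes.
        @Component
        public class KafkaTest {

            @Autowired
            private KafkaTemplate<String, Object> template;

            /**
             * Receives messages from the topic database.table and forwards
             * each one to another topic, my-test-topic.
             */
            @KafkaListener(topics = "database.table")
            public void process(MysqlSourceEvent<TaskData> record) {
                template.send("my-test-topic", record);
            }
        }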
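The listener above takes a `MysqlSourceEvent<TaskData>`, but the post does not show those classes. The following is a minimal sketch of what such payload types could look like. Every field name here (`eventType`, `data`, `taskId`, `taskName`) is an assumption made for illustration, not the author's actual model. The classes are plain beans with a no-argument constructor and getters/setters, which is the shape Jackson binds to by default.

```java
// Hypothetical payload types: all field names are assumptions for illustration
// and are not taken from the original post.
class TaskData {
    private long taskId;        // would bind to JSON field "task_id" under SNAKE_CASE
    private String taskName;    // would bind to JSON field "task_name" under SNAKE_CASE

    public long getTaskId() { return taskId; }
    public void setTaskId(long taskId) { this.taskId = taskId; }
    public String getTaskName() { return taskName; }
    public void setTaskName(String taskName) { this.taskName = taskName; }
}

// Generic envelope: T is the row type carried by the event.
class MysqlSourceEvent<T> {
    private String eventType;   // hypothetical, e.g. "insert" or "update"
    private T data;             // hypothetical, the changed row

    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public T getData() { return data; }
    public void setData(T data) { this.data = data; }
}

class PayloadSketchDemo {
    public static void main(String[] args) {
        TaskData row = new TaskData();
        row.setTaskId(42L);
        row.setTaskName("demo");

        MysqlSourceEvent<TaskData> event = new MysqlSourceEvent<>();
        event.setEventType("update");
        event.setData(row);

        System.out.println(event.getEventType() + " " + event.getData().getTaskId());
    }
}
```

Under these assumptions, an incoming message might look like `{"event_type":"update","data":{"task_id":42,"task_name":"demo"}}`. As far as I understand, the message converter derives the target type, including the generic parameter, from the listener method's parameter, so no extra type configuration should be needed for this case.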
     
  • Original post: https://www.cnblogs.com/dotqin/p/13452727.html