  • 5. Spring-Kafka3

    3. Introduction

    This first part of the reference documentation is a high-level overview of Spring for Apache Kafka and its underlying concepts, along with some code snippets that can help you get up and running as quickly as possible.

    3.1. Quick Tour for the Impatient

    This is the five-minute tour to get started with Spring Kafka.

    Prerequisites: You must install and run Apache Kafka. Then you must grab the spring-kafka JAR and all of its dependencies. The easiest way to do that is to declare a dependency in your build tool. The following example shows how to do so with Maven:

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.2.5.RELEASE</version>
    </dependency>
    

    The following example shows how to do so with Gradle:

    compile 'org.springframework.kafka:spring-kafka:2.2.5.RELEASE'

    3.1.1. Compatibility

    This quick tour works with the following versions:

    Apache Kafka Clients 2.0.0

    Spring Framework 5.1.x

    Minimum Java version: 8

    3.1.2. A Very, Very Quick Example

    As the following example shows, you can use plain Java to send and receive a message:

    @Test
    public void testAutoCommit() throws Exception {
        logger.info("Start auto");
        ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
        final CountDownLatch latch = new CountDownLatch(4);
        containerProps.setMessageListener(new MessageListener<Integer, String>() {
    
            @Override
            public void onMessage(ConsumerRecord<Integer, String> message) {
                logger.info("received: " + message);
                latch.countDown();
            }
    
        });
        KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
        container.setBeanName("testAuto");
        container.start();
        Thread.sleep(1000); // wait a bit for the container to start
        KafkaTemplate<Integer, String> template = createTemplate();
        template.setDefaultTopic("topic1"); // one of the topics the container above subscribes to
        template.sendDefault(0, "foo");
        template.sendDefault(2, "bar");
        template.sendDefault(0, "baz");
        template.sendDefault(2, "qux");
        template.flush();
        assertTrue(latch.await(60, TimeUnit.SECONDS));
        container.stop();
        logger.info("Stop auto");
    
    }
    private KafkaMessageListenerContainer<Integer, String> createContainer(
                            ContainerProperties containerProps) {
        Map<String, Object> props = consumerProps();
        DefaultKafkaConsumerFactory<Integer, String> cf =
                                new DefaultKafkaConsumerFactory<Integer, String>(props);
        KafkaMessageListenerContainer<Integer, String> container =
                                new KafkaMessageListenerContainer<>(cf, containerProps);
        return container;
    }
    
    private KafkaTemplate<Integer, String> createTemplate() {
        Map<String, Object> senderProps = senderProps();
        ProducerFactory<Integer, String> pf =
                  new DefaultKafkaProducerFactory<Integer, String>(senderProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        return template;
    }
    
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
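        // group is not defined in this snippet; it is assumed to be a String field holding the consumer group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);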
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
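
The test above blocks on a CountDownLatch until the listener has seen all four records. The following is a minimal sketch of just that coordination, with Kafka left out entirely: a plain thread stands in for the listener container, and the names LatchSketch and deliverAndAwait are hypothetical, chosen only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: no Kafka involved. A background thread
// plays the role of the listener container from the example above.
class LatchSketch {

    // Hands each value to a "listener" on another thread and waits, up to
    // timeoutSeconds, until every value has been counted down.
    static boolean deliverAndAwait(List<String> values, List<String> received,
                                   long timeoutSeconds) {
        CountDownLatch latch = new CountDownLatch(values.size());
        Thread listener = new Thread(() -> {
            for (String value : values) {
                received.add(value);   // stands in for onMessage(...)
                latch.countDown();     // one count per record, as in the test
            }
        });
        listener.start();
        try {
            // true if the count reached zero, false if the timeout elapsed first
            return latch.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> received = new CopyOnWriteArrayList<>();
        boolean all = deliverAndAwait(Arrays.asList("foo", "bar", "baz", "qux"), received, 10);
        System.out.println("all received: " + all + ", count: " + received.size());
    }
}
```

Because await returns a boolean, the test wraps it in assertTrue: a timeout then fails the test instead of passing silently.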
    
    3.1.3. With Java Configuration

    You can accomplish the same thing as the previous example with Spring configuration in Java. The following example shows how to do so:

    @Autowired
    private Listener listener;
    
    @Autowired
    private KafkaTemplate<Integer, String> template;
    
    @Test
    public void testSimple() throws Exception {
        template.send("annotated1", 0, "foo");
        template.flush();
        assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));
    }
    
    @Configuration
    @EnableKafka
    public class Config {
    
        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String>
                            kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    
        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
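            // embeddedKafka is not defined in this snippet; it refers to an embedded test broker
            // set up elsewhere in the test (it is also used in producerConfigs() below)
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());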
            ...
            return props;
        }
    
        @Bean
        public Listener listener() {
            return new Listener();
        }
    
        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
            ...
            return props;
        }
    
        @Bean
        public KafkaTemplate<Integer, String> kafkaTemplate() {
            return new KafkaTemplate<Integer, String>(producerFactory());
        }
    
    }
    public class Listener {
    
        // package-private so that the test can await it through this.listener.latch1
        final CountDownLatch latch1 = new CountDownLatch(1);
    
        @KafkaListener(id = "foo", topics = "annotated1")
        public void listen1(String foo) {
            this.latch1.countDown();
        }
    
    }
    
    3.1.4. Even Quicker, with Spring Boot

    Spring Boot can make things even simpler. The following Spring Boot application sends three messages to a topic, receives them, and stops:

    @SpringBootApplication
    public class Application implements CommandLineRunner {
    
        public static Logger logger = LoggerFactory.getLogger(Application.class);
    
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args).close();
        }
    
        @Autowired
        private KafkaTemplate<String, String> template;
    
        private final CountDownLatch latch = new CountDownLatch(3);
    
        @Override
        public void run(String... args) throws Exception {
            this.template.send("myTopic", "foo1");
            this.template.send("myTopic", "foo2");
            this.template.send("myTopic", "foo3");
            if (latch.await(60, TimeUnit.SECONDS)) { // true only if all three arrived in time
                logger.info("All received");
            }
        }
    
        @KafkaListener(topics = "myTopic")
        public void listen(ConsumerRecord<?, ?> cr) throws Exception {
            logger.info(cr.toString());
            latch.countDown();
        }
    
    }
    

    Boot takes care of most of the configuration. When we use a local broker, the only properties we need are the following:

    Example 1. application.properties
    spring.kafka.consumer.group-id=foo
    spring.kafka.consumer.auto-offset-reset=earliest
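
For comparison with the hand-built consumerProps() map in section 3.1.2, the following sketch lists the raw Kafka consumer keys that these two Boot properties correspond to. It uses plain string keys so that it runs without any Kafka dependency. BootPropsSketch and toConsumerConfig are hypothetical names and not part of Spring Boot, and the bootstrap address assumes the local-broker default.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: shows which raw consumer keys the two Boot properties map to.
// This is not how Spring Boot builds its configuration internally.
class BootPropsSketch {

    static Map<String, Object> toConsumerConfig(String groupId, String autoOffsetReset) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("group.id", groupId);                   // spring.kafka.consumer.group-id
        props.put("auto.offset.reset", autoOffsetReset);  // spring.kafka.consumer.auto-offset-reset
        return props;
    }

    public static void main(String[] args) {
        System.out.println(toConsumerConfig("foo", "earliest"));
    }
}
```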
    

    We need the first property because we are using group management to assign topic partitions to consumers, so we need a group.
    The second property ensures the new consumer group gets the messages we sent, because the container might start after the sends have completed.
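
To make the effect of the second property concrete, here is a toy model of where a consumer group with no committed offset starts reading. It is an illustration under simplifying assumptions (one partition held as an in-memory list, only the values earliest and latest), not the Kafka client's implementation. OffsetResetSketch, startOffset, and visibleRecords are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of auto-offset-reset for a group that has no committed offset.
// Illustration only; the real client resolves offsets against the broker.
class OffsetResetSketch {

    // Where a brand-new group starts reading a partition holding logSize records.
    static int startOffset(String autoOffsetReset, int logSize) {
        switch (autoOffsetReset) {
            case "earliest":
                return 0;          // start of this toy log
            case "latest":
                return logSize;    // only records appended from now on
            default:
                throw new IllegalArgumentException(
                        "not modeled in this sketch: " + autoOffsetReset);
        }
    }

    // Records the new group would see from a log written before it started.
    static List<String> visibleRecords(String autoOffsetReset, List<String> log) {
        int start = startOffset(autoOffsetReset, log.size());
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        // Records sent before the consumer joined, as in the Boot example
        List<String> log = Arrays.asList("foo1", "foo2", "foo3");
        System.out.println("earliest -> " + visibleRecords("earliest", log));
        System.out.println("latest   -> " + visibleRecords("latest", log));
    }
}
```

In this model, earliest yields all three records and latest yields none. Kafka's default is latest, so without this property a container that starts after the sends have completed could miss those records.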
    https://docs.spring.io/spring-kafka/reference/html/#events

  • Original article: https://www.cnblogs.com/xidianzxm/p/10735683.html