  • 5. Spring-Kafka3

    3. Introduction

    This first part of the reference documentation is a high-level overview of Spring for Apache Kafka and its underlying concepts, together with code snippets that help you get up and running as quickly as possible.

    3.1. Quick Tour for the Impatient

    This is the five-minute tour to get started with Spring Kafka.

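    Prerequisites: You must install and run Apache Kafka. Then you must grab the spring-kafka JAR and all of its dependencies. The easiest way to do that is to declare a dependency in your build tool. The following example shows how to do so with Maven:

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.2.5.RELEASE</version>
    </dependency>
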
    The following example shows how to do so with Gradle:

    compile 'org.springframework.kafka:spring-kafka:2.2.5.RELEASE'

    3.1.1. Compatibility

    This quick tour works with the following versions:

    Apache Kafka Clients 2.0.0

    Spring Framework 5.1.x

    Minimum Java version: 8

    3.1.2. A Very, Very Quick Example

    As the following example shows, you can use plain Java to send and receive a message:

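    import static org.junit.Assert.assertTrue;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.Test;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.listener.KafkaMessageListenerContainer;
    import org.springframework.kafka.listener.MessageListener;

    // Assumes JUnit 4; logger and group are fields of the enclosing test class (not shown).
    @Test
    public void testAutoCommit() throws Exception {
        logger.info("Start auto");
        ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
        final CountDownLatch latch = new CountDownLatch(4); // one count per record sent below
        containerProps.setMessageListener(new MessageListener<Integer, String>() {
            @Override
            public void onMessage(ConsumerRecord<Integer, String> message) {
                logger.info("received: " + message);
                latch.countDown();
            }
        });
        KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
        container.start();
        Thread.sleep(1000); // wait a bit for the container to start
        KafkaTemplate<Integer, String> template = createTemplate();
        template.setDefaultTopic("topic1"); // sendDefault() requires a default topic
        template.sendDefault(0, "foo");
        template.sendDefault(2, "bar");
        template.sendDefault(0, "baz");
        template.sendDefault(2, "qux");
        template.flush(); // push any batched records to the broker
        assertTrue(latch.await(60, TimeUnit.SECONDS));
        container.stop();
        logger.info("Stop auto");
    }

    private KafkaMessageListenerContainer<Integer, String> createContainer(
                            ContainerProperties containerProps) {
        Map<String, Object> props = consumerProps();
        DefaultKafkaConsumerFactory<Integer, String> cf =
                                new DefaultKafkaConsumerFactory<Integer, String>(props);
        KafkaMessageListenerContainer<Integer, String> container =
                                new KafkaMessageListenerContainer<>(cf, containerProps);
        return container;
    }

    private KafkaTemplate<Integer, String> createTemplate() {
        Map<String, Object> senderProps = senderProps();
        ProducerFactory<Integer, String> pf =
                  new DefaultKafkaProducerFactory<Integer, String>(senderProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        return template;
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
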
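
    The test in this section uses a CountDownLatch so that the sending thread can wait until the listener, which runs on a separate container thread, has seen all four records. The following is a minimal, Kafka-free sketch of that handshake using only the JDK. The class name LatchHandshake, the method sendAndAwait, and the single-thread executor standing in for the listener container are assumptions made for illustration; they are not part of Spring for Apache Kafka.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchHandshake {

    // Payloads handled so far by the stand-in listener thread.
    static final List<String> received = new CopyOnWriteArrayList<>();

    // Hands each payload to a background thread (standing in for the listener
    // container) and blocks until all were handled or the timeout elapsed.
    static boolean sendAndAwait(List<String> payloads, long timeoutSeconds) {
        CountDownLatch latch = new CountDownLatch(payloads.size());
        ExecutorService listenerThread = Executors.newSingleThreadExecutor();
        try {
            for (String payload : payloads) {
                listenerThread.submit(() -> {
                    received.add(payload);   // "process" the record
                    latch.countDown();       // signal the waiting sender
                });
            }
            return latch.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            listenerThread.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean allReceived = sendAndAwait(Arrays.asList("foo", "bar", "baz", "qux"), 10);
        System.out.println("all received: " + allReceived + ", payloads: " + received);
    }
}
```

    The latch count must match the number of records sent. If the listener never counts down, await simply returns false after the timeout, which is why the test wraps it in assertTrue.
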
    3.1.3. With Java Configuration

    You can do the same work as appears in the previous example with Spring configuration in Java. The following example shows how to do so:

    @Autowired
    private Listener listener;
    @Autowired
    private KafkaTemplate<Integer, String> template;

    @Test
    public void testSimple() throws Exception {
        template.send("annotated1", 0, "foo");
        assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));
    }

    @Configuration
    @EnableKafka
    public class Config {
        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String>
                            kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
            // deserializers as in consumerProps() of the previous example
            return props;
        }
        @Bean
        public Listener listener() {
            return new Listener();
        }
        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
            // serializers as in senderProps() of the previous example
            return props;
        }
        @Bean
        public KafkaTemplate<Integer, String> kafkaTemplate() {
            return new KafkaTemplate<Integer, String>(producerFactory());
        }
    }

    public class Listener {
        private final CountDownLatch latch1 = new CountDownLatch(1);
        @KafkaListener(id = "foo", topics = "annotated1")
        public void listen1(String foo) {
            this.latch1.countDown();
        }
    }

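
    The consumerConfigs() bean in this section shows only the bootstrap servers entry. As a hedged illustration of what a fuller map could contain, the sketch below builds one with the literal Kafka property names, which are the string values behind the ConsumerConfig constants. The class name RawConsumerConfigs and the placeholder values are assumptions; class-valued settings are given as fully qualified class names, a form the Kafka client accepts.

```java
import java.util.HashMap;
import java.util.Map;

public class RawConsumerConfigs {

    // Builds consumer settings keyed by the literal Kafka property names.
    static Map<String, Object> consumerConfigs(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Class-valued settings may be given as fully qualified class names.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "demoGroup" are placeholder values.
        System.out.println(consumerConfigs("localhost:9092", "demoGroup"));
    }
}
```

    Because no Kafka classes are referenced at compile time, a map like this can be loaded from external configuration and passed to DefaultKafkaConsumerFactory unchanged.
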
    3.1.4. Even Quicker, with Spring Boot

    Spring Boot can make things even simpler. The following Spring Boot application sends three messages to a topic, receives them, and stops:

    @SpringBootApplication
    public class Application implements CommandLineRunner {
        public static Logger logger = LoggerFactory.getLogger(Application.class);

        public static void main(String[] args) {
            SpringApplication.run(Application.class, args).close();
        }

        @Autowired
        private KafkaTemplate<String, String> template;
        private final CountDownLatch latch = new CountDownLatch(3);

        @Override
        public void run(String... args) throws Exception {
            this.template.send("myTopic", "foo1");
            this.template.send("myTopic", "foo2");
            this.template.send("myTopic", "foo3");
            latch.await(60, TimeUnit.SECONDS);
            logger.info("All received");
        }

        @KafkaListener(topics = "myTopic")
        public void listen(ConsumerRecord<?, ?> cr) throws Exception {
            logger.info(cr.toString());
            latch.countDown();
        }
    }

    Boot takes care of most of the configuration. When we use a local broker, the only properties we need are the following:

    Example 1. application.properties

    spring.kafka.consumer.group-id=foo
    spring.kafka.consumer.auto-offset-reset=earliest

    We need the first property because we are using group management to assign topic partitions to consumers, so we need a group.
    The second property ensures the new consumer group gets the messages we sent, because the container might start after the sends have completed.
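
    To see why the second property (the consumer's auto-offset-reset setting, set to earliest) matters, consider a consumer group with no committed offset that subscribes after the three sends have finished. The sketch below is a toy model, not Kafka client code: the class OffsetResetModel and its method are hypothetical, and the topic partition is reduced to an in-memory list. It only illustrates that a reset policy of earliest starts reading at the beginning of the log, whereas latest (the Kafka consumer default) starts at the end and would miss records that are already there.

```java
import java.util.Arrays;
import java.util.List;

public class OffsetResetModel {

    // Returns the records a brand-new consumer group would read from a log,
    // given the reset policy applied when no committed offset exists.
    static List<String> readAsNewGroup(List<String> log, String autoOffsetReset) {
        int start;
        switch (autoOffsetReset) {
            case "earliest":
                start = 0;              // begin at the oldest retained record
                break;
            case "latest":
                start = log.size();     // begin after the newest record
                break;
            default:
                throw new IllegalArgumentException("unsupported policy: " + autoOffsetReset);
        }
        return log.subList(start, log.size());
    }

    public static void main(String[] args) {
        // The three records were written before the consumer started.
        List<String> log = Arrays.asList("foo1", "foo2", "foo3");
        System.out.println("earliest -> " + readAsNewGroup(log, "earliest"));
        System.out.println("latest   -> " + readAsNewGroup(log, "latest"));
    }
}
```

    With latest, the listener in the Boot application could receive nothing if it started late, and the latch would simply time out after 60 seconds.
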

  • Original article: https://www.cnblogs.com/xidianzxm/p/10735683.html