zoukankan      html  css  js  c++  java
  • docker-compose 搭建 kafka

    1. 配置:

    version: '3.2'
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        container_name: zookeeper
        ports:
          - "2181:2181"
        restart: always
      kafka:
        image: wurstmeister/kafka
        container_name: kafka
        ports:
          - "9092:9092"
        environment:
          - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
          - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://服务器IP:9092
          - KAFKA_LISTENERS=PLAINTEXT://:9092
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
        restart: always
    

    2. 测试:

    通过容器名称进入到kafka容器中:

    docker exec -it kafka /bin/bash
    

    创建一个名称为test的topic:

    kafka-topics.sh --create --topic test 
    --zookeeper zookeeper:2181 --replication-factor 1 
    --partitions 1
    

    查看刚刚创建的topic信息:

    kafka-topics.sh --zookeeper zookeeper:2181 
    --describe --topic test
    

    打开生产者发送若干条消息:

    kafka-console-producer.sh --topic=test 
    --broker-list kafka:9092
    

    开发消费者接收消息:

    kafka-console-consumer.sh 
    --bootstrap-server kafka:9092 
    --from-beginning --topic test
    

    如果可以成功接收到消息,则说明kafka已经启动成功了,可以进行本地开发以及调试工作了。

    3. SpringBoot集成

    SpringBoot 1.5.x

    依赖

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.2.1.RELEASE</version>
    </dependency>
    

    KafkaProducerConfig.java

    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    
    @Configuration
    @EnableKafka
    public class KafkaProducerConfig {
    
        public Map<String, Object> producerConfig(){
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.143.47.32:9092");
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
        
        public ProducerFactory<String, String> producerFactory(){
            return new DefaultKafkaProducerFactory<>(producerConfig());
        }
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<String, String>(producerFactory());
        }
    }
    

    KafkaConsumerConfig.java

    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    import com.yue.practise.consumer.MsgConsumer;
    
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
    
        public Map<String, Object> consumerConfig(String consumerGroupId){
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.143.47.32:9092");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }
        public ConsumerFactory<String, String> consumerFactory(String consumerGroupId) {
            return new DefaultKafkaConsumerFactory<>(consumerConfig(consumerGroupId));
        }
        
         
         
         @Bean(name="kafkaListenerContainerFactory")
         public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
              ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
              factory.setConsumerFactory(consumerFactory("groupA"));
              factory.setConcurrency(3);
              factory.getContainerProperties().setPollTimeout(3000);
              return factory;
         }
         
         @Bean(name="kafkaListenerContainerFactory1")
         public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory1() {
              ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
              factory.setConsumerFactory(consumerFactory("groupB"));
              factory.setConcurrency(3);
              factory.getContainerProperties().setPollTimeout(3000);
              return factory;
         }
    }
    

    MsgProducer.java

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MsgProducer {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
        
        public void send(String value){
            System.out.println("send start-----------");
            kafkaTemplate.send("test", value+"1");
            kafkaTemplate.send("test", "key1", value+"2");
            System.out.println("send end-----------");
        }
    }
    

    MsgConsumer.java

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    
    @Component
    public class MsgConsumer {
        static Logger subscribelogger = LoggerFactory.getLogger("subscribelogger");
        
        @KafkaListener(topics="test",containerFactory="kafkaListenerContainerFactory")
        public void processMsg(ConsumerRecord<?, ?> record){
            subscribelogger.info("{}|{}|{}|{}",record.topic(),record.partition(),record.offset(),record.value());    
        }
        
        @KafkaListener(topics="test",containerFactory="kafkaListenerContainerFactory1")
        public void processMsg1(ConsumerRecord<?, ?> record){
            subscribelogger.info("{}|{}|{}|{}",record.topic(),record.partition(),record.offset(),record.value());    
        }
        
    }
    

    参考:

    作者:林宇风
    版权所有,侵权必究!标明出处,欢迎转载。
  • 相关阅读:
    Linux下Tomcat服务器-maven项目部署
    数据库设计感悟
    数据库设计规范
    从零到一: 代码调试
    Java泛型与反射的综合应用
    Eclipse中,tomcat插件方式启动项目
    集合查询表--Map
    集合线性表--List之LinkedList(队列与栈)
    集合线性表--List之ArrayList
    Java中的日期操作
  • 原文地址:https://www.cnblogs.com/linyufeng/p/14685648.html
Copyright © 2011-2022 走看看