zoukankan      html  css  js  c++  java
  • Docker部署Kafka以及Spring Kafka操作

    https://hub.docker.com/ 查找kafka

    第三个活跃并stars数量多 进去看看使用

    我们使用docker-compose来构建镜像

    查看使用文档中的docker-compose.yml

    因为kafka要搭配zookeeper一起使用,所以文档中包含了zookeeper

    我修改了一下版本号 以及变量参数 

    version: '3'
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        ports:
          - "2181:2181"
      kafka:
        image: wurstmeister/kafka
        depends_on: [ zookeeper ]
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 0
          KAFKA_ZOOKEEPER_CONNECT: 192.168.17.165:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.17.165:9092
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
        volumes:
          - /data/product/zj_bigdata/data/kafka/docker.sock:/var/run/docker.sock
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://xxx.xxx.xxx.xxx:port (局域网宿主机的IP地址而非容器的IP,及暴露出来的端口)

       运行命令docker-compose up -d  ,就会开启2个容器 

    进kafka容器使用官网的指南进行测验  http://kafka.apache.org/quickstart

    docker exec -it   {容器id&name} /bin/bash

    进入容器之后进入命令目录 cd /opt/kafka/bin/

      

    第一步:新建topic

    kafka-topics.sh --create --bootstrap-server 192.168.17.165:9092 --replication-factor 1 --partitions 1 --topic mytest    (localhost修改为自己的ip号)

    第二步: 发送信息: kafka-console-producer.sh --broker-list 192.168.17.165:9092 --topic mytest

     第三步:另开启一个窗口消费信息: kafka-console-consumer.sh --bootstrap-server 192.168.17.165:9092 --topic mytest --from-beginning

     则表示已经搭建成功了.

    开始搭建Spring Kafka

    pom.xml依赖:

    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
        </dependencies>

     配置生产者Config:

    /**
     * @author 思凡
     * @version 1.0
     * @date 2019/10/24 18:14
     * Kafka生产者配置
     */
    @EnableKafka
    @Configuration
    public class KafkaProducerConfig {
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.165:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // See https://kafka.apache.org/documentation/#producerconfigs for more properties
            return props;
        }
        @Bean
        public KafkaTemplate<String , String> kafkaTemplate() {
            return new KafkaTemplate<String, String>(producerFactory());
        }
    }

    配置消费者Config

    /**
     * @author 思凡
     * @version 1.0
     * @date 2019/10/24 18:22
     * 消费者配置
     */
    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {
        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
        kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(3);
            factory.getContainerProperties().setPollTimeout(3000);
            return factory;
        }
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.165:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }
    }

     相关配置来自spring官网kafka文档:   https://docs.spring.io/spring-kafka/docs/2.3.1.RELEASE/reference/html/

    创建Controller请求后发送信息:

    /**
     * @author 思凡
     * @version 1.0
     * @date 2019/10/23 21:48
     */
    @RestController
    @Slf4j
    public class KafkaTestController {
        @Autowired
        private  KafkaTemplate<String, String> kafkaTemplate;
    
        @GetMapping("/test")
        public String test(){
            log.info("-------------发送信息----------------");
            kafkaTemplate.send("mytest","hello kafka");
            return null;
        }
    }
    

      

    配置监听:  

    使用@KafkaListener注解 ,文档都有相关说明,就不一一解释

    /**
     * @author 思凡
     * @version 1.0
     * @date 2019/10/24 13:13
     */
    @EnableKafka
    @Configuration
    @Slf4j
    public class KafkaListenerConfig {
        @KafkaListener(topics = {"mytest"},groupId = "mytest")
        public void lisen(String message){
            log.info("-------------消费信息----------------");
            log.info(message);
        }
    }

    run项目,随后请求url 查看日志:

     就配置成功了.

  • 相关阅读:
    Intellij Idea 2017创建web项目及tomcat部署实战
    使用docker安装mysql服务
    python2--升级python3
    SpringCloud--注册中心Eureka
    SpringBoot--属性加载顺序
    Jmeter--压测dubbo接口
    较快的maven的settings.xml文件
    Spring boot:logback文件配置
    Spring--AOP
    34组代码敲不队记账类app会议纪要
  • 原文地址:https://www.cnblogs.com/zsifan/p/11737261.html
Copyright © 2011-2022 走看看