zoukankan      html  css  js  c++  java
  • docker安装kafka、springboot整合kafka demo

    1. docker安装启动 kafka

    #下载镜像
    docker pull zookeeper
    docker pull wurstmeister/kafka
    #查看镜像
    docker images
    #启动zookeeper
     docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper:latest   
     #启动kafka ,服务器是试用的阿里云的 这里就不隐藏了
    docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=47.99.68.32:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://47.99.68.32:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
    #进入kafka目录, ctrl+p+q 退出kafka容器,但不关闭容器
    docker exec -ti kafka /bin/bash
    #  进入到指定目录。  kafka_2.12-2.2.0 该目录的版本号可能不一致 自行修改不要直接复制
    cd /opt/kafka_2.12-2.2.0/bin
    #创建topic localhost 可能需要换成ip
    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
    #查看topic
    kafka-topics.sh --list --zookeeper localhost:2181
    
    

    2. java demo:

    完整demo git仓库

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 监听服务器上的kafka是否有相关的消息发过来
     */
    @Component
    public class ConsumerDemo {
        /**
         * 定义此消费者接收topics = "demo"的消息,与controller中的topic对应上即可
         * @param record 变量代表消息本身,可以通过ConsumerRecord<?,?>类型的record变量来打印接收的消息的各种信息
         */
        @KafkaListener(topics = "demo")
        public void listen (ConsumerRecord<?, ?> record){
            System.out.printf("topic is %s, offset is %d, value is %s 
    ", record.topic(), record.offset(), record.value());
        }
    }
    -------------------------------------------------------------------------------
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class ProducerController {
        @Autowired
        private KafkaTemplate<String,Object> kafkaTemplate;
        @RequestMapping("message/send")
        public String send(String msg){
            kafkaTemplate.send("demo", msg); //使用kafka模板发送信息
            return "success";
        }
    }
    
    
    spring:
      kafka:
        bootstrap-servers: 47.99.68.32:9092 #指定kafka server的地址,集群配多个,中间,逗号隔开
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: default_consumer_group #群组ID
          enable-auto-commit: true
          auto-commit-interval: 1000
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    server:
      port: 8500
    
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.kafka</groupId>
    			<artifactId>spring-kafka</artifactId>
    		</dependency>
    
  • 相关阅读:
    小白学docker(1)---docker安装
    反射与动态代理
    SpringBoot源码分析(1)—启动类
    maven中的groupId和artifactld到底指的什么?
    SpringBoot配置文件加载顺序
    HashMap和ConcurrentHashMap
    架构设计:系统间通信(4)——IO通信模型和JAVA实践 中篇
    架构设计:系统间通信(3)——IO通信模型和JAVA实践 上篇
    架构设计:系统间通信(2)——概述从“聊天”开始下篇
    idea选择指定版本进行安装
  • 原文地址:https://www.cnblogs.com/paidaxing7090/p/14204439.html
Copyright © 2011-2022 走看看