Installing Kafka with Docker Compose and integrating it with Spring Boot

1 Docker Compose

1.1 Run the following command to download the current stable release of Docker Compose:

    sudo curl -L "https://github.com/docker/compose/releases/download/1.27.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

1.2 Apply executable permissions to the binary:

    sudo chmod +x /usr/local/bin/docker-compose
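
To confirm the installation (a quick sanity check; the reported version should match the release downloaded above):

    docker-compose --version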

1.3 Uninstall

If you installed Docker Compose with curl, uninstall it with the following command:

    sudo rm /usr/local/bin/docker-compose

2 Single-node Kafka

2.1 Prepare the images: zookeeper and kafka

Manage the containers with docker-compose:

    docker-compose-testkafka.yml

    version: "3.3"
    services:
      zookeeper:
        image: zookeeper:3.5.5
        restart: always
        container_name: zookeeper
        ports:
          - "2181:2181"
        expose:
          - "2181"
        environment:
          - ZOO_MY_ID=1
      kafka:
        image: wurstmeister/kafka
        restart: always
        container_name: kafka
        environment:
          - KAFKA_BROKER_ID=1
          - KAFKA_LISTENERS=PLAINTEXT://kafka:9090
          - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
          - KAFKA_MESSAGE_MAX_BYTES=2000000
        ports:
          - "9090:9090"
        depends_on:
          - zookeeper
      kafka-manager:
        image: sheepkiller/kafka-manager          ## open-source web UI for managing Kafka clusters
        environment:
          ZK_HOSTS: 192.168.228.128               ## change this to your host machine's IP
        ports:
          - "9001:9000"                           ## web UI exposed on host port 9001

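One note on connectivity: with KAFKA_LISTENERS=PLAINTEXT://kafka:9090 and no advertised listener configured, the broker advertises itself as kafka:9090, which clients outside the compose network can only resolve via a hosts entry. If external clients (such as the Spring Boot application later in this post) cannot connect, a common adjustment is to advertise the host IP instead; a sketch, assuming 192.168.228.128 is your host address:

        environment:
          - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9090
          - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.228.128:9090   ## assumed host IP; change to your own
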
2.2 Add the Docker bridge subnet as a trusted zone in the firewall; otherwise Kafka cannot connect to Zookeeper.

    firewall-cmd  --zone=trusted --add-source=172.18.0.1/16 --permanent
    firewall-cmd  --reload

2.3 Start the cluster

    docker-compose -f docker-compose-testkafka.yml up -d

2.4 Check the startup logs:

    docker logs -f zookeeper   

Stop the cluster:

    docker-compose -f docker-compose-testkafka.yml stop

2.5 Managing the Kafka cluster with sheepkiller/kafka-manager

The kafka-manager web UI is reachable on host port 9001 (mapped to the container's port 9000 in the compose file above).

Let's take a look at the broker information registered in Zookeeper:

    docker exec -it zookeeper bash bin/zkCli.sh
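
Inside the zkCli shell, the registered brokers can be inspected (illustrative commands; broker id 1 matches KAFKA_BROKER_ID in the compose file):

    ls /brokers/ids
    get /brokers/ids/1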

To exit the shell, type quit at the [zk: localhost:2181(CONNECTED)] prompt.

2.6 Enter the Kafka container and run the basic commands

    docker exec -ti kafka bash
    # change into the Kafka bin directory, i.e. /opt/kafka_<version>/bin
    cd /opt/kafka_2.13-2.6.0/bin

    # create a topic
    kafka-topics.sh --create --zookeeper 192.168.228.128:2181 --replication-factor 1 --partitions 1 --topic topicTEST

    # list topics
    kafka-topics.sh --list --zookeeper 192.168.228.128:2181

    # start a console producer
    kafka-console-producer.sh --broker-list 192.168.228.128:9090 --topic topicTEST

    # start a console consumer
    kafka-console-consumer.sh --bootstrap-server 192.168.228.128:9090 --topic topicTEST --from-beginning

2.7 Produce and consume messages
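With the console producer and consumer from step 2.6 running in two separate terminals, each line typed at the producer prompt should appear in the consumer. An illustrative session (the message text is arbitrary):

    # producer terminal
    > hello kafka
    > a second test message

    # consumer terminal
    hello kafka
    a second test message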

3 Spring Boot integration with Kafka

3.1 Configuration file (application.yml)

    spring:
      kafka:
        bootstrap-servers: 192.168.228.128:9090
        listener:
          concurrency: 10
          ack-mode: MANUAL_IMMEDIATE
          poll-timeout: 1500
        consumer:
          group-id: kafka_cluster1
          enable-auto-commit: false
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          properties: {session.timeout.ms: 6000, auto.offset.reset: earliest}
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          batch-size: 65536
          buffer-memory: 524288
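
The configuration above relies on the spring-kafka client being on the classpath; a minimal dependency sketch, assuming a Maven build where Spring Boot's dependency management supplies the version:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>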

3.2 Producer: ProducerController

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.*;

    @RestController
    public class ProducerController {
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;

        @RequestMapping(value = "message/send", method = RequestMethod.POST, produces = {"application/json"})
        public String send(@RequestBody String message) {
            System.out.println(message);
            kafkaTemplate.send("topicTEST", message); // send the message with the KafkaTemplate
            return "success";
        }
    }
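
The endpoint can be exercised with a quick request once the application is running (assuming the default Spring Boot port 8080; adjust as needed):

    curl -X POST -H "Content-Type: text/plain" -d 'hello kafka' http://localhost:8080/message/send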

3.3 Consumer: ConsumerDemo

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class ConsumerDemo {
        @KafkaListener(topics = "topicTEST")
        public void listen(ConsumerRecord<?, ?> record) {
            System.out.printf("topic is %s, offset is %d, value is %s%n",
                    record.topic(), record.offset(), record.value());
        }
    }
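
Note that the configuration in 3.1 sets enable-auto-commit: false and ack-mode: MANUAL_IMMEDIATE, so a listener that never acknowledges will not commit offsets. A minimal sketch of a manually acknowledging variant of the listener above (it would replace that listener rather than run alongside it; the class name is illustrative):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;

    @Component
    public class AckConsumerDemo {
        @KafkaListener(topics = "topicTEST")
        public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
            System.out.printf("topic is %s, offset is %d, value is %s%n",
                    record.topic(), record.offset(), record.value());
            ack.acknowledge(); // commit the offset immediately (MANUAL_IMMEDIATE)
        }
    }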

3.4 Three ways to send messages with KafkaTemplate

    public void testTemplateSend() {
        // 1. send a message with an explicit partition, timestamp and key
        kafkaTemplate.send("topicTEST", 0, System.currentTimeMillis(), String.valueOf(0), "send message with timestamp");

        // 2. send a message wrapped in a ProducerRecord
        ProducerRecord<String, Object> record = new ProducerRecord<>("topicTEST", "use ProducerRecord to send message");
        kafkaTemplate.send(record);

        // 3. send a message using Spring's Message abstraction
        Map<String, Object> map = new HashMap<>();
        map.put(KafkaHeaders.TOPIC, "topicTEST");
        map.put(KafkaHeaders.PARTITION_ID, 0);
        map.put(KafkaHeaders.MESSAGE_KEY, String.valueOf(0));
        GenericMessage<String> message = new GenericMessage<>("use Message to send message", new MessageHeaders(map));
        kafkaTemplate.send(message);
    }
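
KafkaTemplate.send() is asynchronous; the eventual result can be handled with a callback. A minimal sketch, assuming spring-kafka 2.x, where send() returns a ListenableFuture (newer versions return a CompletableFuture instead):

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;

    @Component
    public class CallbackSender {
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;

        public void sendWithCallback(String message) {
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topicTEST", message);
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onSuccess(SendResult<String, Object> result) {
                    // the broker accepted the record; print its partition and offset
                    System.out.println("sent to partition " + result.getRecordMetadata().partition()
                            + ", offset " + result.getRecordMetadata().offset());
                }

                @Override
                public void onFailure(Throwable ex) {
                    // the send failed; a real application might retry or alert here
                    System.err.println("send failed: " + ex.getMessage());
                }
            });
        }
    }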
     
Original article: https://www.cnblogs.com/Lambquan/p/13649715.html