  • Quickly Set Up a Kafka Cluster with Docker

    Install Zookeeper and Kafka
    First install docker-compose. The binary is available at https://share.weiyun.com/ABWUW8xZ
    Put the roughly 15 MB docker-compose binary into /usr/local/bin, then make it executable and check the version:
    chmod +x /usr/local/bin/docker-compose
    docker-compose -v
    Kafka depends on Zookeeper, so we need Zookeeper in place before installing Kafka. Prepare the docker-compose.yaml file below, replacing the host address 192.168.1.100 with the host address of your own environment.
     
    Edit the docker-compose.yaml file:
    version: "3"
    services:
      zookeeper:
        image: zookeeper
        container_name: zookeeper
        ports:
          - 2181:2181
        volumes:
          - ./data/zookeeper/data:/data
          - ./data/zookeeper/datalog:/datalog
          - ./data/zookeeper/logs:/logs
        restart: always
      kafka_node_0:
        depends_on:
          - zookeeper
        container_name: kafka-node-0
        image: wurstmeister/kafka
        environment:
          KAFKA_BROKER_ID: 0
          KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9092
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
          KAFKA_NUM_PARTITIONS: 3
          KAFKA_DEFAULT_REPLICATION_FACTOR: 2
        ports:
          - 9092:9092
        volumes:
          - ./data/kafka/node_0:/kafka
        restart: unless-stopped
      kafka_node_1:
        depends_on:
          - kafka_node_0
        container_name: kafka-node-1
        image: wurstmeister/kafka
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9093
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
          KAFKA_NUM_PARTITIONS: 3
          KAFKA_DEFAULT_REPLICATION_FACTOR: 2
        ports:
          - 9093:9093
        volumes:
          - ./data/kafka/node_1:/kafka
        restart: unless-stopped
      kafka_node_2:
        depends_on:
          - kafka_node_1
        container_name: kafka-node-2
        image: wurstmeister/kafka
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9094
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
          KAFKA_NUM_PARTITIONS: 3
          KAFKA_DEFAULT_REPLICATION_FACTOR: 2
        ports:
          - 9094:9094
        volumes:
          - ./data/kafka/node_2:/kafka
        restart: unless-stopped

    Run docker-compose up -d to bring the cluster up, then wait a moment for the containers to start.
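
    Once the containers are running, you can confirm that all three brokers joined the cluster. Below is a minimal check with the Kafka AdminClient; it is only a sketch, and it assumes the 192.168.1.100 addresses from the compose file above plus the kafka-clients library that spring-kafka pulls in.

    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class ClusterCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094");
            try (AdminClient admin = AdminClient.create(props)) {
                // describeCluster() should report three nodes, one per broker in docker-compose.yaml
                admin.describeCluster().nodes().get()
                        .forEach(node -> System.out.println("broker: " + node));
            }
        }
    }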

    Integrate Spring Boot with the Kafka Cluster
    Create a brand-new Spring Boot project and add the dependencies the code below relies on to the build.gradle file: spring-kafka (org.springframework.kafka:spring-kafka), fastjson for JSON serialization, and Lombok for logging.
    1. Configure the Kafka-related parameters in application.properties.
    spring.kafka.bootstrap-servers=192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094
    spring.kafka.producer.retries=0
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.consumer.auto-offset-reset=latest
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=100
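
    The spring.kafka.producer.* entries are passed straight through to the Kafka producer client by Spring Boot's auto-configuration. For illustration only, the same producer settings expressed against the raw client API would look roughly like the sketch below; you do not need this class when spring-kafka builds the producer for you.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class RawProducerConfigExample {
        // Builds a producer with the same settings Spring Boot derives from application.properties
        public static KafkaProducer<String, String> buildProducer() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094");
            props.put(ProducerConfig.RETRIES_CONFIG, 0);               // spring.kafka.producer.retries
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // spring.kafka.producer.batch-size
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // spring.kafka.producer.buffer-memory
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            return new KafkaProducer<>(props);
        }
    }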

    2. Create the message body class.

    public class Message {
        private Long id;
        private String message;
        private Date sendAt;
        // Getters and setters omitted; the sender below calls the setters, so generate them (or use Lombok @Data).
    }

    3. Create the message sender.

    @Component  // register the sender as a Spring bean so KafkaTemplate can be injected (assumed; omitted in the original)
    @Slf4j      // Lombok annotation that provides the log field used below
    public class Sender {

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        public void send() {
            Message message = new Message();
            message.setId(System.currentTimeMillis());
            message.setMessage(UUID.randomUUID().toString());
            message.setSendAt(new Date());
            log.info("message = {}", JSON.toJSONString(message));
            kafkaTemplate.send("test", JSON.toJSONString(message));
        }
    }
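
    The send() above is fire-and-forget. If you want confirmation that a record actually reached the cluster, KafkaTemplate.send() returns a future you can attach a callback to. The following is an optional sketch, assuming spring-kafka 2.x, where the return type is Spring's ListenableFuture (spring-kafka 3.x returns a CompletableFuture instead).

    // Optional addition to the Sender class above, not part of the original post.
    public void sendWithCallback(String payload) {
        kafkaTemplate.send("test", payload).addCallback(
                result -> log.info("sent to partition {} at offset {}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset()),
                ex -> log.error("send failed", ex));
    }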

    4. Create the message receiver.

    @Component  // register the listener as a Spring bean (assumed; the original omits the annotations)
    @Slf4j      // Lombok annotation that provides the log field used below
    public class Receiver {

        @KafkaListener(topics = {"test"}, groupId = "test")
        public void listen(ConsumerRecord<?, ?> record) {
            Optional<?> message = Optional.ofNullable(record.value());
            if (message.isPresent()) {
                log.info("receiver record = " + record);
                log.info("receiver message = " + message.get());
            }
        }
    }
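
    Because the producer sends the Message as a fastjson string, the listener can also turn the payload back into a Message object. Below is an optional sketch, assuming Message has getters; the groupId "test-json" is made up here so this listener does not share offsets with the one above.

    // Optional variant for the Receiver class above, not part of the original post.
    @KafkaListener(topics = {"test"}, groupId = "test-json")
    public void listenAsMessage(ConsumerRecord<String, String> record) {
        // Parse the JSON payload back into the Message class from step 2
        Message message = JSON.parseObject(record.value(), Message.class);
        log.info("received id = {}, message = {}", message.getId(), message.getMessage());
    }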

    5. Test the message queue.

    @RestController  // expose the test endpoint over HTTP (assumed; the original post omits the annotation)
    public class QueueController {

        @Autowired
        private Sender sender;

        @PostMapping("/test")
        public void testQueue() {
            sender.send();
            sender.send();
            sender.send();
        }
    }
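
    To exercise the whole pipeline, start the application, send a POST request to /test, and watch the consumer log lines appear. A small client sketch using the JDK's built-in HttpClient (Java 11+); it assumes the app runs on the default port 8080 with no context path.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class QueueSmokeTest {
        public static void main(String[] args) throws Exception {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8080/test"))
                    .POST(HttpRequest.BodyPublishers.noBody())
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("status = " + response.statusCode());
        }
    }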

    At this point we have successfully set up a Kafka pseudo-cluster (three brokers on a single host) and integrated it with Spring Boot.

     

  • Original post: https://www.cnblogs.com/mihich/p/13589827.html