zoukankan      html  css  js  c++  java
  • Spring Boot Kafka

    1、创建集群

    http://kafka.apache.org/documentation/#quickstart

    有一句我觉得特别重要: For Kafka, a single broker is just a cluster of size one.

    1.1、命令行操作

    #解压文件
    tar -zxf kafka_2.11-1.1.0.tgz
    cd kafka_2.11-1.1.0
    
    #启动Zookeerper
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
    #启动Kafka
    bin/kafka-server-start.sh config/server.properties &
    cp config/server.properties config/server-1.properties
    cp config/server.properties config/server-2.properties
    
    config/server-1.properties:
        broker.id=1
        listeners=PLAINTEXT://:9093
        log.dir=/tmp/kafka-logs-1
     
    config/server-2.properties:
        broker.id=2
        listeners=PLAINTEXT://:9094
        log.dir=/tmp/kafka-logs-2
    
    bin/kafka-server-start.sh config/server-1.properties &
    bin/kafka-server-start.sh config/server-2.properties &
    
    #创建集群
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic myTopic
    
    #查看主题
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic myTopic

    1.2、图形化界面操作

    除了命令行以为,也可以通过kafka-manager查看

    2、Spring Boot集成Kafka

    2.1、引入Maven依赖

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    2.2、配置

    spring:
      kafka:
        bootstrap-servers: 10.123.52.76:9092,10.123.52.76:9093,10.123.52.76:9094
        consumer:
          group-id: myGroup

    2.3、收发消息

    package com.cjs.boot.message;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    public class MyListener {
    
        @KafkaListener(topics = "myTopic")
        public void processMessage2(String content) {
            log.info("【Received Message From 'myTopic'】: {}", content);
        }
    
    }
    package com.cjs.boot.controller;
    
    import com.cjs.boot.response.RespResult;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Controller;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    import org.springframework.web.servlet.ModelAndView;
    
    @Controller
    @RequestMapping("/message")
    public class MessageController extends BaseController {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
    
        @GetMapping("/add.html")
        public ModelAndView add() {
            return new ModelAndView("message/add");
        }
    
        @PostMapping("/send.json")
        @ResponseBody
        public RespResult send(String text) {
            ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("myTopic", String.valueOf(System.currentTimeMillis()), text);
            return RespResult.success();
        }
    
    }
    2018-05-04 12:36:59.736  INFO 7552 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1
    2018-05-04 12:36:59.736  INFO 7552 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e2018-05-04 12:36:59.830  INFO 7552 --- [ntainer#0-0-C-1] com.cjs.boot.message.MyListener          : 【Received Message From 'myTopic'】: 大家好啊
    2018-05-04 12:37:24.107  INFO 7552 --- [ntainer#0-0-C-1] com.cjs.boot.message.MyListener          : 【Received Message From 'myTopic'】: 吃饭啦

    2.4、截图

  • 相关阅读:
    当重写的父类的返回值类型是子类的返回值类型的父类的时候返回值类型就可以不同 比如父类的返回值是Object 子类的返回值类型是String 同意吗?
    当重写的父类的返回值类型是子类的返回值类型的父类的时候返回值类型就可以不同 比如父类的返回值是Object 子类的返回值类型是String 同意吗
    重写后的方法与被重写的方法的返回值一样吗?
    java中,一个类实现某个接口,必须重写接口中的所有方法吗
    java方法重写返回值类型
    field
    JAVA父类引用指向子类的对象意思
    io模型
    操作系统面试题整理
    java并发问题
  • 原文地址:https://www.cnblogs.com/cjsblog/p/8990159.html
Copyright © 2011-2022 走看看