zoukankan      html  css  js  c++  java
  • kafka 学习(五)spring boot + kafka + zookeeper

    spring boot + kafka + zookeeper

    环境搭建见上一篇

    pom文件添加依赖

    <!-- kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    application.yml 配置如下(简单测试足够)

    spring: 
     kafka:
        bootstrap-servers: 10.250.23.213:9092,10.250.23.214:9092,10.250.23.215:9092
        consumer:
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group-id: test-consumer-group
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          batch-size: 65536
          buffer-memory: 524288
    TestKafkaController.java
    package com.jmev.cn.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    /**
     * @Author: shaoxin
     * @Date: 2019/3/29 9:28
     * @Email:airvicii@163.com
     */
    @RestController
    @RequestMapping("/kafka")
    public class TestKafkaController {
    
        /**
         * 注入kafkaTemplate
         */
        @Resource
        private KafkaTemplate<String, String> kafkaTemplate;
    
        /**
         * 发送消息的方法
         *
         * @param key  推送数据的key
         * @param data 推送数据的data
         */
        private void send(String key, String data) {
            // topic 名称 key data 消息数据
            kafkaTemplate.send("kafka_test", key, data);
    
        }
    
        // test 主题 1 my_test 3
    
        @RequestMapping("/send")
        public String testKafka() {
            int iMax = 6;
            for (int i = 1; i < iMax; i++) {
                send("key" + i, "data" + i);
            }
            return "success";
        }
    
        /**
         * 消费者使用日志打印消息
         */
        @KafkaListener(topics = "kafka_test")
        public void receive(ConsumerRecord<?, ?> consumer) {
            System.out.println("topic名称:" + consumer.topic()
                    + ",key:" + consumer.key() + ",分区位置:" + consumer.partition()
                    + ", 下标" + consumer.offset());
        }
    }
  • 相关阅读:
    IOS tableView的数据刷新
    IOS Modal(切换另外控件器方式)
    IOS UITabBarController(控制器)的子控制器
    iOS 应用数据存储的常用方式
    IOS 获取文本焦点 主动召唤出键盘(becomeFirstResponder) and 失去焦点(退下键盘)
    集合类型
    提取URL的搜索字符串中的参数
    本地对象、内置对象、宿主对象
    声明函数 执行上下文 匿名函数
    完善tab页面定位
  • 原文地址:https://www.cnblogs.com/ShaoXin/p/10621260.html
Copyright © 2011-2022 走看看