zoukankan      html  css  js  c++  java
  • springboot之kafka安装与实践

    环境:腾讯云centos7

    1、下载

    http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz

    2、解压

    tar -xvf kafka_2.11-2.3.0.tgz
    mv kafka_2.11-2.3.0 /usr/java/kafka2.11
    cd /usr/java/kafka2.11

    3、启动与测试

    (a)zookeeper启动
            bin/zookeeper-server-start.sh config/zookeeper.properties
        (b)kafka服务端启动
            bin/kafka-server-start.sh config/server.properties 
        (c)列出topic
            bin/kafka-topics.sh --zookeeper localhost:2181 --list
        (d)创建topic
            bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Demo1
        (e)描述Topic
            bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Demo1
        (f)发布消息到指定的Topic
            bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Demo1
        (g)消费指定Topic上的消息
            (已过时,老版本使用,否则报zookeeper is not a recognized option)
            bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic Demo1
            
            bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Demo1 --from-beginning

    4、安装kafka web界面

      a)下载地址:https://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.1/KafkaOffsetMonitor-assembly-0.2.1.jar

      b) 运行

      mkdir /mydata/kafkamonitorlogs
    
        java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.1.jar 
         com.quantifind.kafka.offsetapp.OffsetGetterWeb 
         --zk 132.232.44.82:2181 
         --port 8787 
         --refresh 10.seconds 
         --retain 7.days 1>/mydata/kafkamonitorlogs/stdout.log 2>/mydata/kafkamonitorlogs/stderr.log &

      c) web访问  

    http://ip:8787

    本人虚拟机内存太小了,所以无法查看到消息列表,但是web界面确实可以用!

    完毕!

     ########springboot集成实践###########

    1、pom.xml添加依赖

       <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
       </dependency>

    2、yml文件添加配置

    spring:
      profiles:
        active: @activatedProperties@
      kafka:
        bootstrap-servers: 132.232.44.82:9092
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: test
          enable-auto-commit: true
          auto-commit-interval: 1000
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    3、在Kafka的config/server.properties文件中添加

    advertised.listeners=PLAINTEXT://132.232.44.89:9092

    4、KafkaConsumer.java

    package com.cn.commodity.controller;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    /**
     * kafka消费者测试
     */
    @Component
    public class KafkaConsumer {
        @KafkaListener(topics = "test_topic1")
        public void listen (ConsumerRecord<?, ?> record) throws Exception {
            System.out.printf("topic = %s, offset = %d, value = %s 
    ", record.topic(), record.offset(), record.value());
        }
    }

    5、KafkaProducer.java

    package com.cn.commodity.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * 测试kafka生产者
     */
    @RestController
    @RequestMapping("kafka")
    public class KafkaProducer {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @RequestMapping("send")
        public String send(String msg){
            kafkaTemplate.send("test_topic1", msg);
            return "success";
        }
    
    }

    启动运行,完毕!

  • 相关阅读:
    SQLSTATE[42000]: Syntax error or access violation: 1071 Specified key was too long 解决方法
    Apache Commons 简介
    CSS设置只显示两行文字
    HTML中关于动态创建的标签无法绑定js事件的解决方法:.on()方法的 [.selector]
    AISing Programming Contest 2021(AtCoder Beginner Contest 202)E
    CF620E New Year Tree(dfs序+线段树)
    HDU6955 2021多校 Xor sum(字典树+前缀和异或)
    HDU6959 2021多校 zoto(莫队+分块)
    CF1285D Dr. Evil Underscores(分治)
    CF706D Vasiliy's Multiset(字典树的删除)
  • 原文地址:https://www.cnblogs.com/ywjfx/p/11312423.html
Copyright © 2011-2022 走看看