zoukankan      html  css  js  c++  java
  • Kafka的安装及与Spring Boot的集成

    • 安装JDK
    1. 下载jdk-8u202-ea-bin-b03-linux-x64-07_nov_2018.tar.gz
    2. 解压
    3. 配置
    4. $ vi /etc/profile,在最后加入下面两行
        export JAVA_HOME=/usr/local/bigdata/jdk1.8.0_202
        export PATH=$JAVA_HOME/bin:$PATH
    5. 重新登录执行 java,验证JDK配置成功
    • 安装Kafka
    1. 下载kafka_2.11-1.0.2.tgz,这里主要1.0.2这个Kafka Server的 版本需要和客户端Spring-Kafka的版本对应,具体对应关系请看https://spring.io/projects/spring-kafka
    2. 解压 tar -xzvf kafka_2.11-1.0.2.tgz
    3. 配置 vi config/server.properties,host.name 非常非常关键,否则你讲无法远程连接到Kafka
      # Hostname and port the broker will advertise to producers and consumers. If not set,
      # it uses the value for "listeners" if configured.  Otherwise, it will use the value
      # returned from java.net.InetAddress.getCanonicalHostName().
      #advertised.listeners=PLAINTEXT://your.host.name:9092
      host.name=192.168.198.128
    4. 启动 Zookeeper
      bin/zookeeper-server-start.sh config/zookeeper.properties
    5. 启动 Kafka
      bin/kafka-server-start.sh config/server.properties
    • 创建一个Topic并查看是否创建成功
      bin/kafka-topics.sh --create --zookeeper 192.168.198.128:2181 --replication-factor 1 --partitions 1 --topic test  
    • 查看Topic是否创建成功
    • bin/kafka-topics.sh --list --zookeeper 192.168.198.128:2181
    • 本地Producer及Consumer演示
    1. 创建一个本地消息消费者
      bin/kafka-console-consumer.sh --bootstrap-server 192.168.198.128:9092 --topic test --from-beginning
    2. 创建一个本地消息生产者
      bin/kafka-console-producer.sh --broker-list 192.168.198.128:9092 --topic test
    3. 在生产者端输入消息,消费者端将会打印此消息
    • 远程Consumer演示(Windows 平台)
    1. 下载kafka_2.11-1.0.2.tgz解压并进入bin/windows
    2. 创建一个远程消费者
      kafka-console-consumer.bat --bootstrap-server 192.168.198.128:9092 --topic test --from-beginning
    3. 在Linux平台生成者端输入消息,检查windows端远程消费者是否打印此消息
    • Spring Boot集成配置

        在Maven中添加依赖

        <!--kafka-->
        <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.3.8.RELEASE</version>
        </dependency>

        关于依赖版本及Kafka Server的版本的对应关系,查看https://spring.io/projects/spring-kafka
    • KafkaProducer.java
      @Component
      public class KafkaProducer {
      
          Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
      
          @Autowired
          private KafkaTemplate kafkaTemplate;
      
          @Scheduled(fixedRateString = "1000") //per second
          public void send(){
              logger.info("Start to send message to Kakfa with topic name 'test'");
              String message = UUID.randomUUID().toString();
              ListenableFuture future = kafkaTemplate.send("test", message);
              future.addCallback(o -> System.out.println("message sent successfully: " + message), throwable -> System.out.println("message sent failed: " + message));
          }
      }
    • KafkaConsumer.java
      @Component
      public class KafkaConsumer {
      
          @KafkaListener(topics = {"test"})
          public void receive(String message){
              System.out.println("test-message:" + message);
          }
      }
  • 相关阅读:
    go语言入门(3)运算符及流程控制
    go语言入门(2)数据类型
    go语言入门(1)
    ubuntu上软件下载慢,github下载慢
    密码基础知识(2)以RSA为例说明加密、解密、签名、验签
    让你减少焦虑的一首英文小诗
    使用脚本启动fabric时出错
    Hyperledger Fabric(5)ChainCode的编写步骤
    Hyperledger Fabric(4)链码ChainCode
    设计题专题总结
  • 原文地址:https://www.cnblogs.com/streamliu/p/10025490.html
Copyright © 2011-2022 走看看