zoukankan      html  css  js  c++  java
  • 物联网架构成长之路(49)-SpringBoot集成KafKa中间件

    0.前言

      今天(2020-02-24)是开工的第一天,来到公司后,服务器出现问题,网管正在处理。没有服务器的后端,就像没有武器的剑客。没办法进行开发,就看看资料学习一点技术。
      疫情期间,虽然没有上班,但是自己的物联网平台还是在慢慢的优化中。下面这个图是规划后的V2版本架构图。
      架构图里面用到Kafka中间件,是作为数据流来处理。由于MQTT(EMQ)无法进行数据的持久化,所以需要引入Kafka来实现处理。EMQ用来保证通信的实时性和高效性。Kafka利用消息队列特性用来进行非实时与离线处理。
      比如架构图所示,可以利用EMQ的Kafka插件或者订阅MQTT根Topic的方式,把通信内容按照规则发往Kafka,作为生产者。而后面的支付服务,离线大数据处理服务,数据存储服务等,作为消费者。

    1. 利用Docker-Compose搭建kafka

      docker-compose.yml

     1 version: '3'
     2 services:
     3     zookeeper:
     4         image: wurstmeister/zookeeper
     5         ports:
     6             - "2181:2181"
     7     kafka:
     8         image: "wurstmeister/kafka:2.12-2.4.0"
     9         ports:
    10             - "9092:9092"
    11         environment:
    12             KAFKA_ADVERTISED_HOST_NAME: 192.168.0.106
    13             KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    14         volumes:
    15             - /var/run/docker.sock:/var/run/docker.sock
    16             - /root/workspace/kafka/data:/kafka
    17             - /etc/localtime:/etc/localtime
    18     kafka-manager:
    19         image: "sheepkiller/kafka-manager"
    20         restart: always
    21         container_name: kafka-manger
    22         ports:
    23             - "9091:9000"
    24         links:
    25             - zookeeper
    26             - kafka
    27         environment:
    28             ZK_HOSTS: zookeeper:2181
    29             KAFKA_BROKERS: kafka:9092
    30             KM_ARGS: -Djava.net.preferIPv4Stack=true
    31             KM_USERNAME: admin
    32             KM_PASSWORD: admin

      下面是一些基础操作

     1 #进入容器
     2 docker exec -it ${CONTAINER ID} /bin/bash 
     3 #进入目录
     4 cd opt/kafka_2.11-0.10.1.1
     5 #创建Topic
     6 bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test
     7 #查询Topic
     8 bin/kafka-topics.sh --list --bootstrap-server localhost:9092
     9 
    10 #运行一个生产者
    11 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    12 >This is Message
    13 
    14 #运行一个消费者
    15 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

    2. Kafka Manager界面
      访问192.168.0.106:9091,这个界面就是Kafka Manager管理界面,我们增加一个Cluster。然后可以查看对应的kafka信息

    3. SpringBoot集成Kafka

    3.1 pom.xml

    1         <dependency>
    2             <groupId>org.springframework.kafka</groupId>
    3             <artifactId>spring-kafka</artifactId>
    4         </dependency>
    5         <dependency>
    6             <groupId>com.google.code.gson</groupId>
    7             <artifactId>gson</artifactId>
    8         </dependency>

    3.2 KafkaController.java

     1 package com.wunaozai.demo.kafka;
     2 
     3 import org.springframework.beans.factory.annotation.Autowired;
     4 import org.springframework.web.bind.annotation.RequestMapping;
     5 import org.springframework.web.bind.annotation.RestController;
     6 
     7 @RestController
     8 @RequestMapping(value="/kafka")
     9 public class KafkaController {
    10 
    11     @Autowired
    12     private KafkaSender kafkaSender;
    13     
    14     @RequestMapping(value="/send")
    15     public String send(String msg) {
    16         boolean flag = kafkaSender.send(msg);
    17         return flag + "";
    18     }
    19 }

    3.3 KafkaMessage.java

     1 package com.wunaozai.demo.kafka;
     2 
     3 import java.sql.Timestamp;
     4 
     5 public class KafkaMessage {
     6     
     7     private Long id;
     8     private String msg;
     9     private Timestamp ts;
    10     public Long getId() {
    11         return id;
    12     }
    13     
    14     public void setId(Long id) {
    15         this.id = id;
    16     }
    17     
    18     public String getMsg() {
    19         return msg;
    20     }
    21     
    22     public void setMsg(String msg) {
    23         this.msg = msg;
    24     }
    25     
    26     public Timestamp getTs() {
    27         return ts;
    28     }
    29     
    30     public void setTs(Timestamp ts) {
    31         this.ts = ts;
    32     }
    33     
    34 }

    3.4 KafkaReceiver.java

     1 package com.wunaozai.demo.kafka;
     2 
     3 import java.util.Optional;
     4 
     5 import org.apache.kafka.clients.consumer.ConsumerRecord;
     6 import org.slf4j.Logger;
     7 import org.slf4j.LoggerFactory;
     8 import org.springframework.kafka.annotation.KafkaListener;
     9 import org.springframework.stereotype.Component;
    10 
    11 @Component
    12 public class KafkaReceiver {
    13 
    14     private static final Logger log = LoggerFactory.getLogger(KafkaReceiver.class);
    15 
    16     @KafkaListener(topics= {"iot"})
    17     public void listen(ConsumerRecord<?, ?> record) {
    18         Optional<?> message = Optional.ofNullable(record.value());
    19         if(message.isPresent()) {
    20             Object msg = message.get();
    21             log.info("record : " + record);
    22             log.info("message : " + msg);
    23         }
    24     }
    25 }

    3.5 KafkaSender.java

     1 package com.wunaozai.demo.kafka;
     2 
     3 import java.sql.Timestamp;
     4 
     5 import org.springframework.beans.factory.annotation.Autowired;
     6 import org.springframework.kafka.core.KafkaTemplate;
     7 import org.springframework.stereotype.Component;
     8 
     9 import com.google.gson.Gson;
    10 import com.google.gson.GsonBuilder;
    11 
    12 @Component
    13 public class KafkaSender {
    14 
    15     @Autowired
    16     private KafkaTemplate<String, String> kafkaTemplate;
    17     
    18     private Gson gson = new GsonBuilder().create();
    19     
    20     public boolean send(String msg) {
    21         KafkaMessage message = new KafkaMessage();
    22         message.setId(System.currentTimeMillis());
    23         message.setTs(new Timestamp(System.currentTimeMillis()));
    24         message.setMsg(msg); 
    25         kafkaTemplate.send("iot", gson.toJson(message));
    26         return true;
    27     }
    28 }

    3.6 application.properties

     1 #指定kafka 代理地址,可多个
     2 spring.kafka.bootstrap-servers=192.168.0.106:9092
     3 
     4 #provider
     5 spring.kafka.producer.retries=0
     6 #每次批量发送消息的数量
     7 spring.kafka.producer.batch-size=16384
     8 spring.kafka.producer.buffer-memory=33554432
     9 #指定消息key和消息体body的编解码方式
    10 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    11 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    12 
    13 #consumer
    14 # 指定默认消费者group id
    15 spring.kafka.consumer.group-id=0
    16 spring.kafka.consumer.auto-offset-reset=earliest
    17 spring.kafka.consumer.enable-auto-commit=true
    18 spring.kafka.consumer.auto-commit-interval=100
    19 # 指定消息key和消息体的编解码方式
    20 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

     运行效果图

      经过测试,通过java方式与console方式都是可以互相通信。

      假设在KafkaReceiver.java 第23行增加 Thread.sleep(10000); 用来模拟实际业务延时。类似抢票或者秒杀的应用场景。突然高峰,然后把所有订单数据都放到Kafka,然后在慢慢消费。生成实际业务订单,再通知用户付款。

      按照架构设计,平台部分会订阅EMQ的Topic,过滤部分数据或者完整数据报都发往Kafka,然后让Kafka后面的消费者根据需要自己进行消费。比如对所有 iot/product-uuid/device-uuid/device/+/property 所有属性日志相关的消息都通过Kafka的property这个Topic发送。消费者订阅property后,就可以消费。至于消费后可以存入influxdb进行持久化,也可以预处理后显示。MQTT设计中还有一类是 iot/product-uuid/device-uuid/device/+/event 事件类。这些相关的消息,会发往Kafka的event主题。然后由后面的消费者来消费event。进而进行报警等处理。


    参考资料:
      http://kafka.apachecn.org/documentation.html#operations
      https://hub.docker.com/r/wurstmeister/kafka
      https://mp.weixin.qq.com/s?__biz=MzU2NDg0OTgyMA==&mid=2247484570&idx=1&sn=1ad1c96bc7d47b88e976cbd045baf7d7

    本文地址:https://www.cnblogs.com/wunaozai/p/12358247.html
    本系列目录: https://www.cnblogs.com/wunaozai/p/8067577.html
    个人主页:https://www.wunaozai.com/

  • 相关阅读:
    codeforces C. Fixing Typos 解题报告
    codeforces B. The Fibonacci Segment 解题报告
    codeforces B. Color the Fence 解题报告
    codeforces B. Petya and Staircases 解题报告
    codeforces A. Sereja and Bottles 解题报告
    codeforces B. Levko and Permutation 解题报告
    codeforces B.Fence 解题报告
    tmp
    API 设计 POSIX File API
    分布式跟踪的一个流行标准是OpenTracing API,该标准的一个流行实现是Jaeger项目。
  • 原文地址:https://www.cnblogs.com/wunaozai/p/12358247.html
Copyright © 2011-2022 走看看