zoukankan      html  css  js  c++  java
  • 物联网架构成长之路(49)-SpringBoot集成KafKa中间件

    0.前言

      今天(2020-02-24)是开工的第一天,来到公司后,服务器出现问题,网管正在处理。没有服务器的后端,就像没有武器的剑客。没办法进行开发,就看看资料学习一点技术。
      疫情期间,虽然没有上班,但是自己的物联网平台还是在慢慢的优化中。下面这个图是规划后的V2版本架构图。
      架构图里面用到Kafka中间件,是作为数据流来处理。由于MQTT(EMQ)无法进行数据的持久化,所以需要引入Kafka来实现处理。EMQ用来保证通信的实时性和高效性。Kafka利用消息队列特性用来进行非实时与离线处理。
      比如架构图所示,可以利用EMQ的Kafka插件或者订阅MQTT根Topic的方式,把通信内容按照规则发往Kafka,作为生产者。而后面的支付服务,离线大数据处理服务,数据存储服务等,作为消费者。

    1. 利用Docker-Compose搭建kafka

      docker-compose.yml

     1 version: '3'
     2 services:
     3     zookeeper:
     4         image: wurstmeister/zookeeper
     5         ports:
     6             - "2181:2181"
     7     kafka:
     8         image: "wurstmeister/kafka:2.12-2.4.0"
     9         ports:
    10             - "9092:9092"
    11         environment:
    12             KAFKA_ADVERTISED_HOST_NAME: 192.168.0.106
    13             KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    14         volumes:
    15             - /var/run/docker.sock:/var/run/docker.sock
    16             - /root/workspace/kafka/data:/kafka
    17             - /etc/localtime:/etc/localtime
    18     kafka-manager:
    19         image: "sheepkiller/kafka-manager"
    20         restart: always
    21         container_name: kafka-manger
    22         ports:
    23             - "9091:9000"
    24         links:
    25             - zookeeper
    26             - kafka
    27         environment:
    28             ZK_HOSTS: zookeeper:2181
    29             KAFKA_BROKERS: kafka:9092
    30             KM_ARGS: -Djava.net.preferIPv4Stack=true
    31             KM_USERNAME: admin
    32             KM_PASSWORD: admin

      下面是一些基础操作

     1 #进入容器
     2 docker exec -it ${CONTAINER ID} /bin/bash 
     3 #进入目录
     4 cd opt/kafka_2.11-0.10.1.1
     5 #创建Topic
     6 bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test
     7 #查询Topic
     8 bin/kafka-topics.sh --list --bootstrap-server localhost:9092
     9 
    10 #运行一个生产者
    11 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    12 >This is Message
    13 
    14 #运行一个消费者
    15 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

    2. Kafka Manager界面
      访问192.168.0.106:9091,这个界面就是Kafka Manager管理界面,我们增加一个Cluster。然后可以查看对应的kafka信息

    3. SpringBoot集成Kafka

    3.1 pom.xml

    1         <dependency>
    2             <groupId>org.springframework.kafka</groupId>
    3             <artifactId>spring-kafka</artifactId>
    4         </dependency>
    5         <dependency>
    6             <groupId>com.google.code.gson</groupId>
    7             <artifactId>gson</artifactId>
    8         </dependency>

    3.2 KafkaController.java

     1 package com.wunaozai.demo.kafka;
     2 
     3 import org.springframework.beans.factory.annotation.Autowired;
     4 import org.springframework.web.bind.annotation.RequestMapping;
     5 import org.springframework.web.bind.annotation.RestController;
     6 
     7 @RestController
     8 @RequestMapping(value="/kafka")
     9 public class KafkaController {
    10 
    11     @Autowired
    12     private KafkaSender kafkaSender;
    13     
    14     @RequestMapping(value="/send")
    15     public String send(String msg) {
    16         boolean flag = kafkaSender.send(msg);
    17         return flag + "";
    18     }
    19 }

    3.3 KafkaMessage.java

     1 package com.wunaozai.demo.kafka;
     2 
     3 import java.sql.Timestamp;
     4 
     5 public class KafkaMessage {
     6     
     7     private Long id;
     8     private String msg;
     9     private Timestamp ts;
    10     public Long getId() {
    11         return id;
    12     }
    13     
    14     public void setId(Long id) {
    15         this.id = id;
    16     }
    17     
    18     public String getMsg() {
    19         return msg;
    20     }
    21     
    22     public void setMsg(String msg) {
    23         this.msg = msg;
    24     }
    25     
    26     public Timestamp getTs() {
    27         return ts;
    28     }
    29     
    30     public void setTs(Timestamp ts) {
    31         this.ts = ts;
    32     }
    33     
    34 }

    3.4 KafkaReceiver.java

     1 package com.wunaozai.demo.kafka;
     2 
     3 import java.util.Optional;
     4 
     5 import org.apache.kafka.clients.consumer.ConsumerRecord;
     6 import org.slf4j.Logger;
     7 import org.slf4j.LoggerFactory;
     8 import org.springframework.kafka.annotation.KafkaListener;
     9 import org.springframework.stereotype.Component;
    10 
    11 @Component
    12 public class KafkaReceiver {
    13 
    14     private static final Logger log = LoggerFactory.getLogger(KafkaReceiver.class);
    15 
    16     @KafkaListener(topics= {"iot"})
    17     public void listen(ConsumerRecord<?, ?> record) {
    18         Optional<?> message = Optional.ofNullable(record.value());
    19         if(message.isPresent()) {
    20             Object msg = message.get();
    21             log.info("record : " + record);
    22             log.info("message : " + msg);
    23         }
    24     }
    25 }

    3.5 KafkaSender.java

     1 package com.wunaozai.demo.kafka;
     2 
     3 import java.sql.Timestamp;
     4 
     5 import org.springframework.beans.factory.annotation.Autowired;
     6 import org.springframework.kafka.core.KafkaTemplate;
     7 import org.springframework.stereotype.Component;
     8 
     9 import com.google.gson.Gson;
    10 import com.google.gson.GsonBuilder;
    11 
    12 @Component
    13 public class KafkaSender {
    14 
    15     @Autowired
    16     private KafkaTemplate<String, String> kafkaTemplate;
    17     
    18     private Gson gson = new GsonBuilder().create();
    19     
    20     public boolean send(String msg) {
    21         KafkaMessage message = new KafkaMessage();
    22         message.setId(System.currentTimeMillis());
    23         message.setTs(new Timestamp(System.currentTimeMillis()));
    24         message.setMsg(msg); 
    25         kafkaTemplate.send("iot", gson.toJson(message));
    26         return true;
    27     }
    28 }

    3.6 application.properties

     1 #指定kafka 代理地址,可多个
     2 spring.kafka.bootstrap-servers=192.168.0.106:9092
     3 
     4 #provider
     5 spring.kafka.producer.retries=0
     6 #每次批量发送消息的数量
     7 spring.kafka.producer.batch-size=16384
     8 spring.kafka.producer.buffer-memory=33554432
     9 #指定消息key和消息体body的编解码方式
    10 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    11 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    12 
    13 #consumer
    14 # 指定默认消费者group id
    15 spring.kafka.consumer.group-id=0
    16 spring.kafka.consumer.auto-offset-reset=earliest
    17 spring.kafka.consumer.enable-auto-commit=true
    18 spring.kafka.consumer.auto-commit-interval=100
    19 # 指定消息key和消息体的编解码方式
    20 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

     运行效果图

      经过测试,通过java方式与console方式都是可以互相通信。

      假设在KafkaReceiver.java 第23行增加 Thread.sleep(10000); 用来模拟实际业务延时。类似抢票或者秒杀的应用场景。突然高峰,然后把所有订单数据都放到Kafka,然后在慢慢消费。生成实际业务订单,再通知用户付款。

      按照架构设计,平台部分会订阅EMQ的Topic,过滤部分数据或者完整数据报都发往Kafka,然后让Kafka后面的消费者根据需要自己进行消费。比如对所有 iot/product-uuid/device-uuid/device/+/property 所有属性日志相关的消息都通过Kafka的property这个Topic发送。消费者订阅property后,就可以消费。至于消费后可以存入influxdb进行持久化,也可以预处理后显示。MQTT设计中还有一类是 iot/product-uuid/device-uuid/device/+/event 事件类。这些相关的消息,会发往Kafka的event主题。然后由后面的消费者来消费event。进而进行报警等处理。


    参考资料:
      http://kafka.apachecn.org/documentation.html#operations
      https://hub.docker.com/r/wurstmeister/kafka
      https://mp.weixin.qq.com/s?__biz=MzU2NDg0OTgyMA==&mid=2247484570&idx=1&sn=1ad1c96bc7d47b88e976cbd045baf7d7

    本文地址:https://www.cnblogs.com/wunaozai/p/12358247.html
    本系列目录: https://www.cnblogs.com/wunaozai/p/8067577.html
    个人主页:https://www.wunaozai.com/

  • 相关阅读:
    cocoapods 命令
    开发常用
    ios 定位
    LoadingView
    自定义cell右侧 多按钮
    cocoaPods
    AFNetWorking
    iphone自定义铃声
    升级为iOS9后,默认请求类型为https,如何使用http进行请求会报错(引用他人的)
    理解c语言中的指针
  • 原文地址:https://www.cnblogs.com/wunaozai/p/12358247.html
Copyright © 2011-2022 走看看