zoukankan      html  css  js  c++  java
  • 将你的MQTT集群封装成helm chart

    背景

    MQTT已然成为物联网的基础架构,在智慧园区、智慧城市、智慧楼宇、智慧交通等领域起着重要的作用,大规模的应用也暴露出了MQTTv3.1.1的一些缺陷,诸如session不会过期,只能等待客户端连接时设置clean session,如果客户端最后离线后,未进行上述操作,那么他的session将会保留在服务端;还有服务端主动断开客户端连接时,不能告知客户端原因,这时客户端无法知悉具体的断开原因,从而无法针对不同的事件进行有差别的处理;再有服务端不限制客户端发布的Qos1、2的数量,这时某个客户端可以发送大量的Qos1、2的消息来使得服务端的内存暴增。类似的缺陷和限制使得人们将目光转向新版本的MQTT协议,即MQTT5.0。

    MQTT5.0协议发布以来,已经有非常多的实现。然而他们中的大多数都未提供面向k8s的部署方式,这使得在云原生环境下人们不得不自己来封装所使用的MQTT broker。本文旨在帮助读者以更快的速度来构建他们自己的MQTT集群。

    k8s环境

    在开始我们的实验之前,我们需要一个k8s,如果您还没有使用过k8s,我推荐您使用各个云服务提供商提供的在线k8s服务,如果您希望在本地进行调试,我推荐您使用kubesphere来一键安装。下面我们假设您的k8s已经搭建完成。

    gmqtt 和 SailMQ

    这里我们选择基于gmqtt开发的SailMQ作为我们的MQTT实现,gmqtt是用Go语言实现的一个具备灵活扩展能力,高性能的MQTT broker,其完整实现了MQTT V3.1.1和V5协议。笔者在此基础之上开发了SailMQ,SailMQ提供了面向智慧园区、智慧楼宇等领域的系统模型,定义了包括设备、设备管理员、App、App管理员和主题通道等慨念,实现了特有的访问控制子系统和规则引擎子系统等。

    封装我们的helm chart

    首先,我们需要封装配置文件为configMap,如下:

    kind: ConfigMap
    apiVersion: v1
    metadata:
      name: {{ include "sailmq.fullname" . }}
    data:
      sailmq.yml: >+
        activation_code:
        {{ .Values.activationCode }}
    
        email: {{ .Values.email }} 
    
        phone: {{ .Values.phone }}
    
    
        postgresql: {{ .Values.postgresql }}
    
        redis:
          address: {{ .Values.redis.address }}
          password: {{ .Values.redis.password }}
          db: {{ .Values.redis.db }}
    
    
        # Path to pid file.
    
        # If not set, there will be no pid file.
    
        # pid_file: /var/run/gmqttd.pid
    
    
        listeners:
          # bind address
          - address: ":1883"
        #    tls:
    
        #      cacert: "path_to_ca_cert_file"
    
        #      cert: "path_to_cert_file"
    
        #      key: "path_to_key_file"
    
          - address: ":8883"
            # websocket setting
            websocket:
              path: "/"
    
        api:
          grpc:
            # The gRPC server listen address. Supports unix socket and tcp socket.
            - address: "tcp://127.0.0.1:8084"
            #- address: "unix:///var/run/gmqttd.sock"
        #      tls:
    
        #        cacert: "path_to_ca_cert_file"
    
        #        cert: "path_to_cert_file"
    
        #        key: "path_to_key_file"
          http:
              # The HTTP server listen address. This is a reverse-proxy server in front of gRPC server.
            - address: "tcp://0.0.0.0:8083"
              map: "tcp://127.0.0.1:8084" # The backend gRPC server endpoint,
        #      tls:
    
        #        cacert: "path_to_ca_cert_file"
    
        #        cert: "path_to_cert_file"
    
        #        key: "path_to_key_file"
    
    
        mqtt:
          session_expiry: 2h
          session_expiry_check_timer: 20s
          message_expiry: 2h
          max_packet_size: 268435456
          server_receive_maximum: 100
          max_keepalive: 60
          topic_alias_maximum: 10
          subscription_identifier_available: true
          wildcard_subscription_available: true
          shared_subscription_available: true
          maximum_qos: 2
          retain_available: true
          max_queued_messages: 1000
          max_inflight: 100
          queue_qos0_messages: true
          delivery_mode: onlyonce # overlap or onlyonce
          allow_zero_length_clientid: true
    
        persistence:
          type: memory  # memory | redis
          # The redis configuration only take effect when type == redis.
          redis:
            # redis server address
            addr: "127.0.0.1:6379"
            # the maximum number of idle connections in the redis connection pool.
            max_idle: 1000
            # the maximum number of connections allocated by the redis connection pool at a given time.
            # If zero, there is no limit on the number of connections in the pool.
            max_active: 0
            # the connection idle timeout, connection will be closed after remaining idle for this duration. If the value is zero, then idle connections are not closed.
            idle_timeout: 240s
            password: ""
            # the number of the redis database.
            database: 0
    
        # The topic alias manager setting. The topic alias feature is introduced by
        # MQTT V5.
    
        # This setting is used to control how the broker manage topic alias.
    
        topic_alias_manager:
          # Currently, only FIFO strategy is supported.
          type: fifo
    
        plugins:
          prometheus:
            path: "/metrics"
            listen_address: ":8082"
          auth:
            # Password hash type. (plain | md5 | sha256 | bcrypt)
            # Default to MD5.
            hash: md5
            # The file to store password. If it is a relative path, it locates in the same directory as the config file.
            # (e.g: ./gmqtt_password => /etc/gmqtt/gmqtt_password.yml)
            # Defaults to ./gmqtt_password.yml
            # password_file:
          federation:
            # node_name is the unique identifier for the node in the federation. Defaults to hostname.
            # node_name:
            # fed_addr is the gRPC server listening address for the federation internal communication. Defaults to :8901
            fed_addr: :8901
            # advertise_fed_addr is used to change the federation gRPC server address that we advertise to other nodes in the cluster.
            # Defaults to "fed_addr".However, in some cases, there may be a routable address that cannot be bound.
            # If the port is missing, the default federation port (8901) will be used.
            advertise_fed_addr: :8901
            # gossip_addr is the address that the gossip will listen on, It is used for both UDP and TCP gossip. Defaults to :8902
            gossip_addr: :8902
            # advertise_gossip_addr is used to change the gossip server address that we advertise to other nodes in the cluster.
            # Defaults to "GossipAddr" or the private IP address of the node if the IP in "GossipAddr" is 0.0.0.0.
            # If the port is missing, the default gossip port (8902) will be used.
            advertise_gossip_addr: :8902
    
            # retry_join is the address of other nodes to join upon starting up.
            # If port is missing, the default gossip port (8902) will be used.
            #retry_join:
            #  - 127.0.0.1:8902
    
            # rejoin_after_leave will be pass to "RejoinAfterLeave" in serf configuration.
            # It controls our interaction with the snapshot file.
            # When set to false (default), a leave causes a Serf to not rejoin the cluster until an explicit join is received.
            # If this is set to true, we ignore the leave, and rejoin the cluster on start.
            rejoin_after_leave: true
            # snapshot_path will be pass to "SnapshotPath" in serf configuration.
            # When Serf is started with a snapshot,it will attempt to join all the previously known nodes until one
            # succeeds and will also avoid replaying old user events.
            snapshot_path:
    
        # plugin loading orders
    
        plugin_order:
          # Uncomment auth to enable authentication.
          # - auth
          # - prometheus
          - admin
          - federation
        log:
          level: info # debug | info | warn | error
          format: text # json | text
          # whether to dump MQTT packet in debug level
          dump_packet: false
    
    
    
    

    这里有一点需要特别注意,就是plugins -> federation -> retry_join这个配置,这是用于集群节点间互联的关键配置,我们会在StatefulSet中设置环境变量SAILMQ_FEDERATION_SERVERS,然后用下面的脚本将配置合并起来,脚本如下:

    #!/usr/bin/env sh
    
    set -eux
    set -o pipefail
    
    sailmq mergeconfig -o sailmq2.yml
    
    sailmq start -c sailmq2.yml
    
    

    然后需要配置service和headless service, 其中service 对外提供服务,而headless service用于集群间节点通信,配置如下:

    apiVersion: v1
    kind: Service
    metadata:
      name: {{ include "sailmq.fullname" . }}
      labels:
        {{- include "sailmq.labels" . | nindent 4 }}
    spec:
      type: {{ .Values.service.type }}
      ports:
        - port: 1883
          targetPort: 1883
          protocol: TCP
          name: mqtt
        - port: 8883
          targetPort: 8883
          protocol: TCP
          name: ws
        - port: 8083
          targetPort: 8083
          protocol: TCP
          name: api
        - port: 8082
          targetPort: 8082
          protocol: TCP
          name: metric
      selector:
        {{- include "sailmq.selectorLabels" . | nindent 4 }}
    
    ---
    
    apiVersion: v1
    kind: Service
    metadata:
      name: {{ include "sailmq.fullname" . }}-headless
      labels:
        {{- include "sailmq.labels" . | nindent 4 }}
    spec:
      type: ClusterIP
      clusterIP: None
      ports:
        - port: 1883
          targetPort: 1883
          protocol: TCP
          name: mqtt
        - port: 8883
          targetPort: 8883
          protocol: TCP
          name: ws
        - port: 8083
          targetPort: 8083
          protocol: TCP
          name: api
        - port: 8082
          targetPort: 8082
          protocol: TCP
          name: metric
        - port: 8901
          targetPort: 8901
          protocol: TCP
          name: fed
        - port: 8902
          targetPort: 8902
          protocol: TCP
          name: gossip
      selector:
        {{- include "sailmq.selectorLabels" . | nindent 4 }}
    
    

    最后配置StatefulSet,如下:

    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: {{ include "sailmq.fullname" . }}
      labels:
        {{- include "sailmq.labels" . | nindent 4 }}
    spec:
      replicas: {{ .Values.replicaCount }}
      serviceName: {{ include "sailmq.fullname" . }}-headless 
      updateStrategy:
        type: RollingUpdate
      selector:
        matchLabels:
          {{- include "sailmq.selectorLabels" . | nindent 6 }}
      template:
        metadata:
        {{- with .Values.podAnnotations }}
          annotations:
            {{- toYaml . | nindent 8 }}
        {{- end }}
          labels:
            {{- include "sailmq.selectorLabels" . | nindent 8 }}
        spec:
          {{- with .Values.imagePullSecrets }}
          imagePullSecrets:
            {{- toYaml . | nindent 8 }}
          {{- end }}
          serviceAccountName: {{ include "sailmq.serviceAccountName" . }}
          securityContext:
            {{- toYaml .Values.podSecurityContext | nindent 8 }}
          containers:
            - name: {{ .Chart.Name }}
              securityContext:
                {{- toYaml .Values.securityContext | nindent 12 }}
              image: "hevienz/sailmq:latest"
              imagePullPolicy: {{ .Values.image.pullPolicy }}
              command: ["run.sh"]
              env:
                - name: SAILMQ_FEDERATION_SERVERS
                  {{- $replicaCount := int .Values.replicaCount }}
                  {{- $sailmqFullname := include "sailmq.fullname" . }}
                  {{- $sailmqHeadlessServiceName := printf "%s-%s" $sailmqFullname "headless" | trunc 63  }}
                  value: {{ range $i, $e := until $replicaCount }}{{ $sailmqFullname }}-{{ $e }}.{{ $sailmqHeadlessServiceName }}:8902 {{ end }}
              ports:
                - name: mqtt
                  containerPort: 1883
                  protocol: TCP
                - name: ws
                  containerPort: 8883
                  protocol: TCP
                - name: api
                  containerPort: 8083
                  protocol: TCP
                - name: metric
                  containerPort: 8082
                  protocol: TCP
                - name: fed
                  containerPort: 8901
                  protocol: TCP
                - name: gossip
                  containerPort: 8902
                  protocol: TCP
              volumeMounts:
              - name: sailmqv
                mountPath: /etc/sailmq
                readOnly: true
              livenessProbe:
                tcpSocket:
                  port: 1883
              readinessProbe:
                tcpSocket:
                  port: 1883
              resources:
                {{- toYaml .Values.resources | nindent 12 }}
          volumes:
            - name: sailmqv
              configMap:
                name: {{ include "sailmq.fullname" . }}
          {{- with .Values.nodeSelector }}
          nodeSelector:
            {{- toYaml . | nindent 8 }}
          {{- end }}
          {{- with .Values.affinity }}
          affinity:
            {{- toYaml . | nindent 8 }}
          {{- end }}
          {{- with .Values.tolerations }}
          tolerations:
            {{- toYaml . | nindent 8 }}
          {{- end }}
    

    请特别注意上文中提到的环境变量 SAILMQ_FEDERATION_SERVERS,她会生成基于headless service的集群节点列表,这里特别将其提取出来:

              env:
                - name: SAILMQ_FEDERATION_SERVERS
                  {{- $replicaCount := int .Values.replicaCount }}
                  {{- $sailmqFullname := include "sailmq.fullname" . }}
                  {{- $sailmqHeadlessServiceName := printf "%s-%s" $sailmqFullname "headless" | trunc 63  }}
                  value: {{ range $i, $e := until $replicaCount }}{{ $sailmqFullname }}-{{ $e }}.{{ $sailmqHeadlessServiceName }}:8902 {{ end }}
    

    总结

    将MQTT集群封装成helm chart,极大地提高了运维人员的效率,是应用部署的推荐方式。SailMQ是一个MQTT服务端的实现,基于gmqtt提供的全面的MQTT v3.1.1和MQTT v5支持,为用户提供面向未来的智慧园区基础设施。

    作者:Hevienz
    出处:http://www.cnblogs.com/hymenz/
    知识共享许可协议
    本博客原创作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。
  • 相关阅读:
    操作数据库帮助类
    VS快捷键收藏
    sqlserver 定时任务
    LayUI相关
    java20140407
    java20140406
    java20140405
    获取一个字符串在整个字符串中出现的次数
    System类
    java中的集合Collection
  • 原文地址:https://www.cnblogs.com/hymenz/p/14686603.html
Copyright © 2011-2022 走看看