Background
MQTT has become foundational infrastructure for the Internet of Things, playing an important role in smart parks, smart cities, smart buildings, smart transportation, and similar domains. Large-scale deployment, however, has exposed a number of deficiencies in MQTT v3.1.1. Sessions never expire: a session can only be discarded by setting the clean session flag when the client connects, so if a client goes offline without having done so, its session lingers on the server indefinitely. When the server actively disconnects a client, it cannot tell the client why, so the client has no way to learn the specific reason for the disconnect and react differently to different events. And the server cannot limit the number of QoS 1 and QoS 2 messages a client publishes, so a single client can flood the server with QoS 1/2 messages and drive its memory usage up sharply. Deficiencies and limitations like these have turned attention toward the new version of the protocol, MQTT 5.0.
Since the MQTT 5.0 specification was published, many implementations have appeared. Most of them, however, do not provide a Kubernetes-oriented deployment, which forces users in cloud-native environments to package the MQTT broker they use themselves. This article aims to help readers build their own MQTT cluster faster.
The k8s environment
Before we begin the experiment, we need a k8s cluster. If you have never used k8s, I recommend the managed k8s services offered by the major cloud providers; if you would rather debug locally, I recommend KubeSphere for a one-click installation. From here on we assume your k8s cluster is up and running.
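Before going further, it is worth a quick sanity check that kubectl can actually reach the cluster:

# Confirm the control plane is reachable and the nodes are Ready.
kubectl cluster-info
kubectl get nodes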
gmqtt and SailMQ
Here we choose SailMQ, which is built on gmqtt, as our MQTT implementation. gmqtt is a high-performance, easily extensible MQTT broker written in Go that fully implements the MQTT v3.1.1 and v5 protocols. On top of it, the author developed SailMQ, which provides a system model for domains such as smart parks and smart buildings, defining concepts including devices, device administrators, apps, app administrators, and topic channels, and implementing its own access-control and rule-engine subsystems.
Packaging our helm chart
First, we package the configuration file as a ConfigMap, as follows:
kind: ConfigMap
apiVersion: v1
metadata:
  name: {{ include "sailmq.fullname" . }}
data:
  sailmq.yml: |
    activation_code: {{ .Values.activationCode }}
    email: {{ .Values.email }}
    phone: {{ .Values.phone }}
    postgresql: {{ .Values.postgresql }}
    redis:
      address: {{ .Values.redis.address }}
      password: {{ .Values.redis.password }}
      db: {{ .Values.redis.db }}

    # Path to pid file.
    # If not set, there will be no pid file.
    # pid_file: /var/run/gmqttd.pid

    listeners:
      # bind address
      - address: ":1883"
        # tls:
        #   cacert: "path_to_ca_cert_file"
        #   cert: "path_to_cert_file"
        #   key: "path_to_key_file"
      - address: ":8883"
        # websocket setting
        websocket:
          path: "/"

    api:
      grpc:
        # The gRPC server listen address. Supports unix socket and tcp socket.
        - address: "tcp://127.0.0.1:8084"
        #- address: "unix:///var/run/gmqttd.sock"
        #  tls:
        #    cacert: "path_to_ca_cert_file"
        #    cert: "path_to_cert_file"
        #    key: "path_to_key_file"
      http:
        # The HTTP server listen address. This is a reverse-proxy server in front of the gRPC server.
        - address: "tcp://0.0.0.0:8083"
          map: "tcp://127.0.0.1:8084" # The backend gRPC server endpoint.
          # tls:
          #   cacert: "path_to_ca_cert_file"
          #   cert: "path_to_cert_file"
          #   key: "path_to_key_file"

    mqtt:
      session_expiry: 2h
      session_expiry_check_timer: 20s
      message_expiry: 2h
      max_packet_size: 268435456
      server_receive_maximum: 100
      max_keepalive: 60
      topic_alias_maximum: 10
      subscription_identifier_available: true
      wildcard_subscription_available: true
      shared_subscription_available: true
      maximum_qos: 2
      retain_available: true
      max_queued_messages: 1000
      max_inflight: 100
      queue_qos0_messages: true
      delivery_mode: onlyonce # overlap or onlyonce
      allow_zero_length_clientid: true

    persistence:
      type: memory # memory | redis
      # The redis configuration only takes effect when type == redis.
      redis:
        # redis server address
        addr: "127.0.0.1:6379"
        # the maximum number of idle connections in the redis connection pool.
        max_idle: 1000
        # the maximum number of connections allocated by the redis connection pool at a given time.
        # If zero, there is no limit on the number of connections in the pool.
        max_active: 0
        # the connection idle timeout. Connections will be closed after remaining idle for this duration.
        # If the value is zero, then idle connections are not closed.
        idle_timeout: 240s
        password: ""
        # the number of the redis database.
        database: 0

    # The topic alias manager setting. The topic alias feature is introduced by MQTT V5.
    # This setting is used to control how the broker manages topic aliases.
    topic_alias_manager:
      # Currently, only the FIFO strategy is supported.
      type: fifo

    plugins:
      prometheus:
        path: "/metrics"
        listen_address: ":8082"
      auth:
        # Password hash type. (plain | md5 | sha256 | bcrypt)
        # Defaults to MD5.
        hash: md5
        # The file to store passwords. If it is a relative path, it locates in the same directory as the config file.
        # (e.g: ./gmqtt_password => /etc/gmqtt/gmqtt_password.yml)
        # Defaults to ./gmqtt_password.yml
        # password_file:
      federation:
        # node_name is the unique identifier for the node in the federation. Defaults to hostname.
        # node_name:
        # fed_addr is the gRPC server listening address for the federation internal communication. Defaults to :8901
        fed_addr: :8901
        # advertise_fed_addr is used to change the federation gRPC server address that we advertise to other nodes in the cluster.
        # Defaults to "fed_addr". However, in some cases, there may be a routable address that cannot be bound.
        # If the port is missing, the default federation port (8901) will be used.
        advertise_fed_addr: :8901
        # gossip_addr is the address that the gossip will listen on. It is used for both UDP and TCP gossip. Defaults to :8902
        gossip_addr: :8902
        # advertise_gossip_addr is used to change the gossip server address that we advertise to other nodes in the cluster.
        # Defaults to "GossipAddr", or the private IP address of the node if the IP in "GossipAddr" is 0.0.0.0.
        # If the port is missing, the default gossip port (8902) will be used.
        advertise_gossip_addr: :8902
        # retry_join is the address of other nodes to join upon starting up.
        # If the port is missing, the default gossip port (8902) will be used.
        #retry_join:
        #  - 127.0.0.1:8902
        # rejoin_after_leave will be passed to "RejoinAfterLeave" in the serf configuration.
        # It controls our interaction with the snapshot file.
        # When set to false (default), a leave causes Serf to not rejoin the cluster until an explicit join is received.
        # If this is set to true, we ignore the leave and rejoin the cluster on start.
        rejoin_after_leave: true
        # snapshot_path will be passed to "SnapshotPath" in the serf configuration.
        # When Serf is started with a snapshot, it will attempt to join all the previously known nodes until one
        # succeeds and will also avoid replaying old user events.
        snapshot_path:

    # plugin loading order
    plugin_order:
      # Uncomment auth to enable authentication.
      # - auth
      # - prometheus
      - admin
      - federation

    log:
      level: info # debug | info | warn | error
      format: text # json | text
      # whether to dump MQTT packets at debug level
      dump_packet: false
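The templates reference a handful of chart values, both here and in the Service and StatefulSet shown later. A minimal values.yaml sketch covering them might look as follows; every value here is a placeholder, and the shape of postgresql in particular (assumed here to be a connection string) depends on how SailMQ parses it:

replicaCount: 3
image:
  pullPolicy: IfNotPresent
service:
  type: ClusterIP
activationCode: "your-activation-code"
email: "ops@example.com"
phone: "13800000000"
postgresql: "postgres://sailmq:secret@postgres:5432/sailmq?sslmode=disable"
redis:
  address: "redis:6379"
  password: ""
  db: 0

The remaining keys the templates use (podAnnotations, imagePullSecrets, the service account, the security contexts, resources, nodeSelector, affinity, and tolerations) follow the standard scaffold generated by helm create.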
One point deserves special attention here: the plugins -> federation -> retry_join setting. It is the key piece of configuration for interconnecting the cluster nodes. We will set the environment variable SAILMQ_FEDERATION_SERVERS in the StatefulSet, and then merge it into the configuration with the following script (this is the run.sh that the StatefulSet below uses as its container command):
#!/usr/bin/env sh
set -eux
set -o pipefail

# Merge the federation server list from the SAILMQ_FEDERATION_SERVERS
# environment variable into the configuration, writing the merged file
# to sailmq2.yml, then start the broker with it.
sailmq mergeconfig -o sailmq2.yml
sailmq start -c sailmq2.yml
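To make the merge concrete: assuming mergeconfig maps SAILMQ_FEDERATION_SERVERS onto the federation plugin's retry_join list, and assuming a release fullname of sailmq with three replicas (both hypothetical here), the merged sailmq2.yml would end up containing something like:

plugins:
  federation:
    retry_join:
      - sailmq-0.sailmq-headless:8902
      - sailmq-1.sailmq-headless:8902
      - sailmq-2.sailmq-headless:8902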
Next we need to configure a Service and a headless Service: the Service exposes the brokers to the outside world, while the headless Service is used for communication between the cluster nodes. The configuration is as follows:
apiVersion: v1
kind: Service
metadata:
  name: {{ include "sailmq.fullname" . }}
  labels:
    {{- include "sailmq.labels" . | nindent 4 }}
spec:
  type: {{ .Values.service.type }}
  ports:
    - port: 1883
      targetPort: 1883
      protocol: TCP
      name: mqtt
    - port: 8883
      targetPort: 8883
      protocol: TCP
      name: ws
    - port: 8083
      targetPort: 8083
      protocol: TCP
      name: api
    - port: 8082
      targetPort: 8082
      protocol: TCP
      name: metric
  selector:
    {{- include "sailmq.selectorLabels" . | nindent 4 }}
---
apiVersion: v1
kind: Service
metadata:
  name: {{ include "sailmq.fullname" . }}-headless
  labels:
    {{- include "sailmq.labels" . | nindent 4 }}
spec:
  type: ClusterIP
  clusterIP: None
  ports:
    - port: 1883
      targetPort: 1883
      protocol: TCP
      name: mqtt
    - port: 8883
      targetPort: 8883
      protocol: TCP
      name: ws
    - port: 8083
      targetPort: 8083
      protocol: TCP
      name: api
    - port: 8082
      targetPort: 8082
      protocol: TCP
      name: metric
    - port: 8901
      targetPort: 8901
      protocol: TCP
      name: fed
    - port: 8902
      targetPort: 8902
      protocol: TCP
      name: gossip
  selector:
    {{- include "sailmq.selectorLabels" . | nindent 4 }}
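Once the pods are up, you can verify that the headless Service really does expose each broker individually (a release named sailmq is assumed here):

# Each broker pod appears as its own endpoint, and receives a stable DNS
# record of the form sailmq-0.sailmq-headless.<namespace>.svc.cluster.local.
kubectl get endpoints sailmq-headless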
Finally, configure the StatefulSet, as follows:
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: {{ include "sailmq.fullname" . }}
  labels:
    {{- include "sailmq.labels" . | nindent 4 }}
spec:
  replicas: {{ .Values.replicaCount }}
  serviceName: {{ include "sailmq.fullname" . }}-headless
  updateStrategy:
    type: RollingUpdate
  selector:
    matchLabels:
      {{- include "sailmq.selectorLabels" . | nindent 6 }}
  template:
    metadata:
      {{- with .Values.podAnnotations }}
      annotations:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      labels:
        {{- include "sailmq.selectorLabels" . | nindent 8 }}
    spec:
      {{- with .Values.imagePullSecrets }}
      imagePullSecrets:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      serviceAccountName: {{ include "sailmq.serviceAccountName" . }}
      securityContext:
        {{- toYaml .Values.podSecurityContext | nindent 8 }}
      containers:
        - name: {{ .Chart.Name }}
          securityContext:
            {{- toYaml .Values.securityContext | nindent 12 }}
          image: "hevienz/sailmq:latest"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          command: ["run.sh"]
          env:
            - name: SAILMQ_FEDERATION_SERVERS
              {{- $replicaCount := int .Values.replicaCount }}
              {{- $sailmqFullname := include "sailmq.fullname" . }}
              {{- $sailmqHeadlessServiceName := printf "%s-%s" $sailmqFullname "headless" | trunc 63 }}
              value: {{ range $i, $e := until $replicaCount }}{{ $sailmqFullname }}-{{ $e }}.{{ $sailmqHeadlessServiceName }}:8902 {{ end }}
          ports:
            - name: mqtt
              containerPort: 1883
              protocol: TCP
            - name: ws
              containerPort: 8883
              protocol: TCP
            - name: api
              containerPort: 8083
              protocol: TCP
            - name: metric
              containerPort: 8082
              protocol: TCP
            - name: fed
              containerPort: 8901
              protocol: TCP
            - name: gossip
              containerPort: 8902
              protocol: TCP
          volumeMounts:
            - name: sailmqv
              mountPath: /etc/sailmq
              readOnly: true
          livenessProbe:
            tcpSocket:
              port: 1883
          readinessProbe:
            tcpSocket:
              port: 1883
          resources:
            {{- toYaml .Values.resources | nindent 12 }}
      volumes:
        - name: sailmqv
          configMap:
            name: {{ include "sailmq.fullname" . }}
      {{- with .Values.nodeSelector }}
      nodeSelector:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.affinity }}
      affinity:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.tolerations }}
      tolerations:
        {{- toYaml . | nindent 8 }}
      {{- end }}
Pay particular attention to the environment variable SAILMQ_FEDERATION_SERVERS mentioned above: it generates the list of cluster nodes from the headless Service. It is worth extracting it here on its own:
env:
  - name: SAILMQ_FEDERATION_SERVERS
    {{- $replicaCount := int .Values.replicaCount }}
    {{- $sailmqFullname := include "sailmq.fullname" . }}
    {{- $sailmqHeadlessServiceName := printf "%s-%s" $sailmqFullname "headless" | trunc 63 }}
    value: {{ range $i, $e := until $replicaCount }}{{ $sailmqFullname }}-{{ $e }}.{{ $sailmqHeadlessServiceName }}:8902 {{ end }}
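With replicaCount set to 3 and a release fullname of sailmq (a hypothetical name, for illustration), this renders to:

- name: SAILMQ_FEDERATION_SERVERS
  value: sailmq-0.sailmq-headless:8902 sailmq-1.sailmq-headless:8902 sailmq-2.sailmq-headless:8902

Each entry is the stable per-pod DNS name a StatefulSet pod receives through the headless Service, so every broker can reach its peers' gossip port no matter how pod IPs change.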
Summary
Packaging an MQTT cluster as a helm chart greatly improves operators' efficiency and is the recommended way to deploy the application. SailMQ is an MQTT server implementation that builds on the complete MQTT v3.1.1 and v5 support provided by gmqtt to offer users future-proof infrastructure for smart parks.
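With the chart packaged, bringing up the whole cluster reduces to a single command. A minimal sketch, assuming the chart lives in a local directory named sailmq:

# Install a three-node cluster, exposed through a LoadBalancer Service.
helm install sailmq ./sailmq \
  --set replicaCount=3 \
  --set service.type=LoadBalancer

# Quick smoke test from inside the cluster, assuming the release (and thus
# the Service) is named sailmq and anonymous access is allowed as configured above.
kubectl run mqtt-test --rm -it --image=eclipse-mosquitto --restart=Never -- \
  mosquitto_sub -h sailmq -p 1883 -t 'test/#' -v

Scaling later is just a helm upgrade with a new replicaCount; the SAILMQ_FEDERATION_SERVERS list is re-rendered to match.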