zoukankan      html  css  js  c++  java
  • 部署EMQX集群

    一 . 环境说明

    操作系统 : CentOS7

    集群节点 : 

    172.18.0.231 集群节点1
    172.18.0.232 集群节点2
    172.18.0.233 集群节点3

    EMQ X Broker版本 : 4.1.0

    集群发现策略 : etcd

    认证方式 : PostgreSQL方式 

    密码加密 : SHA-256

    SSL双向认证 : 启用

    二 . 准备工作

    准备etcd集群 , 可参考 : https://www.cnblogs.com/kreo/p/13155893.html

    准备SSL证书 , 可参考 : https://www.cnblogs.com/kreo/p/13203973.html

    三 . 下载EMQX

    官网下载 : https://www.emqx.io/cn/products/broker

    百度网盘下载 : 

    链接:https://pan.baidu.com/s/1mvs4M76cWeQh7VmkQXx9EA 
    提取码:zfb5

    四 . 解压安装包

    注意 : 本文都采用root用户进行安装 , 如果需要切换其他用户 , 也可以正常运行(注意权限问题)

    #解压
    unzip emqx-centos7-v4.1.0.zip
    #可以移到任何你想移动到的目录 , 本例子是移动到了/home下 , 即安装目录为 /home/emqx
    mv emqx /home/

    PS : ***本文都是使用 /home/emqx 作为 ${emqx_home} 进行处理 ,如需改变 , 则需要修改相应的配置文件***

    五. 修改主配置文件

    主配置文件路径 : /home/emqx/etc/emqx.conf

    主要修改下面几部分 : 

    1 . 节点名称(node.name)

    2 . PostgreSQL认证

    3 . SSL认证

    4 . 开启etcd集群发现

    下面是我配置文件全文 , 可直接拿过去使用 , 需要修改部分我标了红色

    cluster.name = wanmaemqxcl
    cluster.proto_dist = inet_tcp
    cluster.discovery = etcd
    cluster.autoheal = on
    cluster.autoclean = 5m
    #其他节点的server也可能要改
    cluster.etcd.server = http://127.0.0.1:2379
    cluster.etcd.prefix = emqxcl
    cluster.etcd.node_ttl = 1m
    #其他节点的node.name要改
    node.name = emqx@172.18.0.231
    node.cookie = emqxsecretcookie
    node.data_dir = data
    node.global_gc_interval = 15m
    node.crash_dump = log/crash.dump
    node.dist_listen_min = 6369
    node.dist_listen_max = 6369
    rpc.mode = async
    rpc.async_batch_size = 256
    rpc.tcp_server_port = 5369
    rpc.tcp_client_port = 5369
    rpc.connect_timeout = 5s
    rpc.send_timeout = 5s
    rpc.authentication_timeout = 5s
    rpc.call_receive_timeout = 15s
    rpc.socket_keepalive_idle = 900s
    rpc.socket_keepalive_interval = 75s
    rpc.socket_keepalive_count = 9
    rpc.socket_sndbuf = 1MB
    rpc.socket_recbuf = 1MB
    rpc.socket_buffer = 1MB
    log.to = both
    log.level = notice
    log.dir = log
    log.file = emqx.log
    #log.chars_limit = 8192
    log.rotation = on
    log.rotation.size = 10MB
    log.rotation.count = 5
    #log.info.file = info.log
    log.error.file = error.log
    #log.sync_mode_qlen = 100
    #log.drop_mode_qlen = 3000
    #log.flush_qlen = 8000
    #log.overload_kill = on
    #log.overload_kill_qlen = 20000
    #log.overload_kill_mem_size = 30MB
    #log.overload_kill_restart_after = 5s
    #log.burst_limit = 20000, 1s
    allow_anonymous = false
    acl_nomatch = allow
    acl_file = etc/acl.conf
    enable_acl_cache = on
    acl_cache_max_size = 32
    acl_cache_ttl = 1m
    acl_deny_action = ignore
    flapping_detect_policy = 30, 1m, 5m
    mqtt.max_packet_size = 1MB
    mqtt.max_clientid_len = 65535
    mqtt.max_topic_levels = 0
    mqtt.max_qos_allowed = 2
    mqtt.max_topic_alias = 65535
    mqtt.retain_available = true
    mqtt.wildcard_subscription = true
    mqtt.shared_subscription = true
    mqtt.ignore_loop_deliver = false
    mqtt.strict_mode = false
    zone.external.idle_timeout = 15s
    zone.external.enable_acl = on
    zone.external.enable_ban = on
    zone.external.enable_stats = on
    zone.external.acl_deny_action = ignore
    #可根据实际情况修改
    zone.external.force_gc_policy = 32000|32MB
    zone.external.keepalive_backoff = 0.75
    zone.external.max_subscriptions = 0
    zone.external.upgrade_qos = off
    zone.external.max_inflight = 32
    zone.external.retry_interval = 30s
    zone.external.max_awaiting_rel = 100
    zone.external.await_rel_timeout = 300s
    zone.external.session_expiry_interval = 2h
    zone.external.max_mqueue_len = 1000
    zone.external.mqueue_priorities = none
    zone.external.mqueue_default_priority = highest
    zone.external.mqueue_store_qos0 = true
    zone.external.enable_flapping_detect = off
    zone.external.use_username_as_clientid = false
    zone.external.ignore_loop_deliver = false
    zone.external.strict_mode = false
    zone.internal.allow_anonymous = true
    zone.internal.enable_stats = on
    zone.internal.enable_acl = off
    zone.internal.acl_deny_action = ignore
    zone.internal.max_subscriptions = 0
    zone.internal.max_inflight = 128
    zone.internal.max_awaiting_rel = 1000
    zone.internal.max_mqueue_len = 10000
    zone.internal.mqueue_store_qos0 = true
    zone.internal.enable_flapping_detect = off
    zone.internal.ignore_loop_deliver = false
    zone.internal.strict_mode = false
    zone.internal.bypass_auth_plugins = true
    listener.tcp.external = 0.0.0.0:1883
    listener.tcp.external.acceptors = 8
    listener.tcp.external.max_connections = 1024000
    listener.tcp.external.max_conn_rate = 1000
    listener.tcp.external.active_n = 100
    listener.tcp.external.zone = external
    listener.tcp.external.access.1 = allow all
    listener.tcp.external.backlog = 1024
    listener.tcp.external.send_timeout = 15s
    listener.tcp.external.send_timeout_close = on
    listener.tcp.external.nodelay = true
    listener.tcp.external.reuseaddr = true
    listener.tcp.internal = 127.0.0.1:11883
    listener.tcp.internal.acceptors = 4
    listener.tcp.internal.max_connections = 1024000
    listener.tcp.internal.max_conn_rate = 1000
    listener.tcp.internal.active_n = 1000
    listener.tcp.internal.zone = internal
    listener.tcp.internal.backlog = 512
    listener.tcp.internal.send_timeout = 5s
    listener.tcp.internal.send_timeout_close = on
    listener.tcp.internal.recbuf = 64KB
    listener.tcp.internal.sndbuf = 64KB
    listener.tcp.internal.nodelay = false
    listener.tcp.internal.reuseaddr = true
    listener.ssl.external = 8883
    listener.ssl.external.acceptors = 16
    listener.ssl.external.max_connections = 102400
    listener.ssl.external.max_conn_rate = 500
    listener.ssl.external.active_n = 100
    listener.ssl.external.zone = external
    listener.ssl.external.access.1 = allow all
    listener.ssl.external.tls_versions = tlsv1.3,tlsv1.2,tlsv1.1,tlsv1
    listener.ssl.external.handshake_timeout = 15s
    #证书文件要放到/home/emqx/etc/certs目录下
    listener.ssl.external.keyfile = etc/certs/server.key
    listener.ssl.external.certfile = etc/certs/server.pem
    listener.ssl.external.cacertfile = etc/certs/ca.pem
    listener.ssl.external.verify = verify_peer
    listener.ssl.external.fail_if_no_peer_cert = true
    listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA
    #listener.ssl.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
    listener.ssl.external.reuseaddr = true
    listener.ws.external = 8083
    listener.ws.external.mqtt_path = /mqtt
    listener.ws.external.acceptors = 4
    listener.ws.external.max_connections = 102400
    listener.ws.external.max_conn_rate = 1000
    listener.ws.external.active_n = 100
    listener.ws.external.zone = external
    listener.ws.external.access.1 = allow all
    listener.ws.external.verify_protocol_header = on
    listener.ws.external.backlog = 1024
    listener.ws.external.send_timeout = 15s
    listener.ws.external.send_timeout_close = on
    listener.ws.external.nodelay = true
    listener.wss.external = 8084
    listener.wss.external.mqtt_path = /mqtt
    listener.wss.external.acceptors = 4
    listener.wss.external.max_connections = 16
    listener.wss.external.max_conn_rate = 1000
    listener.wss.external.active_n = 100
    listener.wss.external.zone = external
    listener.wss.external.access.1 = allow all
    listener.wss.external.verify_protocol_header = on
    listener.wss.external.keyfile = etc/certs/server.key
    listener.wss.external.certfile = etc/certs/server.pem
    listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA
    listener.wss.external.backlog = 1024
    listener.wss.external.send_timeout = 15s
    listener.wss.external.send_timeout_close = on
    modules.loaded_file = data/loaded_modules
    module.presence.qos = 1
    plugins.etc_dir = etc/plugins/
    plugins.loaded_file = data/loaded_plugins
    plugins.expand_plugins_dir = plugins/
    broker.sys_interval = 1m
    broker.sys_heartbeat = 30s
    broker.session_locking_strategy = quorum
    broker.shared_subscription_strategy = random
    broker.shared_dispatch_ack_enabled = false
    broker.route_batch_clean = off
    sysmon.long_gc = 0
    sysmon.long_schedule = 240ms
    sysmon.large_heap = 8MB
    sysmon.busy_port = false
    sysmon.busy_dist_port = true
    os_mon.cpu_check_interval = 60s
    os_mon.cpu_high_watermark = 80%
    os_mon.cpu_low_watermark = 60%
    os_mon.mem_check_interval = 60s
    os_mon.sysmem_high_watermark = 70%
    os_mon.procmem_high_watermark = 5%
    vm_mon.check_interval = 30s
    vm_mon.process_high_watermark = 80%
    vm_mon.process_low_watermark = 60%

    六 . 开启PostgreSQL认证

    1 . 新建PostgreSQL数据库 , 并新建表(为了和业务结合, 我改了官方的表名和部分结构)

    新建数据库 : mqttconfig
    用户名 : mqtt
    密码 : 123456

    新建用户表

    -- MQTT user table read by the auth_query / super_query settings of emqx_auth_pgsql.conf.
    -- The table name and the audit columns are the author's customization of the official schema.
    CREATE TABLE "public"."ct_sys_emqx_user" (
      "id" int4 NOT NULL,
      -- Returned by super_query for the connecting username.
      "is_superuser" bool NOT NULL,
      -- Matched against the MQTT username ('%u' in the plugin queries).
      "username" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
      -- Stored as a lowercase SHA-256 hex digest (auth.pgsql.password_hash = sha256).
      "password" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
      -- NOTE(review): not referenced by any plugin query shown in this article.
      "salt" varchar(255) COLLATE "pg_catalog"."default",
      -- 0 = disabled, 1 = enabled; the plugin queries only select rows with enable = 1.
      "enable" int2 NOT NULL DEFAULT 1,
      -- Audit columns: creator, creation time, last modifier, last modification time.
      "add_user" varchar(64) COLLATE "pg_catalog"."default",
      "add_time" timestamp(6),
      "lm_user" varchar(64) COLLATE "pg_catalog"."default",
      "lm_time" timestamp(6),
      CONSTRAINT "ct_sys_emqx_user_pkey" PRIMARY KEY ("id")
    );
    -- Column and table descriptions stored in the catalog (kept in the original language).
    COMMENT ON COLUMN "public"."ct_sys_emqx_user"."id" IS '主键ID';
    COMMENT ON COLUMN "public"."ct_sys_emqx_user"."is_superuser" IS '是否超级用户';
    COMMENT ON COLUMN "public"."ct_sys_emqx_user"."username" IS '用户名';
    COMMENT ON COLUMN "public"."ct_sys_emqx_user"."password" IS '密码';
    COMMENT ON COLUMN "public"."ct_sys_emqx_user"."salt" IS '盐值';
    COMMENT ON COLUMN "public"."ct_sys_emqx_user"."enable" IS '禁用0启用1';
    COMMENT ON COLUMN "public"."ct_sys_emqx_user"."add_user" IS '创建人';
    COMMENT ON COLUMN "public"."ct_sys_emqx_user"."add_time" IS '创建时间';
    COMMENT ON COLUMN "public"."ct_sys_emqx_user"."lm_user" IS '修改人';
    COMMENT ON COLUMN "public"."ct_sys_emqx_user"."lm_time" IS '修改时间';
    COMMENT ON TABLE "public"."ct_sys_emqx_user" IS 'EMQX用户表';

    新建ACL表(访问控制表)

    -- Access-control table read by the acl_query setting of emqx_auth_pgsql.conf.
    -- acl_query selects enabled rows matching the client's IP, username, client id or the
    -- special username '$all', ordered by "level" descending.
    CREATE TABLE "public"."ct_sys_emqx_acl" (
      "id" int4 NOT NULL,
      -- 0 = deny, 1 = allow.
      "allow" int4 NOT NULL,
      "ipaddr" varchar(60) COLLATE "pg_catalog"."default",
      -- '$all' applies the rule to every user.
      "username" varchar(100) COLLATE "pg_catalog"."default",
      "clientid" varchar(100) COLLATE "pg_catalog"."default",
      -- 1 = subscribe, 2 = publish, 3 = both.
      "access" int4,
      -- Topic the rule controls; placeholders are allowed.
      "topic" varchar(100) COLLATE "pg_catalog"."default",
      -- Higher values are returned first by acl_query (order by level desc).
      "level" int4 NOT NULL DEFAULT 100,
      -- 0 = disabled, 1 = enabled; acl_query only selects rows with enable = 1.
      "enable" int2 NOT NULL DEFAULT 1,
      -- Audit columns: creator, creation time, last modifier, last modification time.
      "add_user" varchar(64) COLLATE "pg_catalog"."default",
      "add_time" timestamp(6),
      "lm_user" varchar(64) COLLATE "pg_catalog"."default",
      "lm_time" timestamp(6),
      CONSTRAINT "ct_sys_emqx_acl_pkey" PRIMARY KEY ("id")
    );
    -- Column descriptions stored in the catalog (kept in the original language).
    COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."id" IS '主键ID';
    COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."allow" IS '禁止0允许1';
    COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."ipaddr" IS 'IP地址';
    COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."username" IS '用户名($all代表全部用户)';
    COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."clientid" IS '客户端';
    COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."access" IS '操作:订阅1发布2全部3';
    COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."topic" IS '控制的主题(允许占位符)';
    COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."level" IS '等级,越高越优先';
    COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."enable" IS '禁用0启用1';
    COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."add_user" IS '创建人';
    COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."add_time" IS '创建时间';
    COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."lm_user" IS '修改人';
    COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."lm_time" IS '修改时间';

    添加相应数据到用户表

    注意 : password填入的是SHA-256加密后的字符串 , SHA-256加密方法参考:

    /**
     * Returns the SHA-256 digest of {@code content}, hex-encoded via {@code Hex.encodeHexString}.
     *
     * <p>The text is encoded as UTF-8 before hashing. The charset constant is used instead of a
     * charset name so no checked {@code UnsupportedEncodingException} has to be handled, and a
     * missing digest algorithm is reported as an exception rather than a silent {@code null}
     * that would later be stored as a password.
     *
     * @param content text to hash; must not be {@code null}
     * @return the hex-encoded SHA-256 digest of the UTF-8 bytes of {@code content}
     * @throws IllegalStateException if the runtime provides no SHA-256 {@link MessageDigest}
     */
    public static String encryptSHA256(String content) {
        try {
            MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
            byte[] digest = messageDigest.digest(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            return Hex.encodeHexString(digest);
        } catch (NoSuchAlgorithmException e) {
            // Keep the cause so the misconfigured runtime can be diagnosed.
            throw new IllegalStateException("SHA-256 MessageDigest is not available", e);
        }
    }

    PS : 密码的Hex一定要用全小写字符 , 全大写的经测试无效 , 所以采用Hex.encodeHexString方法 , 该方法生成全小写的Hex字符串

    2 . 修改emqx_auth_pgsql插件配置文件

    配置文件路径 : /home/emqx/etc/plugins/emqx_auth_pgsql.conf

    auth.pgsql.server = 172.18.0.97:5432
    auth.pgsql.pool = 8
    auth.pgsql.username = mqtt
    auth.pgsql.password = 123456
    auth.pgsql.database = mqttconfig
    auth.pgsql.encoding = utf8
    auth.pgsql.ssl = false
    auth.pgsql.auth_query = select password from ct_sys_emqx_user where username = '%u' and enable = 1 limit 1
    auth.pgsql.password_hash = sha256
    auth.pgsql.super_query = select is_superuser from ct_sys_emqx_user where username = '%u' and enable = 1 limit 1
    auth.pgsql.acl_query = select allow, ipaddr, username, clientid, access, topic from ct_sys_emqx_acl where enable = 1 and (ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c') order by level desc

    3. 修改插件加载配置

    修改插件加载配置 : /home/emqx/data/loaded_plugins

    下面加上一行 : {emqx_auth_pgsql,true}.

    注意有个点 .

    完成配置文件如下 : (你如果已经加载了其他插件 , 则可能会不同)

    {emqx_management, true}.
    {emqx_recon, true}.
    {emqx_retainer, true}.
    {emqx_dashboard, true}.
    {emqx_rule_engine, true}.
    {emqx_bridge_mqtt, false}.
    {emqx_auth_pgsql , true}.

    也可能这样 : 

    emqx_management.
    emqx_recon.
    emqx_retainer.
    emqx_dashboard.
    emqx_rule_engine.
    emqx_auth_pgsql.

    两种配置效果一样.

    通过命令方式动态加载插件 : 

    # ** 需要在emqx启动后
    # 使用命令 加载插件
    /home/emqx/bin/emqx_ctl plugins load emqx_auth_pgsql

    七 . 启动/关闭 EMQX

    1 .  改变当前环境变量(.bash_profile) , 把/home/emqx/bin 加入PATH中 (非必须)

    2 . 启动关闭脚本

    #启动脚本
    /home/emqx/bin/emqx start
    
    #关闭脚本
    /home/emqx/bin/emqx stop
    
    #查看启动状态
    /home/emqx/bin/emqx_ctl status

    八 . 部署其他集群环境

    改变第五步中的 主配置文件中的 node.name 和 cluster.etcd.server(根据情况) , 其他全部按照 三 ~ 七 步骤进行部署

    九 . 其他常用命令

    # 重启EMQX
    emqx restart
    
    #///启动关闭插件 
    # 查看插件加载情况
    emqx_ctl plugins list  # Show loaded plugins
    # 加载插件
    emqx_ctl plugins load <Plugin> 
    # 取消加载插件
    emqx_ctl plugins unload <Plugin>
    # 重新加载插件
    emqx_ctl plugins reload <Plugin>

    十 . WEB管理端(DashBoard)

    随便打开集群某一台的18083端口即可访问

    http://172.18.0.231:18083

    默认用户名 : admin

    默认密码 : public (注意修改)

    里面可以查看一些集群监控信息

    注意 : 此Dashboard只作为监控使用 , 不要用作管理(比如加载插件) , 否则可能出现不可预计的后果

    十一 . nginx代理

    使用nginx代理即可实现集群的负载均衡

    首先nginx编译时必须带上 --with-stream 和 --with-stream_ssl_module 参数以启用相应模块

    具体nginx安装可参考 : https://www.cnblogs.com/kreo/p/4378086.html

    注意 : 下面配置需要放在 stream 块中

    log_format proxy '$remote_addr [$time_local] '
                     '$protocol $status $bytes_sent $bytes_received '
                     '$session_time "$upstream_addr" '
                     '"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"';
    
    upstream emqx_servers {
        zone emqx_servers 64k;
        hash $remote_addr;
        server 172.18.0.231:1883 max_fails=2 fail_timeout=30s weight=100;
        server 172.18.0.232:1883 max_fails=2 fail_timeout=30s weight=100;
        server 172.18.0.233:1883 max_fails=2 fail_timeout=30s weight=100;
    }
    
    server {
        listen 8883 ssl;    
        proxy_pass emqx_servers;
        proxy_buffer_size 4k;
        ssl_handshake_timeout 15s;
        # 下面2个模块在nginx商业授权中 , 土豪可以打开
        # status_zone emqx_servers;
        # health_check;
        # 证书需要复制过来放入相应的目录中
        ssl_certificate /usr/local/nginx/conf/certs/emqx/client.pem;
        ssl_certificate_key /usr/local/nginx/conf/certs/emqx/client.key;
        access_log /u01/log/nginx/emqx.access.log proxy;
        error_log /u01/log/nginx/emqx.error.log;
    }

    假设nginx服务器所在的IP是 : 172.18.0.97

    那么 , 统一可以使用 ssl://172.18.0.97:8883 进行访问

    十二 . JAVA访问例子

    创建连接:

    package test.mqtt;
    
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.internal.security.SSLSocketFactoryFactory;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    import java.util.Properties;
    
    /**
     * @author kreo
     * @description
     * @date 2020-6-23 23:15:16
     */
    /**
     * Holds the single shared {@link MqttClient} of this example and creates it on first use.
     *
     * <p>The client targets {@code ssl://172.18.0.97:8883} with username/password credentials
     * and JKS key/trust stores. Broker address, credentials and store paths are hard-coded
     * sample values. The class performs no synchronization, so callers must not invoke
     * {@link #getClient()} and {@link #close()} concurrently.
     *
     * @author kreo
     */
    public class MqttConnection {
        private final static String broker = "ssl://172.18.0.97:8883";
        private final static String clientId = "LOCAL_JAVA_CLIENT";
        private final static MemoryPersistence persistence = new MemoryPersistence();

        private static MqttClient client;

        /**
         * Returns the shared client, creating and connecting it when necessary.
         *
         * <p>A connection attempt is made whenever the client is not currently connected, so a
         * failed first attempt is retried on the next call instead of leaving a permanently
         * unconnected client cached.
         *
         * @return the shared client; it may be unconnected if connecting failed, or
         *         {@code null} if the client could not even be constructed
         */
        public static MqttClient getClient() {
            try {
                if (client == null) {
                    client = new MqttClient(broker, clientId, persistence);
                    // OnMessageCallback is declared outside this listing.
                    client.setCallback(new OnMessageCallback());
                }
                if (!client.isConnected()) {
                    System.out.println("尝试建立连接... Broker >> " + broker);
                    client.connect(buildConnectOptions());
                    System.out.println("建立连接成功");
                }
            } catch (MqttException me) {
                report(me);
            }
            return client;
        }

        /**
         * Disconnects (when connected) and closes the shared client.
         *
         * <p>Does nothing when no client has been created. After a successful close the cached
         * instance is dropped so that the next {@link #getClient()} call builds a fresh client.
         */
        public static void close() {
            if (client == null) {
                return;
            }
            try {
                if (client.isConnected()) {
                    client.disconnect();
                    System.out.println("断开连接");
                }
            } catch (MqttException me) {
                report(me);
            }
            // Closing is attempted even if the disconnect failed, so resources are not leaked.
            try {
                client.close();
                client = null;
                System.out.println("连接关闭");
            } catch (MqttException me) {
                report(me);
            }
        }

        /** Builds the connect options: credentials, TLS stores and a clean session. */
        private static MqttConnectOptions buildConnectOptions() {
            MqttConnectOptions connOptions = new MqttConnectOptions();
            connOptions.setUserName("guest");
            connOptions.setPassword("123456".toCharArray());

            // Properties must hold String values only: getProperty() ignores anything else.
            Properties sslProperties = new Properties();
            sslProperties.setProperty(SSLSocketFactoryFactory.KEYSTORE, "/usr/var/certs/client.jks");
            sslProperties.setProperty(SSLSocketFactoryFactory.KEYSTOREPWD, "client.wanmagroup.com");
            sslProperties.setProperty(SSLSocketFactoryFactory.KEYSTORETYPE, "JKS");
            sslProperties.setProperty(SSLSocketFactoryFactory.TRUSTSTORE, "/usr/var/certs/ca.jks");
            sslProperties.setProperty(SSLSocketFactoryFactory.TRUSTSTOREPWD, "wanmagroup.com");
            sslProperties.setProperty(SSLSocketFactoryFactory.TRUSTSTORETYPE, "JKS");
            sslProperties.setProperty(SSLSocketFactoryFactory.CLIENTAUTH, "true");
            connOptions.setSSLProperties(sslProperties);

            // Request a clean session for each connection.
            connOptions.setCleanSession(true);
            return connOptions;
        }

        /** Prints the reason code, messages, cause and stack trace of an MQTT failure. */
        private static void report(MqttException me) {
            System.out.println("原因代码 " + me.getReasonCode());
            System.out.println("信息 " + me.getMessage());
            System.out.println("LOC " + me.getLocalizedMessage());
            System.out.println("原因 " + me.getCause());
            me.printStackTrace();
        }
    }

    订阅和发布例子:

    package test.mqtt;
    
    import com.wanma.framework.util.IDate;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    
    import java.io.UnsupportedEncodingException;
    import java.nio.charset.StandardCharsets;
    
    /**
     * @author kreo
     * @description
     * @date 2020-6-23 13:45:22
     */
    /**
     * Minimal subscribe/publish demo built on the shared client from {@code MqttConnection}.
     *
     * <p>Both operations use the same topic, so the demo subscribes to the topic it publishes
     * on.
     *
     * @author kreo
     */
    public class MqttClientHandle {
        private final static String pubTopic = "/home/kreo/testpub";

        public MqttClientHandle() {

        }

        /** Subscribes the shared client to the demo topic with QoS 2. */
        public static void subscribe() {
            try {
                MqttConnection.getClient().subscribe(pubTopic, 2);
            } catch (MqttException me) {
                report(me);
            }
        }

        /**
         * Publishes one UTF-8 encoded text message to the demo topic.
         *
         * <p>The payload is the prefix {@code "发送:"} followed by {@code IDate.getNowMillis()}.
         *
         * @param qos quality-of-service level passed to {@code MqttMessage.setQos}
         */
        public static void publish(int qos) {
            try {
                String content = "发送:" + IDate.getNowMillis();
                MqttMessage message = new MqttMessage();
                message.setQos(qos);
                // Charset constant: no checked UnsupportedEncodingException to handle.
                message.setPayload(content.getBytes(StandardCharsets.UTF_8));
                MqttConnection.getClient().publish(pubTopic, message);
            } catch (MqttException me) {
                report(me);
            }
        }

        /** Prints the reason code, messages, cause and stack trace of an MQTT failure. */
        private static void report(MqttException me) {
            System.out.println("原因代码 " + me.getReasonCode());
            System.out.println("信息 " + me.getMessage());
            System.out.println("LOC " + me.getLocalizedMessage());
            System.out.println("原因 " + me.getCause());
            me.printStackTrace();
        }

        /**
         * Subscribes to the demo topic, then publishes one message with QoS 0.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Subscribe first.
            subscribe();

            // Publish; pass 1 or 2 instead of 0 to try the other QoS levels.
            publish(0);
        }
    }

    十三 . 其他特别知识点

    1 . 消息的QOS可能会降级 , 比如使用QOS=2发布 , 但是接收方却使用QOS = 0进行接收 , 那么消息就会降级成为QOS0被接收方接收

  • 相关阅读:
    关于JSON可能出现的错误,待更/todo
    mongoose的安装与使用(书签记录) 2017
    HTTP的学习记录3--HTTPS和HTTP
    HTTP的学习记录(二)头部
    HTTP(一)概述
    LeetCode 455. Assign Cookies
    LeetCode 453. Minimum Moves to Equal Array Elements
    LeetCode 448. Find All Numbers Disappeared in an Array
    LeetCode 447. Number of Boomerangs
    LeetCode 416. Partition Equal Subset Sum
  • 原文地址:https://www.cnblogs.com/kreo/p/13203967.html
Copyright © 2011-2022 走看看