zoukankan      html  css  js  c++  java
  • 通过集群的方式解决基于MQTT协议的RabbitMQ消息收发

    在完成了基于AMQP协议的RabbitMQ消息收发后,我们要继续实现基于MQTT协议的RabbitMQ消息收发。

    由于C#的RabbitMQ.Client包中只实现了基于AMQP协议的消息收发功能的封装,所以要实现基于MQTT协议的收发,我们要下载新的包。

    在NuGet的解决方案中,我们选择了简单实用的M2Mqtt。

    关于M2Mqtt的资料,可以参考: https://m2mqtt.wordpress.com/     https://github.com/eclipse/paho.mqtt.m2mqtt

    消费者代码:

    using System;
    using uPLibrary.Networking.M2Mqtt;
    using uPLibrary.Networking.M2Mqtt.Messages;
    
    namespace MQTTDemo
    {
        class Client
        {
            static void Main()
            {
                // create client instance 
                MqttClient client = new MqttClient("127.0.0.1");
    
                // register to message received 
                client.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
                
                string clientId = Guid.NewGuid().ToString();
                client.Connect(clientId);
    
                client.Subscribe(new string[] { "test" }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
            }
    
            static void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
            {
                string msg = System.Text.Encoding.Default.GetString(e.Message);
                Console.WriteLine(msg);
            }
        }
    }
    View Code

    生产者代码:

    using System;
    using System.Text;
    using uPLibrary.Networking.M2Mqtt;
    using uPLibrary.Networking.M2Mqtt.Messages;
    
    namespace MQTTServer
    {
        class Server
        {
            static void Main()
            {
                // create client instance 
                MqttClient client = new MqttClient("127.0.0.1");
    
                string clientId = Guid.NewGuid().ToString();
                client.Connect(clientId);
    
                client.Publish("test", Encoding.UTF8.GetBytes("hello"), MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, false);
    
                Console.WriteLine("Publish!!!");
    
                Console.ReadKey();
                client.Disconnect();
            }
        }
    }
    View Code

    消费者监听的队列名会基于产生的Guid进行前后封装,“test”表示的是topic值,选择QOS_LEVEL_AT_MOST_ONCE而不是QOS_LEVEL_EXACTLY_ONCE是因为测试发现QOS_LEVEL_EXACTLY_ONCE消息会被收到多次(我也不知道为啥)。

    消费者监听的队列会在消费者程序结束后自动删除,生产者不产生队列。

    在rabbitmq-plugins enable rabbitmq_mqtt之后,我们就可以愉快地通过MQTT收发消息了。

    然而,我们发现只能通过127.0.0.1和localhost访问RabbitMQ服务器,而本机IP访问失败。

    查阅了大量资料后,我发现这是由于rabbitmq默认的config中有这么一段文字,所以我们之能在localhost中访问服务器。

    %% The default "guest" user is only permitted to access the server
    %% via a loopback interface (e.g. localhost).
    %% {loopback_users, [<<"guest">>]},

    所以我们取消了{loopback_users, []}的注释

    %% Uncomment the following line if you want to allow access to the
    %% guest user from anywhere on the network.
    %% {loopback_users, []},

    值得注意的是,由于我们在config中仅仅取消了一行注释,所以这段代码是整个代码块的最后一行。于是我们应该将句末的逗号一同去掉。

    然而,我发现怎么更改默认启动的rabbitmq对应的comfig文件,都无法成功地使用我更改后的config文件,察看了log发现用的是不存在的rabbitmq.conf文件。

    修改成rabbitmq.conf后服务启动失败,所以我放弃了直接在默认启动服务中更改。

    由于之前配置过rabbitmq集群,所以我打算采用集群的方式解决问题。

    操作可以参考https://www.cnblogs.com/lucifer1997/p/9324130.html,其中我将ClusterNode1改为了mqtt,同时在rabbitmq-mqtt.config中对{loopback_users, []}进行了更改。

    如果要修改默认的mqtt用户、密码、虚拟用户、交换机信息,可以参照http://www.rabbitmq.com/mqtt.html在rabbitmq-mqtt.config中进行修改。

    在命令行操作之前先把原来开启的rabbitmq_mqtt停用,避免两个服务同时监听1883端口导致报错。 rabbitmq-plugins disable rabbitmq_mqtt

    同时在操作了rabbitmq-plugins-mqtt enable rabbitmq_management之后执行rabbitmq-plugins-mqtt enable rabbitmq_mqtt。

    如此就可以在集群后实现远程MQTT收发,同时还可以实现AMQP与MQTT之间的收发。

  • 相关阅读:
    5.对象创建型模式-原型PROTOTYPE
    4.对象创建型模式-工厂方法
    3.对象创建型模式-生成器
    一个小应用的dbcp和c3p0配置实例
    利用 java.lang.Runtime.addShutdownHook() 钩子程序,保证java程序安全退出
    初探maven插件机制
    【转载】Git push时重复输入用户名密码的问题
    【转载】 ERROR 1045 (28000): Access denied for user root@localhost (using password: NO)
    【转载】[Java]读取文件方法大全
    【原创】iframe与父页面之间,变量、方法互相调用
  • 原文地址:https://www.cnblogs.com/lucifer1997/p/9436635.html
Copyright © 2011-2022 走看看