zoukankan      html  css  js  c++  java
  • C# RabbitMQ的使用

    本文目的如题。

    安装

    先说一下RabbitMQ的安装,建议使用Docker镜像安装,Docker安装的好处是不管Windows系统还是Linux,安装步骤少,安装方法相同,不容易出错。使用下面的命令就可以:

    docker run -d --hostname myRabbit --name rabbitmq3.9.11 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin RABBITMQ_DEFAULT_VHOST=my_vhost -p 15672:15672 -p 5672:5672 rabbitmq3.9.11:management
    

    安装完成后,可以打开浏览器访问管理网站http://127.0.0.1:15672,使用安装时设置的用户名和密码登录,就可以进行管理了。

    不管使用什么方法安装,都可以运行本文中的示例。这些示例中使用了用户admin,密码是admin,如果没有,可以在管理网站中创建:

    本文的示例中还使用了my_vhost虚拟主机,如果没有,也需要定义一下:

    注意,admin 需要有对my_vhost的操作权限。

    编写消息接收端

    安装完成后可以进行开发了。我们需要编写消息的生产者和消费者,如果哪一部分出了问题,或者RabbitMQ服务器出了问题,都会影响工作的进展。因此我们分步进行,先编写消息接受部分,也就是所谓的消费者,与RabbitMQ服务器联调,成功后再进行下一步。

    先创建一个.Net 6的控制台项目,可以使用Visual Studio创建。如果使用命令行,命令如下:

    mkdir DirectReceiveDemo
    cd DirectReceiveDemo
    dotnet new console 
    

    然后安装rabbitmq.client程序包:

    dotnet add package rabbitmq.client
    

    编写Program.cs代码如下:

    using RabbitMQ.Client;
    using System.Text;
    using RabbitMQ.Client.Events;
    
    var factory = new ConnectionFactory()
    {
        HostName = "127.0.0.1",
        UserName = "admin",
        Password = "admin",
        VirtualHost = "my_vhost"
    };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(queue: "mymessage",
                                      durable: false,
                                      exclusive: false,
                                      autoDelete: false,
                                      arguments: null);
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine("收到消息 {0}", message);
        };
        channel.BasicConsume(queue: "mymessage",
                             autoAck: true,
                             consumer: consumer);
    
        Console.WriteLine(" 按回车退出");
        Console.ReadLine();
    }
    
    

    执行dotnet run 运行代码,程序会一直等待输入,这时需要输入一些消息验证程序。现在登录管理网站http://127.0.0.1:15672/,使用安装时设置的用户名和密码,在Connections分页中可以看到多了新的连接:


    在Channel分页中可以看到当前的Chanel:

    进入Queues分页,点击列表中的mymessage

    进入mymessage队列:

    在Publish message中写一些消息并发送。回到控制台接收程序,消息应该已经被接收了。

    到这里,接收部分完成,退出这个程序,我们开始编写发送部分。

    编写发送端

    创建过程跟接收部分完全一样,只是项目名称为DirectSendDemo,Program.cs代码如下:

    using RabbitMQ.Client;
    using System.Text;
    
    
    var factory = new ConnectionFactory()
    {
        HostName = "127.0.0.1",
        UserName = "admin",
        Password = "admin",
        VirtualHost = "my_vhost"
    };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(queue: "mymessage",
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
    
        Console.WriteLine("输入需要传输的消息,输入Exit退出");
        var message = Console.ReadLine();
        while (message != "Exit")
        {
            var body = Encoding.UTF8.GetBytes(message);
    
            channel.BasicPublish(exchange: "",
                                 routingKey: "mymessage",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" 发送消息 {0}", message);
            message = Console.ReadLine();
        }
    }
    
    Console.WriteLine("按回车退出");
    Console.ReadLine();
    

    运行这个项目,输入一些消息,

    还是回到管理页面,在mymessage队列页面,执行GetMessage,可以获取发送的消息。

    测试发送端和接收端


    现在我们可以让发送和接收一起工作了,在两个终端分别启动发送和接收程序,看是否可以一起工作。

    发送和接收可以一起工作了。

    现在可以用这两个程序做一些测试,首先看一下一个发送端,两个接收端是什么情况:

    我们发现,接收端会轮流接收消息。
    两个发送端对一个接收端的情况如下:

    跟想象的一样,接收端会处理所有消息。

    Fanout 模式

    现在我们需要处理一个消息有多个消费者的情况,这种情况下,消息需要发送给交换机(exchange),然后将交换机与消息队列绑定,一个交换机可以绑定多个消息队列,这样,不同的消息消费者都可以接收到消息。 我们创建一个新的发送方FanoutSender,将消息发送给exchange:

    using RabbitMQ.Client;
    using System.Text;
    
    
    var factory = new ConnectionFactory()
    {
        HostName = "127.0.0.1",
        UserName = "admin",
        Password = "admin",
        VirtualHost = "my_vhost"
    };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare("example.exchange", ExchangeType.Fanout, true, false, null);
    
        Console.WriteLine("输入需要传输的消息,输入Exit退出");
        var message = Console.ReadLine();
        while (message != "Exit")
        {
            var body = Encoding.UTF8.GetBytes(message);
    
            channel.BasicPublish(exchange: "example.exchange",
                                 routingKey: "",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" 发送消息 {0}", message);
            message = Console.ReadLine();
        }
    }
    
    Console.WriteLine("按回车退出");
    Console.ReadLine();
    

    然后创建两个接收方,FanoutReceiver1和FanoutReceiver2,分别接收que1和que2队列的消息,这两个队列都绑定到相同的交换机,代码如下:
    FanoutReceiver1:

    using RabbitMQ.Client;
    using System.Text;
    using RabbitMQ.Client.Events;
    
    
    var factory = new ConnectionFactory()
    {
        HostName = "127.0.0.1",
        UserName = "admin",
        Password = "admin",
        VirtualHost = "my_vhost"
    };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare(exchange: "example.exchange",
    type: "fanout", durable: true);
    
        channel.QueueDeclare(queue: "que1",
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);
        channel.QueueBind(queue: "que1", exchange: "example.exchange",
    routingKey: "");
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine("收到消息 {0}", message);
        };
        channel.BasicConsume(queue: "que1",
                             autoAck: true,
                             consumer: consumer);
    
        Console.WriteLine(" 按回车退出");
        Console.ReadLine();
    }
    
    

    FanoutReceiver2:

    using RabbitMQ.Client;
    using System.Text;
    using RabbitMQ.Client.Events;
    
    
    var factory = new ConnectionFactory()
    {
        HostName = "127.0.0.1",
        UserName = "admin",
        Password = "admin",
        VirtualHost = "my_vhost"
    };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare(exchange: "example.exchange",
    type: "fanout", durable: true);
    
        channel.QueueDeclare(queue: "que2",
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);
        channel.QueueBind(queue: "que2", exchange: "example.exchange",
    routingKey: "");
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine("收到消息 {0}", message);
        };
        channel.BasicConsume(queue: "que2",
                             autoAck: true,
                             consumer: consumer);
    
        Console.WriteLine(" 按回车退出");
        Console.ReadLine();
    }
    

    同时启动这三个程序,运行结果如下:

    发送的消息被同时接收。

    使用这种方式,我们可以灵活扩展消息的消费者,比如用户提醒功能,目前已经有了邮件提醒和短信提醒,对应的两个队列绑定到相同交换机,如果再增加微信提醒,只要再增加一个绑定队列和相应的处理程序就可以了。

    Direct模式和RouteKey


    在Fanout模式下,我们将消息发送到订阅消息的所有队列中,如果我们希望选择性地向队列发送消息,可以使用Direct模式,根据不同的RouteKey向不同的队列发送消息。

    我们建立三个控制台程序程序模拟一个发送方和两个接收方,项目的创建方法同上,代码如下:
    发送:

    using RabbitMQ.Client;
    using System.Text;
    
    
    var factory = new ConnectionFactory()
    {
        HostName = "127.0.0.1",
        UserName = "admin",
        Password = "admin",
        VirtualHost = "my_vhost"
    };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare("directdemo.exchange", ExchangeType.Direct, true, false, null);
    
        Console.WriteLine("输入需要传输的消息,输入Exit退出");
        var message = Console.ReadLine();
        while (message != "Exit")
        {
            Console.WriteLine("输入RouteKey");
            var routekey = Console.ReadLine();
            var body = Encoding.UTF8.GetBytes(message);
    
            channel.BasicPublish(exchange: "directdemo.exchange",
                                 routingKey: routekey,
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" 发送消息 {0} Routekey {1}", message,routekey);
            message = Console.ReadLine();
        }
    }
    
    Console.WriteLine("按回车退出");
    Console.ReadLine();
    

    接收1:

    using RabbitMQ.Client;
    using System.Text;
    using RabbitMQ.Client.Events;
    
    
    var factory = new ConnectionFactory()
    {
        HostName = "127.0.0.1",
        UserName = "admin",
        Password = "admin",
        VirtualHost = "my_vhost"
    };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
    
        channel.ExchangeDeclare(exchange: "directdemo.exchange",
    type: ExchangeType.Direct, durable: true);
    
        channel.QueueDeclare(queue: "log_que",
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);
        channel.QueueBind(queue: "log_que", exchange: "directdemo.exchange",
    routingKey: "log");
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine("收到消息 {0}", message);
        };
        channel.BasicConsume(queue: "log_que",
                             autoAck: true,
                             consumer: consumer);
    
        Console.WriteLine(" 按回车退出");
        Console.ReadLine();
    }
    

    接收2:

    using RabbitMQ.Client;
    using System.Text;
    using RabbitMQ.Client.Events;
    
    
    var factory = new ConnectionFactory()
    {
        HostName = "127.0.0.1",
        UserName = "admin",
        Password = "admin",
        VirtualHost = "my_vhost"
    };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
    
        channel.ExchangeDeclare(exchange: "directdemo.exchange",
    type: ExchangeType.Direct, durable: true);
    
        channel.QueueDeclare(queue: "email_que",
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);
        channel.QueueBind(queue: "email_que", exchange: "directdemo.exchange",
    routingKey: "email");
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine("收到消息 {0}", message);
        };
        channel.BasicConsume(queue: "email_que",
                             autoAck: true,
                             consumer: consumer);
    
        Console.WriteLine(" 按回车退出");
        Console.ReadLine();
    }
    
    

    上面的代码中,关键是队列绑定:

       channel.QueueBind(queue: "email_que", exchange: "directdemo.exchange",
    routingKey: "email");
    

    这句话将queue、exchange和routingKey绑定在一起。运行效果如下:

    Topic 模式

    前面的Direct模式中,RouteKey是固定的,Topic模式引入了通配符,RouteKey可以是符合表达式的任何字符串。

    • 通配符“*”,代表一个字符
    • 通配符“#”,代表0或多个字符

    仔细研究上面的规则,会发现Topic模式可以代替Direct和Fanout,如果RouteKey被设置为“#”,就是队列可以接收任何消息,这与Fanout模式相同,如果RouteKey中没有通配符,则和使用Direct模式的效果相同。

    现在我们编写Topic模式的发送和接收,代码如下:
    Topic模式发送:

    using RabbitMQ.Client;
    using System.Text;
    
    
    var factory = new ConnectionFactory()
    {
        HostName = "127.0.0.1",
        UserName = "admin",
        Password = "admin",
        VirtualHost = "my_vhost"
    };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare("topicdemo.exchange", ExchangeType.Topic, true, false, null);
    
        Console.WriteLine("输入需要传输的消息,输入Exit退出");
        var message = Console.ReadLine();
        while (message != "Exit")
        {
            Console.WriteLine("输入RouteKey");
            var routekey = Console.ReadLine();
            var body = Encoding.UTF8.GetBytes(message);
    
            channel.BasicPublish(exchange: "topicdemo.exchange",
                                 routingKey: routekey,
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" 发送消息 {0} Routekey {1}", message, routekey);
            message = Console.ReadLine();
        }
    }
    
    Console.WriteLine("按回车退出");
    Console.ReadLine();
    

    Topic模式接收:

    using RabbitMQ.Client;
    using System.Text;
    using RabbitMQ.Client.Events;
    
    
    var factory = new ConnectionFactory()
    {
        HostName = "127.0.0.1",
        UserName = "admin",
        Password = "admin",
        VirtualHost = "my_vhost"
    };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
    
        channel.ExchangeDeclare(exchange: "topicdemo.exchange",
    type: ExchangeType.Topic, durable: true);
    
        channel.QueueDeclare(queue: "topic_que",
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);
        channel.QueueBind(queue: "topic_que", exchange: "topicdemo.exchange",
    routingKey: "#.log");
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine("收到消息 {0}", message);
        };
        channel.BasicConsume(queue: "topic_que",
                             autoAck: true,
                             consumer: consumer);
    
        Console.WriteLine(" 按回车退出");
        Console.ReadLine();
    }
    
    

    我们设置的RouteKey是"#.log",也就是匹配这个表达式的RouteKey的消息会被接收到:

    到这里RabbitMQ常用的几种模式都介绍了,最后说一点代码中的细节,在发送方和接收方代码中,有重复的queue或者exchange声明,比如:

        channel.QueueDeclare(queue: "mymessage",
                                      durable: false,
                                      exclusive: false,
                                      autoDelete: false,
                                      arguments: null);
    

    这些代码让人感到有些困惑,似乎每次都需要声明,而实际上是只要存在相关的queue或者exchange,这些代码就不再起作用。之所以在发送方和接收方都包含这些代码,是因为不知道是否存在相关的queue或exchange,也不知道谁先启动,避免出错。如果在RabbitMQ的Web管理页面预先手工创建了相应的queue或者exchange,这些代码是可以去掉的。

    本文代码可以从github下载:https://github.com/zhenl/ZL.RabbitMQ.Demo

    本文来自博客园,作者:寻找无名的特质,转载请注明原文链接:https://www.cnblogs.com/zhenl/p/15730784.html

  • 相关阅读:
    PHP mysqli_next_result() 函数
    PHP mysqli_multi_query() 函数
    PHP mysqli_num_fields() 函数
    PHP mysqli_more_results() 函数
    PHP mysqli_kill() 函数
    PHP mysqli_insert_id() 函数
    PHP mysqli_init() 函数
    PHP mysqli_info() 函数
    PHP mysqli_get_server_version() 函数
    PHP mysqli_get_server_info() 函数
  • 原文地址:https://www.cnblogs.com/zhenl/p/15730784.html
Copyright © 2011-2022 走看看