zoukankan      html  css  js  c++  java
  • DotNet Core中使用RabbitMQ

      上一篇随笔记录到RabbitMQ的安装,安装完成,我们就开始使用吧。

    RabbitMQ简介

      AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

      AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
    RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

      RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

    DotNet Core使用RabbitMQ

    通过nuget安装:https://www.nuget.org/packages/RabbitMQ.Client/

    定义生产者:

    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "guest",//用户名
        Password = "guest",//密码
        HostName = "127.0.0.1"//rabbitmq ip
    };
    
    //创建连接
    var connection = factory.CreateConnection();
    //创建通道
    var channel = connection.CreateModel();
    //声明一个队列
    channel.QueueDeclare("hello", false, false, false, null);
    
    Console.WriteLine("
    RabbitMQ连接成功,请输入消息,输入exit退出!");
    
    string input;
    do
    {
        input = Console.ReadLine();
    
        var sendBytes = Encoding.UTF8.GetBytes(input);
        //发布消息
        channel.BasicPublish("", "hello", null, sendBytes);
    
    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();

    定义消费者:

    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "guest",//用户名
        Password = "guest",//密码
        HostName = "127.0.0.1"//rabbitmq ip
    };
    
    //创建连接
    var connection = factory.CreateConnection();
    //创建通道
    var channel = connection.CreateModel();
    
    //事件基本消费者
    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    
    //接收到消息事件
    consumer.Received += (ch, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body);
        Console.WriteLine($"收到消息: {message}");
        //确认该消息已被消费
        channel.BasicAck(ea.DeliveryTag, false);
    };
    //启动消费者 设置为手动应答消息
    channel.BasicConsume("hello", false, consumer);
    Console.WriteLine("消费者已启动");
    Console.ReadKey();
    channel.Dispose();
    connection.Close();

    演示如下:

    启动了一个生产者,两个消费者,可以看见两个消费者都能接收到消息,消息投递到哪个消费者是由RabbitMQ决定的。

    RabbitMQ消费失败的处理

      RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。

    我们来修改一下消费者的代码:

     //接收到消息事件
     consumer.Received += (ch, ea) =>
     {
         var message = Encoding.UTF8.GetString(ea.Body);
    
         Console.WriteLine($"收到消息: {message}");
    
         Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");
         Thread.Sleep(10000);
         //确认该消息已被消费
         channel.BasicAck(ea.DeliveryTag, false);
         Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
     };

    演示如下:

    从图中可以看出,设置了消息应答延迟10s,如果在这10s中,该消费者断开了连接,那么消息会被RabbitMQ重新投递。

    使用RabbitMQ的Exchange

    前面的例子,我们可以看到生产者将消息投递到Queue中,实际上这种方式在RabbitMQ中永远都不会发生的。实际的情况是,生产者将消息发送到Exchange(交换器),下图中的X,由Exchange(交换器)将消息路由到一个或多个Queue中(或者丢弃)。

    AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

    Exchange Types(交换器类型)

    RabbitMQ常用的Exchange Type有Fanout、Direct、Topic、Headers这四种

    1、Fanout:

      这种类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,这时Routing key不起作用

     

    Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

    所以,Fanout Exchange 转发消息是最快的。

    为了演示效果,定义了两个队列,分别为hello1,hello2,每个队列都拥有一个消费者。

    static void Main(string[] args)
    {
        string exchangeName = "TestFanoutChange";
        string queueName1 = "hello1";
        string queueName2 = "hello2";
        string routeKey = "";
    
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory
        {
            UserName = "guest",//用户名
            Password = "guest",//密码
            HostName = "127.0.0.1"//rabbitmq ip
        };
    
        //创建连接
        var connection = factory.CreateConnection();
        //创建通道
        var channel = connection.CreateModel();
    
        //定义一个Direct类型交换机
        channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);
    
        //定义队列1
        channel.QueueDeclare(queueName1, false, false, false, null);
        //定义队列2
        channel.QueueDeclare(queueName2, false, false, false, null);
    
        //将队列绑定到交换机
        channel.QueueBind(queueName1, exchangeName, routeKey, null);
        channel.QueueBind(queueName2, exchangeName, routeKey, null);
    
        //生成两个队列的消费者
        ConsumerGenerator(queueName1);
        ConsumerGenerator(queueName2);
    
    
        Console.WriteLine($"
    RabbitMQ连接成功,
    
    请输入消息,输入exit退出!");
    
        string input;
        do
        {
            input = Console.ReadLine();
    
            var sendBytes = Encoding.UTF8.GetBytes(input);
            //发布消息
            channel.BasicPublish(exchangeName, routeKey, null, sendBytes);
    
        } while (input.Trim().ToLower() != "exit");
        channel.Close();
        connection.Close();
    }
     /// <summary>
     /// 根据队列名称生成消费者
     /// </summary>
     /// <param name="queueName"></param>
     static void ConsumerGenerator(string queueName)
     {
         //创建连接工厂
         ConnectionFactory factory = new ConnectionFactory
         {
             UserName = "guest",//用户名
             Password = "guest",//密码
             HostName = "127.0.0.1"//rabbitmq ip
         };
    
         //创建连接
         var connection = factory.CreateConnection();
         //创建通道
         var channel = connection.CreateModel();
    
         //事件基本消费者
         EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    
         //接收到消息事件
         consumer.Received += (ch, ea) =>
         {
             var message = Encoding.UTF8.GetString(ea.Body);
    
             Console.WriteLine($"Queue:{queueName}收到消息: {message}");
             //确认该消息已被消费
             channel.BasicAck(ea.DeliveryTag, false);
         };
         //启动消费者 设置为手动应答消息
         channel.BasicConsume(queueName, false, consumer);
         Console.WriteLine($"Queue:{queueName},消费者已启动");
     }

    运行效果如下:

    2、Direct

      这种类型的Exchange路由规则也很简单,它会把消息路由到哪些binding key与routingkey完全匹配的Queue中。

       Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

    static void Main(string[] args)
    {
        string exchangeName = "TestChange";
        string queueName = "hello";
        string routeKey = "helloRouteKey";
    
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory
        {
            UserName = "guest",//用户名
            Password = "guest",//密码
            HostName = "127.0.0.1"//rabbitmq ip
        };
    
        //创建连接
        var connection = factory.CreateConnection();
        //创建通道
        var channel = connection.CreateModel();
    
        //定义一个Direct类型交换机
        channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
    
        //定义一个队列
        channel.QueueDeclare(queueName, false, false, false, null);
    
        //将队列绑定到交换机
        channel.QueueBind(queueName, exchangeName, routeKey, null);
    
        Console.WriteLine($"
    RabbitMQ连接成功,Exchange:{exchangeName},Queue:{queueName},Route:{routeKey},
    
    请输入消息,输入exit退出!");
    
        string input;
        do
        {
            input = Console.ReadLine();
    
            var sendBytes = Encoding.UTF8.GetBytes(input);
            //发布消息
            channel.BasicPublish(exchangeName, routeKey, null, sendBytes);
    
        } while (input.Trim().ToLower() != "exit");
        channel.Close();
        connection.Close();

    运行效果如下:

    3、Topic

      这种类型的Exchange的路由规则支持 binding key 和 routing key 的模糊匹配,会把消息路由到满足条件的Queue。 binding key 中可以存在两种特殊字符 *与 #,用于做模糊匹配,其中 * 用于匹配一个单词,# 用于匹配0个或多个单词,单词以符号“.”为分隔符。

      以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1与Q2,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。

      所以,Topic Exchange使用非常灵活。
    static void Main(string[] args)
    {
        string exchangeName = "TestTopicChange";
        string queueName = "hello";
        string routeKey = "TestRouteKey.*";
    
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory
        {
            UserName = "guest",//用户名
            Password = "guest",//密码
            HostName = "127.0.0.1"//rabbitmq ip
        };
    
        //创建连接
        var connection = factory.CreateConnection();
        //创建通道
        var channel = connection.CreateModel();
    
        //定义一个Direct类型交换机
        channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);
    
        //定义队列1
        channel.QueueDeclare(queueName, false, false, false, null);
    
        //将队列绑定到交换机
        channel.QueueBind(queueName, exchangeName, routeKey, null);
    
    
    
        Console.WriteLine($"
    RabbitMQ连接成功,
    
    请输入消息,输入exit退出!");
    
        string input;
        do
        {
            input = Console.ReadLine();
    
            var sendBytes = Encoding.UTF8.GetBytes(input);
            //发布消息
            channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);
    
        } while (input.Trim().ToLower() != "exit");
        channel.Close();
        connection.Close();
    }

    运行效果如下:

     4、Headers

      这种类型的Exchange不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

    参考:

      官网:https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

        https://www.cnblogs.com/stulzq/p/7551819.html

        https://www.jianshu.com/p/e55e971aebd8

  • 相关阅读:
    Freemarker-2.3.22 Demo
    Freemarker-2.3.22 Demo
    Freemarker-2.3.22 Demo
    Freemarker-2.3.22 Demo
    Oracle PLSQL Demo
    Oracle PLSQL Demo
    Oracle PLSQL Demo
    Oracle PLSQL Demo
    Oracle PLSQL Demo
    Oracle PLSQL Demo
  • 原文地址:https://www.cnblogs.com/taotaozhuanyong/p/11765444.html
Copyright © 2011-2022 走看看