zoukankan      html  css  js  c++  java
  • .NET Core 使用RabbitMQ

    RabbitMQ简介

    AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
    RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

    RabbitMQ安装

    RabbitMQ安装,网上已经有许多教程了,这里简单介绍一下在CentOS下安装RabbitMQ。使用的版本为3.6.12最新版。

    1.首先安装erlang

    rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm

    2.然后安装socat

    yum install socat

    3.最后安装RabbitMQ

    rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm

    RabbitMQ常用命令

    启用Web控制台

    rabbitmq-plugins enable rabbitmq_management

    开启服务

    systemctl start rabbitmq-server.service

    停止服务

    systemctl stop rabbitmq-server.service

    查看服务状态

    systemctl status rabbitmq-server.service

    查看RabbitMQ状态

    rabbitmqctl status

    添加用户赋予管理员权限

    rabbitmqctl  add_user  username  password
    rabbitmqctl  set_user_tags  username  administrator

    查看用户列表

    rabbitmqctl list_users

    删除用户

    rabbitmqctl delete_user username

    修改用户密码

    rabbitmqctl oldPassword Username newPassword

    访问Web控制台

    http://服务器ip:15672/ 注意配置防火墙,默认用户名密码都是guest,若新建用户一定要记得配置权限。

    .NET Core 使用RabbitMQ

    通过nuget安装:https://www.nuget.org/packages/RabbitMQ.Client/

    定义生产者
    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "admin",//用户名
        Password = "admin",//密码
        HostName = "192.168.157.130"//rabbitmq ip
    };
    
    //创建连接
    var connection = factory.CreateConnection();
    //创建通道
    var channel = connection.CreateModel();
    //声明一个队列
    channel.QueueDeclare("hello", false, false, false, null);
    
    Console.WriteLine("
    RabbitMQ连接成功,请输入消息,输入exit退出!");
    
    string input;
    do
    {
        input = Console.ReadLine();
    
        var sendBytes = Encoding.UTF8.GetBytes(input);
        //发布消息
        channel.BasicPublish("", "hello", null, sendBytes);
    
    } while (input.Trim().ToLower()!="exit");
    channel.Close();
    connection.Close();
    定义消费者
      //创建连接工厂
                ConnectionFactory factory = new ConnectionFactory
                {
                    UserName = "admin",//用户名
                    Password = "admin",//密码
                    HostName = "192.168.157.130"//rabbitmq ip
                };
    
                //创建连接
                var connection = factory.CreateConnection();
                //创建通道
                var channel = connection.CreateModel();
    
                //事件基本消费者
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    
                //接收到消息事件
                consumer.Received += (ch, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body);
                    Console.WriteLine($"收到消息: {message}");
                    //确认该消息已被消费
                    channel.BasicAck(ea.DeliveryTag, false);
                };
                //启动消费者 设置为手动应答消息
                channel.BasicConsume("hello", false, consumer);
                Console.WriteLine("消费者已启动");
                Console.ReadKey();
                channel.Dispose();
                connection.Close();
    运行

    启动了一个生产者,两个消费者,可以看见两个消费者都能收到消息,消息投递到哪个消费者是由RabbitMQ决定的。

    RabbitMQ消费失败的处理

    RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。

    修改一下消费者的代码:

    //接收到消息事件
    consumer.Received += (ch, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body);
    
        Console.WriteLine($"收到消息: {message}");
    
        Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");
        Thread.Sleep(10000);
        //确认该消息已被消费
        channel.BasicAck(ea.DeliveryTag, false);
        Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
    };

    演示:

    从图中可以看出,设置了消息应答延迟10s,如果在这10s中,该消费者断开了连接,那么消息会被RabbitMQ重新投递。

    使用RabbitMQ的Exchange

    前面我们可以看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)

    AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

    RabbitMQ提供了四种Exchange模式:direct,fanout,topic,header 。但是 header模式在实际使用中较少,所以这里只介绍前三种模式。

    Exchange不是消费者关心的,所以消费者的代码完全不用变,用上面的消费者就行了。
    由于避免文章过长,影响阅读,所以只贴了部分代码,但是demo里面是完整可运行的,详细代码请查看demo。

    Direct Exchange

    所有发送到Direct Exchange的消息被转发到具有指定RouteKey的Queue。

    Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

     1 //创建连接
     2 var connection = factory.CreateConnection();
     3 //创建通道
     4 var channel = connection.CreateModel();
     5 
     6 //定义一个Direct类型交换机
     7 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
     8 
     9 //定义一个队列
    10 channel.QueueDeclare(queueName, false, false, false, null);
    11 
    12 //将队列绑定到交换机
    13 channel.QueueBind(queueName, exchangeName, routeKey, null);

    运行:

    Fanout Exchange

    所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

    Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

    所以,Fanout Exchange 转发消息是最快的。

    为了演示效果,定义了两个队列,分别为hello1,hello2,每个队列都拥有一个消费者。

     1 static void Main(string[] args)
     2 {
     3     string exchangeName = "TestFanoutChange";
     4     string queueName1 = "hello1";
     5     string queueName2 = "hello2";
     6     string routeKey = "";
     7 
     8     //创建连接工厂
     9     ConnectionFactory factory = new ConnectionFactory
    10     {
    11         UserName = "admin",//用户名
    12         Password = "admin",//密码
    13         HostName = "192.168.2.6"//rabbitmq ip
    14     };
    15 
    16     //创建连接
    17     var connection = factory.CreateConnection();
    18     //创建通道
    19     var channel = connection.CreateModel();
    20 
    21     //定义一个Direct类型交换机
    22     channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);
    23 
    24     //定义队列1
    25     channel.QueueDeclare(queueName1, false, false, false, null);
    26     //定义队列2
    27     channel.QueueDeclare(queueName2, false, false, false, null);
    28 
    29     //将队列绑定到交换机
    30     channel.QueueBind(queueName1, exchangeName, routeKey, null);
    31     channel.QueueBind(queueName2, exchangeName, routeKey, null);
    32 
    33     //生成两个队列的消费者
    34     ConsumerGenerator(queueName1);
    35     ConsumerGenerator(queueName2);
    36 
    37 
    38     Console.WriteLine($"
    RabbitMQ连接成功,
    
    请输入消息,输入exit退出!");
    39 
    40     string input;
    41     do
    42     {
    43         input = Console.ReadLine();
    44 
    45         var sendBytes = Encoding.UTF8.GetBytes(input);
    46         //发布消息
    47         channel.BasicPublish(exchangeName, routeKey, null, sendBytes);
    48 
    49     } while (input.Trim().ToLower() != "exit");
    50     channel.Close();
    51     connection.Close();
    52 }
    53 
    54 /// <summary>
    55 /// 根据队列名称生成消费者
    56 /// </summary>
    57 /// <param name="queueName"></param>
    58 static void ConsumerGenerator(string queueName)
    59 {
    60     //创建连接工厂
    61     ConnectionFactory factory = new ConnectionFactory
    62     {
    63         UserName = "admin",//用户名
    64         Password = "admin",//密码
    65         HostName = "192.168.157.130"//rabbitmq ip
    66     };
    67 
    68     //创建连接
    69     var connection = factory.CreateConnection();
    70     //创建通道
    71     var channel = connection.CreateModel();
    72 
    73     //事件基本消费者
    74     EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    75 
    76     //接收到消息事件
    77     consumer.Received += (ch, ea) =>
    78     {
    79         var message = Encoding.UTF8.GetString(ea.Body);
    80 
    81         Console.WriteLine($"Queue:{queueName}收到消息: {message}");
    82         //确认该消息已被消费
    83         channel.BasicAck(ea.DeliveryTag, false);
    84     };
    85     //启动消费者 设置为手动应答消息
    86     channel.BasicConsume(queueName, false, consumer);
    87     Console.WriteLine($"Queue:{queueName},消费者已启动");
    88 }

    运行:

    Topic Exchange

    所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上,

    Exchange 将路由进行模糊匹配。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“XiaoChen.#”能够匹配到“XiaoChen.pets.cat”,但是“XiaoChen.*” 只会匹配到“XiaoChen.money”。

    所以,Topic Exchange 使用非常灵活。

     1 string exchangeName = "TestTopicChange";
     2 string queueName = "hello";
     3 string routeKey = "TestRouteKey.*";
     4 
     5 //创建连接工厂
     6 ConnectionFactory factory = new ConnectionFactory
     7 {
     8     UserName = "admin",//用户名
     9     Password = "admin",//密码
    10     HostName = "192.168.2.6"//rabbitmq ip
    11 };
    12 
    13 //创建连接
    14 var connection = factory.CreateConnection();
    15 //创建通道
    16 var channel = connection.CreateModel();
    17 
    18 //定义一个Direct类型交换机
    19 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);
    20 
    21 //定义队列1
    22 channel.QueueDeclare(queueName, false, false, false, null);
    23 
    24 //将队列绑定到交换机
    25 channel.QueueBind(queueName, exchangeName, routeKey, null);
    26 
    27 
    28 
    29 Console.WriteLine($"
    RabbitMQ连接成功,
    
    请输入消息,输入exit退出!");
    30 
    31 string input;
    32 do
    33 {
    34     input = Console.ReadLine();
    35 
    36     var sendBytes = Encoding.UTF8.GetBytes(input);
    37     //发布消息
    38     channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);
    39 
    40 } while (input.Trim().ToLower() != "exit");
    41 channel.Close();
    42 connection.Close();
    运行

  • 相关阅读:
    RPC之Thrift系列1-----Thrit介绍
    MYSQL-实现sqlserver- row_number() over(partition by order by) 分组排序功能
    Sql Server 中 PIVOT在mysql 中的实现
    MSSQL中 ROW_NUMBER() OVER(PARTITION BY COLUMN ORDER BY COLUMN)在Mysql中的实现
    mysql判断表中符合条件的记录是否存在
    mysql对树进行递归查询
    MySql5.6中的表按照时间进行表分区过程中遇到的坑
    .net平台上实现数据库访问工厂,连接不同的数据库
    VS静态编译与动态编译
    CRC原理阐述
  • 原文地址:https://www.cnblogs.com/bob-zb/p/12470074.html
Copyright © 2011-2022 走看看