zoukankan      html  css  js  c++  java
  • 第三节:RabbitMq四种路由模式详解和剖析持久化机制

    一. Direct-Exchange模式

    1. 含义

     交换机类型设置为:ExchangeType.Direct

     交换机和队列通过routingKey(路由key)进行绑定,发消息的时候每条消息也要指定routingKey(路由key),然后交换机根据该路由key进行匹配,该key绑定了几个Queue那么该条消息就同时发送到几个队列中

    2. 使用场景

     通过消息队列来写日志;

     Info debug error warn :记录下来

     error: 除了记录下来,还需要特殊处理,可能需要发送一个信息,发送一个邮件;

    解决方案:通过路由key匹配不同的队列

     队列1:专门用来记录日志

     队列2:专门用来发邮件,发信息

    3. 代码分享

    生产者

     /// <summary>
        /// DirectExchange路由
        /// </summary>
        public class DirectExchange
        {
            public static void Show()
            {
                Console.ForegroundColor = ConsoleColor.Red;
    
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        //声明两个队列
                        channel.QueueDeclare(queue: "DirectExchangeLogAllQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); 
                        channel.QueueDeclare(queue: "DirectExchangeErrorQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        //1个路由
                        channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        //4种路由key统一绑定DirectExchangeLogAllQueue队列,
                        string[] logtypes = new string[] { "debug", "info", "warn", "error" };
                        foreach (string logtype in logtypes)
                        {
                            channel.QueueBind(queue: "DirectExchangeLogAllQueue",
                                    exchange: "DirectExChange",
                                    routingKey: logtype);
                        }
                        //路由key“error”再次绑定DirectExchangeErrorQueue队列
                        channel.QueueBind(queue: "DirectExchangeErrorQueue",
                                  exchange: "DirectExChange",
                                  routingKey: "error");
                        List<LogMsgModel> logList = new List<LogMsgModel>();
                        for (int i = 1; i <=20; i++)
                        {
                            if (i % 4 == 0)
                            {
                                logList.Add(new LogMsgModel() { LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}条信息") });
                            }
                            if (i % 4 == 1)
                            {
                                logList.Add(new LogMsgModel() { LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug第{i}条信息") });
                            }
                            if (i % 4 == 2)
                            {
                                logList.Add(new LogMsgModel() { LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn第{i}条信息") });
                            }
                            if (i % 4 == 3)
                            {
                                logList.Add(new LogMsgModel() { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}条信息") });
                            }
                        }
                         
                        Console.WriteLine("生产者发送20条日志信息");
                        //发送日志信息
                        foreach (var log in logList)
                        {
                            channel.BasicPublish(exchange: "DirectExChange",
                                                routingKey: log.LogType,
                                                basicProperties: null,
                                                body: log.Msg);
                            Console.WriteLine($"{Encoding.UTF8.GetString(log.Msg)}  已发送~~");
                        }
                         
                    }
                }
            }
    
    
            public class LogMsgModel
            {
                public string LogType { get; set; }
                public byte[] Msg { get; set; }
            }
        }
    View Code
    DirectExchange.Show();

    消费者

     1  public class DirectExchange
     2     {
     3         /// <summary>
     4         /// 队列1--用于各种类型日志信息
     5         /// </summary>
     6         public static void Show1()
     7         {
     8             Console.ForegroundColor = ConsoleColor.Green;
     9 
    10             var factory = new ConnectionFactory();
    11             factory.HostName = "localhost";//RabbitMQ服务在本地运行
    12             factory.UserName = "guest";//用户名
    13             factory.Password = "guest";//密码 
    14             using (var connection = factory.CreateConnection())
    15             {
    16                 using (IModel channel = connection.CreateModel())
    17                 {
    18                     //channel.QueueDeclare(queue: "DirectExchangeLogAllQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
    19                     //channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
    20                     //string[] logtypes = new string[] { "debug", "info", "warn", "error" };
    21                     //foreach (string logtype in logtypes)
    22                     //{
    23                     //    channel.QueueBind(queue: "DirectExchangeLogAllQueue",
    24                     //            exchange: "DirectExChange",
    25                     //            routingKey: logtype);
    26                     //}
    27                     //消费队列中的所有消息;                                   
    28                     var consumer = new EventingBasicConsumer(channel);
    29                     consumer.Received += (model, ea) =>
    30                     {
    31                         var body = ea.Body;
    32                         var message = Encoding.UTF8.GetString(body.ToArray());
    33                         Console.WriteLine($"【{message}】,写入文本~~");
    34                     };
    35                     //处理消息
    36                     channel.BasicConsume(queue: "DirectExchangeLogAllQueue", autoAck: true, consumer: consumer);
    37                     Console.ReadLine();
    38                 }
    39             }
    40         }
    41 
    42 
    43         /// <summary>
    44         /// 队列2--用于error类型日志进行单独处理
    45         /// </summary>
    46         public static void Show2()
    47         {
    48             Console.ForegroundColor = ConsoleColor.Green;
    49 
    50             var factory = new ConnectionFactory();
    51             factory.HostName = "localhost";//RabbitMQ服务在本地运行
    52             factory.UserName = "guest";//用户名
    53             factory.Password = "guest";//密码 
    54             using (var connection = factory.CreateConnection())
    55             {
    56                 using (IModel channel = connection.CreateModel())
    57                 {
    58                     //channel.QueueDeclare(queue: "DirectExchangeErrorQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
    59                     //channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
    60                     //路由key“error”再次绑定DirectExchangeErrorQueue队列
    61                     //channel.QueueBind(queue: "DirectExchangeErrorQueue",
    62                     //          exchange: "DirectExChange",
    63                     //          routingKey: "error");
    64 
    65 
    66                     //消费队列中的所有消息;                                   
    67                     var consumer = new EventingBasicConsumer(channel);
    68                     consumer.Received += (model, ea) =>
    69                     {
    70                         var body = ea.Body;
    71                         var message = Encoding.UTF8.GetString(body.ToArray());
    72                         Console.WriteLine($"【{message}】,发送邮件~~");
    73                     };
    74                     //处理消息
    75                     channel.BasicConsume(queue: "DirectExchangeErrorQueue", autoAck: true, consumer: consumer);
    76                     Console.ReadLine();
    77                 }
    78             }
    79         }
    80 
    81     }
    View Code
                {
                    Thread.Sleep(2000);
    
                    Task.Run(() =>
                    {
                        DirectExchange.Show1();
                    });
                    Task.Run(() =>
                    {
                        DirectExchange.Show2();
                    });
                }

    运行结果

    二. Fanout-Exchange模式

    1.含义

     交换机类型设置为:ExchangeType.Fanout

     这种模式忽略routingKey,消息从客户端发出,只要queue与exchange有绑定,那么不管你的Routingkey是什么,都会将消息分发给所有与该exchang绑定的队列中。

    2. 使用场景

     典型的发布订阅模式,也可以叫做观察者模式. 比如博主有很多粉丝,博主每发一条消息,所有关注的粉丝都能收到推送(每个粉丝对应一个队列)

    3. 代码分享

    生产者

      public class FanoutExchange
        {
            /// <summary>
            /// 博主发博客
            /// </summary>
            public static void Show()
            {
                Console.ForegroundColor = ConsoleColor.Red;
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(queue: "fansQueue1", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queue: "fansQueue2", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                        channel.QueueBind(queue: "fansQueue1", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);
                        channel.QueueBind(queue: "fansQueue2", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);
                        int i = 1;
                        while (true)
                        {
                            var message = $"博客{i}";
                            var body = Encoding.UTF8.GetBytes(message);
                            //基本发布
                            channel.BasicPublish(exchange: "FanoutExchange",
                                                 routingKey: string.Empty,    //这里忽略路由key设置什么,都会向所有的队列发送
                                                 basicProperties: null,
                                                 body: body);
                            Console.WriteLine($"{message}已发送到队列");
                            i++;
                            Thread.Sleep(1000);
                        }
    
                    }
                }
            }
        }
    View Code
     FanoutExchange.Show();

    消费者

     public class FanoutExchange
        {
            /// <summary>
            /// 粉丝1的队列
            /// </summary>
            public static void Show1()
            {
                Console.ForegroundColor = ConsoleColor.Green;
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    //创建通道channel
                    using (var channel = connection.CreateModel())
                    {
                        //channel.QueueDeclare(queue: "fansQueue1", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        //channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                        //channel.QueueBind(queue: "fansQueue1", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);
    
                        //定义消费者                                      
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"粉丝1收到推送:{message}");
                        };
                        Console.WriteLine("粉丝1在浏览博客中..........");
                        //处理消息
                        channel.BasicConsume(queue: "fansQueue1", autoAck: true, consumer: consumer);
                        Console.ReadLine();
                    }
                }
            }
    
            /// <summary>
            /// 粉丝2的队列
            /// </summary>
            public static void Show2()
            {
                Console.ForegroundColor = ConsoleColor.Yellow;
    
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    //创建通道channel
                    using (var channel = connection.CreateModel())
                    {
                        //channel.QueueDeclare(queue: "fansQueue2", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        //channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                        //channel.QueueBind(queue: "fansQueue2", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);
    
                        //定义消费者                                      
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"粉丝2收到推送:{message}");
                        };
                        Console.WriteLine("粉丝2在浏览博客中..........");
                        //处理消息
                        channel.BasicConsume(queue: "fansQueue2", autoAck: true, consumer: consumer);
                        Console.ReadLine();
                    }
                }
            }
    
    
    
    
        }
    View Code
                {
                    Thread.Sleep(2000);
    
                    Task.Run(() =>
                    {
                        FanoutExchange.Show1();
                    });
                    Task.Run(() =>
                    {
                        FanoutExchange.Show2();
                    });
                }

    运行结果

    三. Topic-Exchange模式

    1. 含义

     交换机类型设置为:ExchangeType.Topic

     这种模式可以定制key,相当于在DirectExchange的基础上增加了对key的模糊搜索,规则如下,主要是两个关键符号

     *,代表任意的一个词。例如topic.zlh.*,他能够匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

     #,代表任意多个词。例如topic.#,他能够匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

    2. 使用场景

     分组

    3. 代码分享

    生产者

     public class TopicExchange
        {
            public static void Show()
            {
                Console.ForegroundColor = ConsoleColor.Red;
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    { 
                        channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); 
                        channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); 
                        channel.QueueDeclare(queue: "newsQueue", durable: true, exclusive: false, autoDelete: false,  arguments: null); 
    
                        channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange", routingKey: "China.#", arguments: null);   //多个词匹配
                        channel.QueueBind(queue: "newsQueue", exchange: "TopicExchange", routingKey: "*.news", arguments: null);     //1个词匹配
    
                        {
                            string message = "来自中国的新闻消息1";
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.news", basicProperties: null, body: body);  //同时匹配ChineQueue和newQueue
                            Console.WriteLine($"消息【{message}】已发送到队列");
                        }
    
                        {
                            string message = "来自中国的天气消息1";
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.weather.news", basicProperties: null, body: body); //仅匹配ChinaQueue
                            Console.WriteLine($"消息【{message}】已发送到队列");
                        }
                        {
                            string message = "来自中国的新闻消息2";
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "TopicExchange", routingKey: "msg.news", basicProperties: null, body: body);   //仅匹配newsQueue
                            Console.WriteLine($"消息【{message}】已发送到队列");
                        } 
                        {
                            string message = "来自美国的天气消息2";
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "TopicExchange", routingKey: "usa.weather", basicProperties: null, body: body);  //谁都不匹配
                            Console.WriteLine($"消息【{message}】已发送到队列");
                        } 
                    }
                }
            }
        }
    View Code
    TopicExchange.Show();

    消费者

     public class TopicExchange
        {
            /// <summary>
            /// 读取ChinaQueue队列消息
            /// </summary>
            public static void Show1()
            {
                Console.ForegroundColor = ConsoleColor.Green;
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        //channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);
                        //channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        //channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange", routingKey: "China.#", arguments: null);
                        //定义消费者                                      
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"ChinaQueue队列中消费成功:{message}");
                        };
                        //处理消息
                        channel.BasicConsume(queue: "ChinaQueue", autoAck: true, consumer: consumer);
                    }
                }
            }
    
            /// <summary>
            /// 读取newsQueue队列消息
            /// </summary>
            public static void Show2()
            {
                Console.ForegroundColor = ConsoleColor.Green;
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        //定义消费者                                      
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"newsQueue队列中消费成功:{message}");
                        };
                        //处理消息
                        channel.BasicConsume(queue: "newsQueue", autoAck: true, consumer: consumer);
                    }
                }
            }
    
        }
    View Code
    {
                    Thread.Sleep(2000);
    
                    Task.Run(() =>
                    {
                        TopicExchange.Show1();
                    });
                    Task.Run(() =>
                    {
                        TopicExchange.Show2();
                    });
     }

    运行结果

     

    四. Header-Exchange模式

    1. 含义

     交换机类型设置为:ExchangeType.Headers

     headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对以及x-match参数,x-match参数是字符串类型,可以设置为any或者all。

     A. 如果设置为any,只要匹配到了headers表中的任何一对键值即可

     B. 如果设置为all,则代表需要全部匹配

    2.使用场景

     如下案例,All队列中存储了1条消息test1,Any队列中存储了2条消息,test3和test4

    3. 代码分享

    生产者

      public class HeaderExchange
        {
            public static void Show()
            {
                Console.ForegroundColor = ConsoleColor.Red;
    
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: "HeaderExchange", type: ExchangeType.Headers, durable: false, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queue: "HeaderExchangeAllqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queue: "HeaderExchangeAnyqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);
    
                        channel.QueueBind(queue: "HeaderExchangeAllqueue", exchange: "HeaderExchange", routingKey: string.Empty,
                                          arguments: new Dictionary<string, object> {
                                                                        { "x-match","all"},
                                                                        { "teacher","ypf"},
                                                                        { "pass","123"}});
                        Console.WriteLine("生产者准备就绪....");
                        {
                            string message = "x-match=all,teacher和pass都相同时发送的消息,test1";
                            var props = channel.CreateBasicProperties();
                            props.Headers = new Dictionary<string, object>() {{ "teacher","ypf"},
                                                                               { "pass","123"}};
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "HeaderExchange",
                                                 routingKey: string.Empty,
                                                 basicProperties: props,
                                                 body: body);
                            Console.WriteLine($"消息【{message}】已发送");        //存入HeaderExchangeAllqueue队列成功
                        }
                        {
                            string message = "x-match=all,teacher和pass有一个不相同时发送的消息,test2";
                            var props = channel.CreateBasicProperties();
                            props.Headers = new Dictionary<string, object>() {
                                                                               { "teacher","ypf"},
                                                                               { "pass","456"}
                                                                              };
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "HeaderExchange",
                                                 routingKey: string.Empty,
                                                 basicProperties: props,
                                                 body: body);
                            Console.WriteLine($"消息【{message}】已发送");     //存入HeaderExchangeAllqueue队列失败
                        }
    
    
                        Console.WriteLine("*****************************888888888*********************************");
                        {
                            channel.QueueBind(queue: "HeaderExchangeAnyqueue", exchange: "HeaderExchange", routingKey: string.Empty,
                            arguments: new Dictionary<string, object> {
                                                { "x-match","any"},
                                                { "teacher","lmr"},
                                                { "pass","123456"},});
    
                            string msg = "x-match=any,teacher和pass完全相同时发送的消息,test3";
                            var props = channel.CreateBasicProperties();
                            props.Headers = new Dictionary<string, object>() {
                                                     { "teacher","lmr"},
                                                     { "pass","123456"}
                                                };
                            var body = Encoding.UTF8.GetBytes(msg);
                            channel.BasicPublish(exchange: "HeaderExchange",
                                                 routingKey: string.Empty,
                                                 basicProperties: props,
                                                 body: body);
                            Console.WriteLine($"消息【{msg}】已发送");      //存入HeaderExchangeAnyqueue队列成功
                        }
    
                        {
                            string msg = "x-match=any,teacher和pass有一个不相同时发送的消息,test4";
                            var props = channel.CreateBasicProperties();
                            props.Headers = new Dictionary<string, object>() {
                                                     { "teacher","lmr"},
                                                     { "pass","456"}
                                                };
                            var body = Encoding.UTF8.GetBytes(msg);
                            channel.BasicPublish(exchange: "HeaderExchange",
                                                 routingKey: string.Empty,
                                                 basicProperties: props,
                                                 body: body);
                            Console.WriteLine($"消息【{msg}】已发送");    //存入HeaderExchangeAnyqueue队列成功
                        }
    
                    }
                }
                Console.ReadKey();
            }
        }
    View Code
     HeaderExchange.Show();

    消费者

    public class HeaderExchange
        {
            /// <summary>
            /// 读取HeaderExchangeAllqueue队列消息
            /// </summary>
            public static void Show1()
            {
                Console.ForegroundColor = ConsoleColor.Green;
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        //定义消费者                                      
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"HeaderExchangeAllqueue队列中消费成功:{message}");
                        };
                        //处理消息
                        channel.BasicConsume(queue: "HeaderExchangeAllqueue", autoAck: true, consumer: consumer);
                    }
                }
            }
    
            /// <summary>
            /// 读取HeaderExchangeAnyqueue队列消息
            /// </summary>
            public static void Show2()
            {
                Console.ForegroundColor = ConsoleColor.Green;
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        //定义消费者                                      
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"HeaderExchangeAnyqueue队列中消费成功:{message}");
                        };
                        //处理消息
                        channel.BasicConsume(queue: "HeaderExchangeAnyqueue", autoAck: true, consumer: consumer);
                    }
                }
            }
    
    
    
        }
    View Code
                {
                    Thread.Sleep(2000);
    
                    Task.Run(() =>
                    {
                        HeaderExchange.Show1();
                    });
                    Task.Run(() =>
                    {
                        HeaderExchange.Show2();
                    });
                }

    运行结果

    五. 剖析持久化机制

    1. 触发条件

     需要设置交换机、队列、消息均为持久化。

     

    2. 现象

    (1). 下面路径存放的是virtual,有几个virtual,就有几个文件夹

     C:UsersDELLAppDataRoamingRabbitMQdb abbit@DESKTOP-DR3FU9S-mnesiamsg_storesvhosts

    (2). 下面的msg_store_persistent文件夹存放的是持久化数据,

     关闭RabbitMQ服务,会有很多文件,打开后,又没了,说明启动服务,磁盘上的数据又被加载到了硬盘

     

    !

    • 作       者 : Yaopengfei(姚鹏飞)
    • 博客地址 : http://www.cnblogs.com/yaopengfei/
    • 声     明1 : 如有错误,欢迎讨论,请勿谩骂^_^。
    • 声     明2 : 原创博客请在转载时保留原文链接或在文章开头加上本人博客地址,否则保留追究法律责任的权利。
     
  • 相关阅读:
    如何实现序列化为json
    unity中camera摄像头控制详解
    eclipse配置c开发环境
    uml和模式01
    angular2开发01
    微信公众平台开发01
    最新无线网卡驱动安装
    交换ctrl和caps_loack的新方法
    web.xml文件详解
    设计模式中的里氏代换原则
  • 原文地址:https://www.cnblogs.com/yaopengfei/p/14673167.html
Copyright © 2011-2022 走看看