zoukankan      html  css  js  c++  java
  • 第二节:RabbitMq基本使用(生产消费者、优先级队列、发布订阅等)

    一. RabbitMq基本使用

    1. 条件准备

     (1).通过指令【net start rabbitmq】启动服务

     (2).准备1个生产者程序Producer, 1个消费者程序Consumer01

     (3).通过Nuget给三个程序安装 【RabbitMQ.Client 6.2.1

     (4).通过地址:http://127.0.0.1:15672 访问RabbitMq的管理系统,进行监控,账号和密码都是guest

     (5).设置程序的启动顺序,先启动Producer,然后延迟2s启动Consumer01

    2. 核心代码剖析

     (1). 创建连接工厂ConnectionFactory,指定HostName、UserName、Password(连接地址、账号、密码),也可以指定VirtualHost。

    PS:默认情况向,RabbitMq的信息都是在“/”这一虚拟机中,比如我可以指定Virtual为“/ypf”,当然需要先去可视化界面中创建/ypf,否则程序会报错

    (关于 RabbitMq、Queue、Exchange、Virtual之间的关系,详见第一节:xxxxx)

     (2).创建连接 factory.CreateConnection() 和 创建传输信道 connection.CreateModel()

     (3).创建队列: QueueDeclare

    channel.QueueDeclare(queue: "SimpleProducerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

    A. queue: 队列名称

    B. durable:是否持久化到硬盘, true 则设置队列为持久化,持久化的队列会存磁盘,在服务器重启的时候可以保证不丢失相关信息。

    C.exclusive:设置队列是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除

     这里需要注意 3点:

      ① 排他队列是基于连接( Connection) 可见的,同 个连接的不同信道 (Channel) 是可以同时访问同一连接创建的排他队列;

      ② "首次"是指如果1个连接己经声明了 排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同:

      ③ 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景

    D. autoDelete:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会 自动删除

    E. arguments:设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes 等等。

     (4).创建交换机(交换机):ExchangeDeclare

     channel.ExchangeDeclare(exchange: "SimpleProducerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

    A. exchange:交换机名称。

    B. type:交换机类型,主要有(Direct、Fanout、Topic、Header)

    C. durable:是否持久化到磁盘

    D. autoDelete:自动删除 ,至少有一个队列与这个交换机绑定,之后,所有与这个交换机绑定的队列都与此解绑,才会触发删除

    E. arguments:设置交换机的一些参数。

     (5).队列和交换机绑定 :QueueBind

      channel.QueueBind(queue: "SimpleProducerQueue", exchange: "SimpleProducerExChange", routingKey: string.Empty, arguments: null);

    A. queue:需要绑定的队列名称

    B. exchange:需要绑定的交换机名称

    C. routingKey:路由key,用于指定发送到队列的规则。

    D. arguments:设置一些参数

     (6).发送消息:BasicPublish

    IBasicProperties basicProperties = channel.CreateBasicProperties();
    basicProperties.Persistent = true;  //配置消息持久化
    //basicProperties.DeliveryMode = 2;
    string message = $"ypf{i}";
    byte[] body = Encoding.UTF8.GetBytes(message);
    //发消息(不指定路由key)
    channel.BasicPublish(exchange: "SimpleProducerExChange",  routingKey: string.Empty,  basicProperties: basicProperties,body: body);

    A. queue:需要绑定的队列名称

    B. exchange:需要绑定的交换机名称

    C. routingKey:绑定路由key,用于指定发送的规则。

    D. arguments:设置一些参数

     (7).接收消息:事件模式,BasicConsume+Received

    //channel.QueueDeclare(queue: "SimpleProducerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
    //channel.ExchangeDeclare(exchange: "SimpleProducerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
    //channel.QueueBind(queue: "SimpleProducerQueue", exchange: "SimpleProducerExChange", routingKey: string.Empty, arguments: null);
    
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
       var body = ea.Body;
       var message = Encoding.UTF8.GetString(body.ToArray());
       Console.WriteLine($"消费者01 接收消息: {message}");
    };
    channel.BasicConsume(queue: "SimpleProducerQueue", autoAck: true, consumer: consumer);  //进行消费

    A. queue:消费的队列(这里只能指定一个队列哦)

    B. autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器.

    C. consumer:指定消费者。

    注意:消费者可以不用再次声明 路由、交换机、绑定,前提是生产者已经执行,该消费的队列在RabbitMq中已经存在了。

    二. 几个场景

    1. 生产者-消费者

    (1). 1个生产者-1个消费者

    模拟:生产者生产的同时,消费者进行消费。

    剖析:这里采用的是ExchangeType.Direct,但是绑定的时候不指定路由key

    生产者代码

    {
                    //设置控制台的颜色
                    Console.ForegroundColor = ConsoleColor.Red;
    
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.HostName = "localhost";//RabbitMQ服务在本地运行
                    factory.UserName = "guest";//用户名
                    factory.Password = "guest";//密码 
                    //factory.VirtualHost = "/ypf";
                    using (IConnection connection = factory.CreateConnection())
                    {
                        using (IModel channel = connection.CreateModel())
                        {
                            //创建队列
                            channel.QueueDeclare(queue: "SimpleProducerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                            //创建交换机(Direct路由)
                            channel.ExchangeDeclare(exchange: "SimpleProducerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                            //队列和路由绑定
                            channel.QueueBind(queue: "SimpleProducerQueue", exchange: "SimpleProducerExChange", routingKey: string.Empty, arguments: null);
    
                            Console.WriteLine("------------------------------下面是生产者开始生产消息(2s后开始)------------------------------------------");
                            Thread.Sleep(2000);
                            for (int i = 1; i <= 100; i++)
                            {
                                IBasicProperties basicProperties = channel.CreateBasicProperties();
                                basicProperties.Persistent = true;
                                //basicProperties.DeliveryMode = 2;
                                string message = $"ypf{i}";
                                byte[] body = Encoding.UTF8.GetBytes(message);
                                //发消息(不指定路由key)
                                channel.BasicPublish(exchange: "SimpleProducerExChange",
                                                     routingKey: string.Empty,
                                                     basicProperties: basicProperties,
                                                     body: body);
                                Console.WriteLine($"消息:{message} 已发送~");
                                Thread.Sleep(500);
                            }
                        }
                    }
                }
    View Code

    消费者代码

     {
                    Thread.Sleep(2000);  //休眠两秒,等待生产者
                    Console.ForegroundColor = ConsoleColor.Green;
    
                    var factory = new ConnectionFactory();
                    factory.HostName = "localhost";//RabbitMQ服务在本地运行
                    factory.UserName = "guest";//用户名
                    factory.Password = "guest";//密码 
                    using (var connection = factory.CreateConnection())
                    {
                        using (var channel = connection.CreateModel())
                        {                     
                            try
                            {
                                //channel.QueueDeclare(queue: "SimpleProducerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                                //channel.ExchangeDeclare(exchange: "SimpleProducerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                                //channel.QueueBind(queue: "SimpleProducerQueue", exchange: "SimpleProducerExChange", routingKey: string.Empty, arguments: null);
    
                                var consumer = new EventingBasicConsumer(channel);
                                consumer.Received += (model, ea) =>
                                {
                                    var body = ea.Body;
                                    var message = Encoding.UTF8.GetString(body.ToArray());
                                    Console.WriteLine($"消费者01 接收消息: {message}");
                                };
                                channel.BasicConsume(queue: "SimpleProducerQueue", autoAck: true, consumer: consumer);
                                Console.WriteLine(" Press [enter] to exit.");
                                Console.ReadLine();
                            }
                            catch (Exception ex)
                            {
                                Console.WriteLine(ex.Message);
                            }
                        }
                    }
                }
    View Code

    运行效果

    (2). 多个生产者-多个消费者 

    模拟:1个队列,利用多线程开始多个生产者生产的同时,多个消费者进行消费。

    剖析:这里采用的是ExchangeType.Direct,但是绑定的时候不指定路由key

    生产者代码:

     /// <summary>
        /// 模拟多个生产者
        /// </summary>
        public class ManyProducer
        {
            /// <summary>
            /// 生产者
            /// </summary>
            /// <param name="producerName">生产者名称</param>
            /// <param name="num">模拟消息内容</param>
            public static void Show(string producerName, int num)
            {
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(queue: "ManyProducerConsumerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        channel.ExchangeDeclare(exchange: "ManyProducerConsumerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        channel.QueueBind(queue: "ManyProducerConsumerQueue", exchange: "ManyProducerConsumerExChange", routingKey: string.Empty, arguments: null);
                        Console.ForegroundColor = ConsoleColor.Red;
                        Console.WriteLine($"生产者{producerName}已准备就绪~~~");
                        for (int i = num; i <= num + 100; i++)
                        {
                            string message = $"生产者{producerName}:消息{i}";
                            byte[] body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "ManyProducerConsumerExChange",
                                                 routingKey: string.Empty,
                                                 basicProperties: null,
                                                 body: body);
                            Console.WriteLine($"消息:{message} 已发送~");
    
                            Thread.Sleep(800);
                        }
                    }
                }
            }
        }
    View Code
    {
                    //模拟多个生产者,向同一个队列里生产消息(这里使用的一定一个队列, 路由可以1个或多个)
                    Task.Run(() =>
                    {
                        //生产者ypfProducer1从10开始生产消息
                        ManyProducer.Show("ypfProducer1", 10);
                    });
                    Task.Run(() =>
                    {
                        //生产者ypfProducer2从500开始生产消息
                        ManyProducer.Show("ypfProducer2", 500);
                    });
    }

    消费者代码

     /// <summary>
        /// 模拟多个消费者
        /// </summary>
        public class ManyConsumer
        {
            /// <summary>
            /// 消费者01
            /// </summary>
            public static void Show01()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        Console.ForegroundColor = ConsoleColor.Green;
                        try
                        {
                            //channel.QueueDeclare(queue: "ManyProducerConsumerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                            //channel.ExchangeDeclare(exchange: "ManyProducerConsumerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                            //channel.QueueBind(queue: "ManyProducerConsumerQueue", exchange: "ManyProducerConsumerExChange", routingKey: string.Empty, arguments: null);
                            var consumer = new EventingBasicConsumer(channel);
                            consumer.Received += (model, ea) =>
                            {
                                var body = ea.Body;
                                var message = Encoding.UTF8.GetString(body.ToArray());
                                Console.WriteLine($"消费者001 接受消息: {message}");
                            };
                            channel.BasicConsume(queue: "ManyProducerConsumerQueue", autoAck: true, consumer: consumer);
                            Console.WriteLine(" Press [enter] to exit. 001");
                            Console.ReadLine();
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex.Message);
                        }
                    }
                }
    
            }
    
    
    
            /// <summary>
            /// 消费者02
            /// </summary>
            public static void Show02()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using var connection = factory.CreateConnection();
                using var channel = connection.CreateModel();
                Console.ForegroundColor = ConsoleColor.Green;
                try
                {
                    //channel.QueueDeclare(queue: "ManyProducerConsumerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //channel.ExchangeDeclare(exchange: "ManyProducerConsumerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    //channel.QueueBind(queue: "ManyProducerConsumerQueue", exchange: "ManyProducerConsumerExChange", routingKey: string.Empty, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"消费者002 接受消息: {message}");
                    };
                    channel.BasicConsume(queue: "ManyProducerConsumerQueue",
                                 autoAck: true,
                                 consumer: consumer);
                    Console.WriteLine(" Press [enter] to exit. 002");
                    Console.ReadLine();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
    
            /// <summary>
            /// 消费者03
            /// </summary>
            public static void Show03()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using var connection = factory.CreateConnection();
                using var channel = connection.CreateModel();
                Console.ForegroundColor = ConsoleColor.Green;
                try
                {
                    //channel.QueueDeclare(queue: "ManyProducerConsumerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //channel.ExchangeDeclare(exchange: "ManyProducerConsumerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    //channel.QueueBind(queue: "ManyProducerConsumerQueue", exchange: "ManyProducerConsumerExChange", routingKey: string.Empty, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"消费者003 接受消息: {message}");
                    };
                    channel.BasicConsume(queue: "ManyProducerConsumerQueue",
                                 autoAck: true,
                                 consumer: consumer);
                    Console.WriteLine(" Press [enter] to exit 003.");
                    Console.ReadLine();
    
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }
    View Code
     {
                    Thread.Sleep(2000);  //休眠2秒,等待生产者
                    Task.Run(() =>
                    {
                        ManyConsumer.Show01();
                    });
                    Task.Run(() =>
                    {
                        ManyConsumer.Show02();
                    });
                    Task.Run(() =>
                    {
                        ManyConsumer.Show03();
                    });
    }

    运行效果

    2. 优先级队列

    模拟:用户购买东西,依次下单,进行排队,但是svip级别最高,可以先拿到东西,vip级别次之,普通用户最后。

    剖析:这里采用的是ExchangeType.Direct,指定路由key,通过对了的arguments参数配置支持优先级队列,然后发送消息的时候,通过Priority设置级别,数值越大级别越高。

    生产者代码:

     /// <summary>
        /// 优先级队列
        /// </summary>
        public class PriorityQueue
        {
            public static void Show()
            {
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(queue: "PriorityQueue", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>() {
                                 {"x-max-priority",10 }  //指定队列要支持优先级设置;
                           });
                        channel.ExchangeDeclare(exchange: "PriorityQueueExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        channel.QueueBind(queue: "PriorityQueue", exchange: "PriorityQueueExchange", routingKey: "PriorityKey");
                        Console.ForegroundColor = ConsoleColor.Red;
                        //下面发送消息,并设置消息的优先级
                        {
                            string[] questionList = {  "普通用户A购买东西", "vip用户B购买东西", "普通用户C购买东西", "普通用户D购买东西", "svip用户F购买东西", "vip用户G购买东西" };
                            //设置消息优先级
                            IBasicProperties props = channel.CreateBasicProperties();
                            foreach (string questionMsg in questionList)
                            {
                                //包含vip的优先级设置的高一些
                                if (questionMsg.StartsWith("svip"))
                                {
                                    props.Priority = 9;                  //svip设置级别最高
                                    channel.BasicPublish(exchange: "PriorityQueueExchange",
                                                   routingKey: "PriorityKey",
                                                   basicProperties: props,
                                                   body: Encoding.UTF8.GetBytes(questionMsg));
                                }
                                else if (questionMsg.StartsWith("vip"))
                                {
                                    props.Priority = 5;                 //vip级别次之
                                    channel.BasicPublish(exchange: "PriorityQueueExchange",
                                                   routingKey: "PriorityKey",
                                                   basicProperties: props,
                                                   body: Encoding.UTF8.GetBytes(questionMsg));
                                }
                                else
                                {
                                    props.Priority = 1;               //普通用户最后   
                                    channel.BasicPublish(exchange: "PriorityQueueExchange",
                                                   routingKey: "PriorityKey",
                                                   basicProperties: props,
                                                   body: Encoding.UTF8.GetBytes(questionMsg));
                                }
                                Console.WriteLine($"{questionMsg} 已发送~~");
                            }
                        }
                        Console.Read();
                    }
                }
            }
        }
    View Code
    {
          PriorityQueue.Show();
    }

    消费者代码:

     /// <summary>
        /// 优先级队列,消费者
        /// </summary>
        public class PriorityQueue
        {
            public static void Show()
            {
                Console.ForegroundColor = ConsoleColor.Green;
    
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        //channel.QueueDeclare(queue: "PriorityQueue", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>() {
                        //         {"x-max-priority",10 }  //指定队列要支持优先级设置;
                        //   });
                        //channel.ExchangeDeclare(exchange: "PriorityQueueExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        //channel.QueueBind(queue: "PriorityQueue", exchange: "PriorityQueueExchange", routingKey: "PriorityKey");
    
                       
                        //定义消费者                                      
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            string msg = Encoding.UTF8.GetString(ea.Body.ToArray());
                            Console.WriteLine(msg);
                            Thread.Sleep(300);
                        };
                        Console.WriteLine("消费者准备就绪....");
                        //处理消息
                        channel.BasicConsume(queue: "PriorityQueue", autoAck: true, consumer: consumer);
                        Console.ReadKey();
    
                    }
                }
            }
        }
    View Code
    {
                    Thread.Sleep(3000);
                    PriorityQueue.Show();
    }

    运行结果:svip虽然是第5个下单的,但是消费的时候是第1个消费的,然后vip次之,普通用户最后。

    3. 发布订阅模式 

     也可以叫做观察者模式,实质上就是一个交换机绑定多个队列,每个队列就是一个订阅者,发布者每发布一条消息,同时向多个订阅者的队列中发送消息,然后每个订阅者分别去自己的队列中消费即可。

    剖析:这里采用Fanout交换机的模式处理这个场景最为恰当(下一节会详细介绍)

    发布者代码

    /// <summary>
        /// 发布订阅模式-发布者
        /// </summary>
        public class PublishSubscribeConsumer
        {
            public static void Show()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(queue: "PublishSubscrib01", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queue: "PublishSubscrib02", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        channel.ExchangeDeclare(exchange: "PublishSubscribExChange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                        channel.QueueBind(queue: "PublishSubscrib01", exchange: "PublishSubscribExChange", routingKey: string.Empty, arguments: null);
                        channel.QueueBind(queue: "PublishSubscrib02", exchange: "PublishSubscribExChange", routingKey: string.Empty, arguments: null);
                        Console.ForegroundColor = ConsoleColor.Red;
                        Console.WriteLine("开始发布消息~~~~"); 
                        for (int i = 1; i <= 20; i++)
                        { 
                            string message = $"发布第{i}条消息...";
                            byte[] body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "PublishSubscribExChange",
                                            routingKey: string.Empty,
                                            basicProperties: null,
                                            body: body); 
                            Console.WriteLine(message);
                            Thread.Sleep(200);
                        }
                    }
                }
            }
        }
    View Code
    {
         PublishSubscribeConsumer.Show();
    }

    订阅者代码

     /// <summary>
        /// 发布订阅-订阅者
        /// </summary>
        public class PublishSubscribeConsumer
        {
            /// <summary>
            /// 订阅者1
            /// </summary>
            public static void Show1()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    { 
                        Console.ForegroundColor = ConsoleColor.Green;
                        //channel.QueueDeclare(queue: "PublishSubscrib01", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        //channel.ExchangeDeclare(exchange: "PublishSubscribExChange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                        //channel.QueueBind(queue: "PublishSubscrib01", exchange: "PublishSubscribExChange", routingKey: string.Empty, arguments: null);                     
                        Console.WriteLine("订阅者01 已经准备就绪~~");
                        try
                        {
                            var consumer = new EventingBasicConsumer(channel); 
                            consumer.Received += (model, ea) =>
                            { 
                                    var body = ea.Body;
                                    var message = Encoding.UTF8.GetString(body.ToArray());
                                    Console.WriteLine($"订阅者01收到消息:{message} ~");  
                            };
                            channel.BasicConsume(queue: "PublishSubscrib01", autoAck: true, consumer: consumer); 
                            Console.ReadLine();
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex.Message);
                        } 
                    }
                }
            }
    
            /// <summary>
            /// 订阅者2
            /// </summary>
            public static void Show2()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        Console.ForegroundColor = ConsoleColor.Green;
                        //channel.QueueDeclare(queue: "PublishSubscrib02", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        //channel.ExchangeDeclare(exchange: "PublishSubscribExChange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                        //channel.QueueBind(queue: "PublishSubscrib02", exchange: "PublishSubscribExChange", routingKey: string.Empty, arguments: null);
    
                        Console.WriteLine("订阅者02 已经准备就绪~~");
                        try
                        {
                            var consumer = new EventingBasicConsumer(channel);
                            consumer.Received += (model, ea) =>
                            {
                                var body = ea.Body;
                                var message = Encoding.UTF8.GetString(body.ToArray());
                                Console.WriteLine($"订阅者02收到消息:{message} ~");
                            };
                            channel.BasicConsume(queue: "PublishSubscrib02", autoAck: true, consumer: consumer);
                            Console.ReadLine();
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex.Message);
                        }
                    }
                }
            }
    
        }
    View Code
     {
                    Thread.Sleep(2000);
    
                    Task.Run(() =>
                    {
                        PublishSubscribeConsumer.Show1();
                    });
                    Task.Run(() =>
                    {
                        PublishSubscribeConsumer.Show2();
                    });
    }

    运行结果 (订阅者1和订阅者2分别拿到自己的消息)

     

     

    !

    • 作       者 : Yaopengfei(姚鹏飞)
    • 博客地址 : http://www.cnblogs.com/yaopengfei/
    • 声     明1 : 如有错误,欢迎讨论,请勿谩骂^_^。
    • 声     明2 : 原创博客请在转载时保留原文链接或在文章开头加上本人博客地址,否则保留追究法律责任的权利。
     
  • 相关阅读:
    DVI与DVI-D的区别
    easyui.combotree.search.js
    显示实时日期时间(html+js)
    Jquery 内容简介
    EasyUI 格式化DataGrid列
    EasyUI DataGrid 添加排序
    EasyUI DataGrid 复选框
    EasyUI 自定义DataGrid分页
    EasyUI DataGrid能编辑
    EasyUI 我的第一个窗口
  • 原文地址:https://www.cnblogs.com/yaopengfei/p/14664399.html
Copyright © 2011-2022 走看看