zoukankan      html  css  js  c++  java
  • RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间

    上一篇我们讲了关于direct类型的Exchange,这一篇我们来了解一下fanout类型的Exchange。

    1.Exchange的fanout类型

    fanout类型的Exchange的特点是会把消息发送给与之绑定的所有Queue中,我们来测试一下。代码如下

    using RabbitMQ.Client;
    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace RabbitMQConsole
    {
        /// <summary>
        /// Publisher demo for a fanout exchange: declares the exchange, binds three
        /// durable queues to it and publishes a single persistent message.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Connects to the broker, sets up the "change3" topology and publishes
            /// "hello rabbit" once.
            /// </summary>
            /// <param name="args">Command-line arguments (not used).</param>
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory
                {
                    HostName = "39.**.**.**",
                    Port = 5672,
                    VirtualHost = "/",
                    UserName = "root",
                    Password = "root"
                };

                const string exchangeName = "change3";
                const string routingKey = "route2";
                string[] queueNames = { "queue3", "queue4", "queue5" };

                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchangeName, type: "fanout", durable: true, autoDelete: false);

                    // Each queue is declared and then bound using its own name as the binding key.
                    foreach (var queueName in queueNames)
                    {
                        channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
                        channel.QueueBind(queueName, exchangeName, queueName);
                    }

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    // Published with the mandatory flag set, using a routing key that matches none of the binding keys.
                    byte[] payload = Encoding.UTF8.GetBytes("hello rabbit");
                    channel.BasicPublish(exchangeName, routingKey, true, properties, payload);
                }
            }
        }
    }

    运行代码,去可视化工具中查看一下

    消费其中的一个

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Text;
    using System.Threading;
    
    namespace RabbitMQClient
    {
        /// <summary>
        /// Consumer demo: reads messages from "queue3" (bound to the fanout exchange
        /// "change3"), prints them and acknowledges each one manually.
        /// </summary>
        class Program
        {
            // Shared connection settings for the demo broker.
            private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
            {
                HostName = "39.**.**.**",
                Port = 5672,
                UserName = "root",
                Password = "root",
                VirtualHost = "/"
            };

            /// <summary>
            /// Declares the topology, starts a manual-ack consumer and blocks until
            /// a line is read from the console.
            /// </summary>
            /// <param name="args">Command-line arguments (not used).</param>
            static void Main(string[] args)
            {
                const string exchangeName = "change3";
                const string bindingKey = "route2";
                const string queueName = "queue3";

                using (IConnection connection = rabbitMqFactory.CreateConnection())
                using (IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchangeName, "fanout", durable: true, autoDelete: false);
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queueName, exchangeName, bindingKey);

                    // Prefetch limit of 50 (no size limit), applied with global = false.
                    channel.BasicQos(0, 50, false);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (sender, delivery) =>
                    {
                        String text = Encoding.UTF8.GetString(delivery.Body);
                        int threadId = Thread.CurrentThread.ManagedThreadId;
                        Console.WriteLine(text + threadId);

                        // Acknowledge only this delivery.
                        channel.BasicAck(delivery.DeliveryTag, false);
                    };

                    channel.BasicConsume(queueName, false, consumer);

                    // Keep the connection open until the user presses Enter.
                    Console.ReadLine();
                }
            }
        }
    }

    结果如下

    大家可以依次消费其他两个Queue,这里就不演示了

    2.消息的过期时间

    我们在发送一些消息的时候,有时希望给消息设置一下过期时间,我们可以通过两种方式来设置

    2.1通过队列设置消息的过期时间(x-message-ttl)

    using RabbitMQ.Client;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace RabbitMQConsole
    {
        /// <summary>
        /// Publisher demo for a queue-level message TTL: messages placed in
        /// "queue7" expire after 8000 ms.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Declares "change4" and "queue7" (with x-message-ttl) and publishes one
            /// persistent message.
            /// </summary>
            /// <param name="args">Command-line arguments (not used).</param>
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory
                {
                    HostName = "39.**.**.**",
                    Port = 5672,
                    VirtualHost = "/",
                    UserName = "root",
                    Password = "root"
                };

                const string exchangeName = "change4";
                const string routingKey = "route2";
                const string queueName = "queue7";

                // Message TTL applied to every message in the queue, in milliseconds.
                var queueArguments = new Dictionary<string, object>
                {
                    ["x-message-ttl"] = 8000
                };

                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchangeName, type: "fanout", durable: true, autoDelete: false);
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: queueArguments);
                    channel.QueueBind(queueName, exchangeName, queueName);

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    byte[] payload = Encoding.UTF8.GetBytes("hello rabbit");
                    channel.BasicPublish(exchangeName, routingKey, true, properties, payload);
                }
            }
        }
    }

    这样过8秒去Queue就看不到该消息了

    2.2设置message的过期时间

    using RabbitMQ.Client;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace RabbitMQConsole
    {
        /// <summary>
        /// Publisher demo combining a queue-level message TTL (8000 ms) with a
        /// per-message expiration (30000 ms) on the same message.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Declares "change4" and "queue7" (with x-message-ttl) and publishes one
            /// persistent message that also carries its own expiration.
            /// </summary>
            /// <param name="args">Command-line arguments (not used).</param>
            static void Main(string[] args)
            {
                ConnectionFactory factory = CreateFactory();

                string exchangeName = "change4";
                string routingKey = "route2";
                string queueName = "queue7";

                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    var queueArguments = new Dictionary<string, object>();
                    queueArguments.Add("x-message-ttl", 8000);

                    channel.ExchangeDeclare(exchangeName, type: "fanout", durable: true, autoDelete: false);
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: queueArguments);
                    channel.QueueBind(queueName, exchangeName, queueName);

                    var properties = channel.CreateBasicProperties();
                    // Per-message expiration, in milliseconds (passed as a string).
                    properties.Expiration = "30000";
                    properties.Persistent = true;

                    channel.BasicPublish(exchangeName, routingKey, true, properties, Encoding.UTF8.GetBytes("hello rabbit"));
                }
            }

            // Builds the connection settings for the demo broker.
            private static ConnectionFactory CreateFactory()
            {
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "39.**.**.**";
                factory.Port = 5672;
                factory.VirtualHost = "/";
                factory.UserName = "root";
                factory.Password = "root";
                return factory;
            }
        }
    }

    我们发现还是8秒就过期了,说明如果同时设置了队列与消息的过期时间,则以两者中较小的值为准(这里队列的8秒小于消息的30秒)。我们把队列的过期时间去掉重新试一下。注意:queue7已经带着x-message-ttl参数存在,需要先把它删除,否则用不同的参数重新声明同名队列会报PRECONDITION_FAILED错误。

    using RabbitMQ.Client;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace RabbitMQConsole
    {
        /// <summary>
        /// Publisher demo for a per-message expiration only: "queue7" is declared
        /// without a queue-level TTL and the message carries a 30000 ms expiration.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Re-creates "queue7" without arguments, binds it to "change4" and
            /// publishes one persistent message that expires after 30000 ms.
            /// </summary>
            /// <param name="args">Command-line arguments (not used).</param>
            static void Main(string[] args)
            {
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "39.**.**.**";
                factory.Port = 5672;
                factory.VirtualHost = "/";
                factory.UserName = "root";
                factory.Password = "root";

                var exchange = "change4";
                var route = "route2";
                var queue7 = "queue7";

                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false);

                        // The earlier demos created queue7 with x-message-ttl. Declaring an
                        // existing queue with different arguments is rejected by the broker
                        // (PRECONDITION_FAILED) and closes the channel, so drop the old
                        // queue (and any messages still in it) before declaring it again.
                        channel.QueueDelete(queue7);

                        channel.QueueDeclare(queue7, durable: true, exclusive: false, autoDelete: false);
                        channel.QueueBind(queue7, exchange, queue7);

                        var props = channel.CreateBasicProperties();
                        // Per-message expiration, in milliseconds (passed as a string).
                        props.Expiration = "30000";
                        props.Persistent = true;
                        channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                    }
                }
            }
        }
    }

    3.队列生存时间

    我们还可以设置一个队列的生存时间

    using RabbitMQ.Client;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace RabbitMQConsole
    {
        /// <summary>
        /// Publisher demo for a queue lifetime: "queue8" is declared with
        /// x-expires = 10000 ms and receives one persistent message.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Declares "change4" and "queue8" (with x-expires), binds them and
            /// publishes "hello rabbit" once.
            /// </summary>
            /// <param name="args">Command-line arguments (not used).</param>
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory
                {
                    HostName = "39.**.**.**",
                    Port = 5672,
                    VirtualHost = "/",
                    UserName = "root",
                    Password = "root"
                };

                const string exchangeName = "change4";
                const string routingKey = "route2";
                const string queueName = "queue8";

                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchangeName, type: "fanout", durable: true, autoDelete: false);

                    // Expiry of the queue itself, set to 10000 milliseconds.
                    IDictionary<string, object> queueArguments = new Dictionary<string, object>
                    {
                        ["x-expires"] = 10000
                    };
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: queueArguments);
                    channel.QueueBind(queueName, exchangeName, queueName);

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    byte[] payload = Encoding.UTF8.GetBytes("hello rabbit");
                    channel.BasicPublish(exchangeName, routingKey, true, properties, payload);
                }
            }
        }
    }

    这样队列如果连续10秒未被使用(没有消费者,也没有被重新声明),就会被自动删除

  • 相关阅读:
    Java中的Socket用法
    ASP.NET MVC5 的请求管道和运行生命周期
    cookie、session和application
    手把手带你开发一款 IIS 模块后门
    HttpModule介绍
    ASP.NET MVC入门到精通——MVC请求管道
    HttpModule的认识
    WCF技术剖析之二:再谈IIS与ASP.NET管道
    ASP.NET Process Model之二:ASP.NET Http Runtime Pipeline
    ASP.NET Process Model之二:ASP.NET Http Runtime Pipeline[上篇]
  • 原文地址:https://www.cnblogs.com/chenyishi/p/10239554.html
Copyright © 2011-2022 走看看