zoukankan      html  css  js  c++  java
  • RabbitMQ与.net core(二)Producer与Exchange

    Producer:消息的生产者,也就是创建消息的对象

    Exchange:消息的接受者,也就是用来接收消息的对象,Exchange接收到消息后将消息按照规则发送到与他绑定的Queue中。下面我们来定义一个Producer与Exchange。

    1.新建.netcore console项目,并引入RabbitMQ.Client的Nuget包

    2.创建Exchange

    using RabbitMQ.Client;
    
    namespace RabbitMQConsole
    {
        class Program
        {
            static void Main(string[] args)
            {
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "39.**.**.**";
                factory.Port = 5672;
                factory.VirtualHost = "/";
                factory.UserName = "root";
                factory.Password = "root";
    
                var exchange = "change2";
                var route = "route2";
                var queue = "queue2";
    
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange, type:"direct", durable: true, autoDelete: false);   //创建Exchange
                        
                    }
                }
            }
        }
    }

    可以看到Echange的参数有:

    type:可选项为,fanout,direct,topic,headers。区别如下:

        fanout:发送到所有与当前Exchange绑定的Queue中

        direct:发送到与消息的routeKey相同的Rueue中

        topic:fanout的模糊版本

        headers:发送到与消息的header属性相同的Queue中

    durable:持久化

    autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。

     运行程序,可以在可视化界面看到change2

    接下来我们可以创建与change2绑定的queue

    3.创建Queue

                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);
                        channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);  #创建queue2
                        channel.QueueBind(queue, exchange, route);  #将queue2绑定到exchange2
                    }

    可以看到Echange的参数有:

    durable:持久化

    exclusive:如果为true,则queue只在channel存在时存在,channel关闭则queue消失

    autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。

    去可视化界面看Queue

    4.发送消息

                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);
                        channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                        channel.QueueBind(queue, exchange, route);
                        var props = channel.CreateBasicProperties();
                        props.Persistent = true; #持久化
                        channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                    }

    5.消费消息

    using RabbitMQ.Client;
    using System;
    using System.Text;
    
    namespace RabbitMQClient
    {
        class Program
        {
            private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
            {
                HostName = "39.**.**.**",
                Port = 5672,
                UserName = "root",
                Password = "root",
                VirtualHost = "/"
            };
            static void Main(string[] args)
            {
                var exchange = "change2";
                var route = "route2";
                var queue = "queue2";
    
    
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                using (IModel channel = conn.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue, exchange, route);
                    while (true)
                    {
                        var message = channel.BasicGet(queue, true);  #第二个参数说明自动释放消息,如为false需手动释放消息
                        if(message!=null)
                        {
                            var msgBody = Encoding.UTF8.GetString(message.Body);
                            Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                        }
                        System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                    }
                }
            }
        }
    }

    运行查看结果

    查看可视化界面

    6.手动释放消息

                    while (true)
                    {
                        var message = channel.BasicGet(queue, false);#设置为手动释放
                        if(message!=null)
                        {
                            var msgBody = Encoding.UTF8.GetString(message.Body);
                            Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                        }
                        channel.BasicAck(message.DeliveryTag, false); #手动释放
                        System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                    }

    我们再发一条消息,然后开始消费,加个断点调试一下

    查看一下Queue中消息状态

    然后直接取消调试,不让程序走到释放的那一步,再查看一下消息状态

    这么说来只要不走到 channel.BasicAck(message.DeliveryTag, false);这一行,消息就不会被释放掉,我们让程序直接走到这一行代码,查看一下消息的状态

    如图已经被释放了

    7.让失败的消息回到队列中

                    while (true)
                    {
                        var message = channel.BasicGet(queue, false);
                        if(message!=null)
                        {
                            var msgBody = Encoding.UTF8.GetString(message.Body);
                            Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                            Console.WriteLine(message.DeliveryTag);    #当前消息被处理的次序数
                            if (1==1)
                                channel.BasicReject(message.DeliveryTag, true);
                        }
                        
                        System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                    }

    重新发送4条消息

    开始消费

    我们可以看到消息一直没有没消费,因为消息被处理之后又放到了队尾

    8.监听消息

     using (IConnection conn = rabbitMqFactory.CreateConnection())
                using (IModel channel = conn.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue, exchange, route);
    
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);  #一次接受10条消息,否则rabbit会把所有的消息一次性推到client,会增大client的负荷
                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Byte[] body = ea.Body;
                        String message = Encoding.UTF8.GetString(body);
                        Console.WriteLine( message+Thread.CurrentThread.ManagedThreadId);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
    
                    channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
                    Console.ReadLine();
                }
  • 相关阅读:
    NSUserDefaults 简介,使用 NSUserDefaults 存储自定义对象
    OC,查找字符串2在字符串1中出现的次数
    iOS开发知识碎片----01
    iOS中pch文件的应用
    UIKit性能调优实战讲解
    尽量将View设置为Opaque,iOS开发技巧
    Xcode开发技巧之code snippets(代码片段)
    【工具】openwrt安装记录
    【对象模型】C++模版的编译链接过程——编译器真的会检查所有tocken层面的错误么?
    【转】利用TCMalloc优化Nginx的性能
  • 原文地址:https://www.cnblogs.com/chenyishi/p/10233629.html
Copyright © 2011-2022 走看看