zoukankan      html  css  js  c++  java
  • .NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ 工作队列和交换机)--学习笔记

    2.6.4 RabbitMQ -- 工作队列和交换机

    • WorkQueue
    • Publish/Subscribe
    • Routing
    • EmitLog

    WorkQueue

    WorkQueue:https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html

    • 一个消息生产者,多个消息消费者
    • exchange 交换机自动恢复
    • 对消息进行持久化
    • 手动确认消息

    对消息进行持久化

    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    
    channel.BasicPublish(exchange: "",
                         routingKey: "task_queue",
                         basicProperties: properties,
                         body: body);
    

    手动确认消息

    autoAck: false

    channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
    

    手动调用 BasicAck

    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    

    修改接收端为手动确认消息

    autoAck: false

    channel.BasicConsume(queue: "hello",
        autoAck: false,
        consumer: consumer);
    

    BasicAck

    consumer.Received += (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
    
        Thread.Sleep(2000);// 演示多个接收端
    
        channel.BasicAck(ea.DeliveryTag, false);
        Console.WriteLine(" [x] Received {0}", message);
    };
    

    启动多个接收端

    Publish/Subscribe

    Publish/Subscribe:https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

    Fanout 交换机,每个队列都会收到

    channel.ExchangeDeclare("logs", ExchangeType.Fanout);
    

    Routing

    Routing:https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html

    Bindings

    channel.QueueBind(queue: queueName,
                      exchange: "logs",
                      routingKey: "");
    

    Direct exchange

    channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
    

    EmitLog

    新建控制台项目 EmitLogDirect,ReceiveLogsDirect

    发送端

    namespace EmitLogDirect
    {
        class EmitLogDirect
        {
            public static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                using(var connection = factory.CreateConnection())
                using(var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);// 声明交换机
    
                    var severity = (args.Length > 0) ? args[0] : "info";
                    var message = (args.Length > 1)
                        ? string.Join(" ", args.Skip( 1 ).ToArray())
                        : "Hello World!";
                    var body = Encoding.UTF8.GetBytes(message);
    
                    channel.BasicPublish(exchange: "direct_logs",
                        routingKey: severity,// 路由 Key 自动带上严重级别
                        basicProperties: null,
                        body: body);
    
                    Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
                }
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
    

    error 级别单独发送到一个队列

    接收端

    namespace ReceiveLogsDirect
    {
        class ReceiveLogsDirect
        {
            public static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);// 声明交换机
    
                    var queueName = channel.QueueDeclare().QueueName;
    
                    if (args.Length < 1)
                    {
                        Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                            Environment.GetCommandLineArgs()[0]);
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                        Environment.ExitCode = 1;
                        return;
                    }
    
                    foreach (var severity in args)
                    {
                        channel.QueueBind(queue: queueName,
                            exchange: "direct_logs",
                            routingKey: severity);// 路由 Key 自动带上严重级别
                    }
    
                    Console.WriteLine(" [*] Waiting for messages.");
    
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine(" [x] Received '{0}':'{1}'",
                            routingKey, message);
                    };
                    channel.BasicConsume(queue: queueName,
                        autoAck: true,
                        consumer: consumer);
    
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
    }
    

    替换发送端,接收端的 localhost 为服务器地址

    接收端控制台启动

    dotnet run info waring error
    

    发送端控制台启动

    dotnet run info
    
    dotnet run error
    
    dotnet run waring test
    

    接收端输出

     [x] Received 'info':'Hello World!'
     [x] Received 'error':'Hello World!'
     [x] Received 'waring':'test'
    

    GitHub源码链接:

    https://github.com/MINGSON666/Personal-Learning-Library/tree/main/ArchitectTrainingCamp

    知识共享许可协议

    本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。

    欢迎转载、使用、重新发布,但务必保留文章署名 郑子铭 (包含链接: http://www.cnblogs.com/MingsonZheng/ ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。

    如有任何疑问,请与我联系 (MingsonZheng@outlook.com) 。

  • 相关阅读:
    git Permanently added the RSA host key for IP address '13.250.177.223' to the list of known hosts.
    git error: failed to push some refs to 'git@github.com:xxx/xxx.git'
    git LF will be replaced by CRLF in hellogit.txt
    为什么要买保险,并且如何配置保险,以及家庭保险的配置
    Molile App(HTTP/HTML)—Analyze Traffic
    Molile App(HTTP/HTML)—Record and Analyze Traffic
    清空KindEditor富文本编辑器里面的内容方法
    图片上传和显示——上传图片——上传文件)==ZJ
    页面静态化实现——根据模板动态创建静态页
    通过Ajax实现增删改查
  • 原文地址:https://www.cnblogs.com/MingsonZheng/p/14260144.html
Copyright © 2011-2022 走看看