zoukankan      html  css  js  c++  java
  • RabbitMQ 发布/订阅

    channel.ExchangeDeclare("logs", ExchangeType.Fanout);
    Fanout类型它只是将接收到的所有消息广播到它知道的所有队列中
    using RabbitMQ.Client;
    using System;
    using System.Text;
    namespace EmitLog
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory()
                {
                    HostName = "localhost"
                };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);//声明   Fanout 将消息推送到他知道的所有队列
                        var message = GetMessage(args);
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);//发布
                        Console.WriteLine(" [x] Sent {0}", message);
                    }
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
            private static string GetMessage(string[] args)
            {
                return args.Length > 0 ? string.Join(" ", args) : "info: Hello World!";
            }
        }
    }

    接收者代码:

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Text;
    namespace ReceiveLogs
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory()
                {
                    HostName = "localhost"
                };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);//声明订阅
                        var queueName = channel.QueueDeclare().QueueName;
                        channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
                        Console.WriteLine(" [*] Waiting for logs.");
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body.ToArray();
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine(" [x] {0}", message);
                        };
                        channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                }
            }
        }
    }

    开启多个接收者,发布消息时,他们都会收到同样的信息

    发布/订阅

  • 相关阅读:
    [草稿]挂载新硬盘
    [Android]开发环境配置(windows)-draft
    [草稿][C语言][内存分配]常见内存错误
    [草稿]Linux用户管理
    python的异步编程、IO多路复用、协程
    python的网络编程(socket)
    python的多进程、logging模
    python的Lock锁,线程同步
    python的并发和线程
    python的异常处理
  • 原文地址:https://www.cnblogs.com/lbonet/p/14463632.html
Copyright © 2011-2022 走看看