// Declare an exchange named "logs" whose type is fanout.
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
Fanout 类型的交换机只做一件事:将接收到的所有消息广播到它所知道的所有队列中。
using RabbitMQ.Client;
using System;
using System.Text;

namespace EmitLog
{
    /// <summary>
    /// Publisher side of the fanout example: sends one message to the "logs" exchange.
    /// </summary>
    class Program
    {
        // Name of the exchange the message is published to.
        private const string LogsExchange = "logs";

        /// <summary>
        /// Connects to the broker on localhost, declares the fanout exchange,
        /// publishes a single message built from the command-line arguments,
        /// then waits for the user to press Enter.
        /// </summary>
        /// <param name="args">Command-line arguments; joined to form the message text.</param>
        static void Main(string[] args)
        {
            var connectionFactory = new ConnectionFactory() { HostName = "localhost" };

            using (var conn = connectionFactory.CreateConnection())
            {
                using (var model = conn.CreateModel())
                {
                    // Declare a fanout exchange: it pushes messages to every queue it knows about.
                    model.ExchangeDeclare(exchange: LogsExchange, type: ExchangeType.Fanout);

                    var text = GetMessage(args);
                    var payload = Encoding.UTF8.GetBytes(text);

                    // Publish with an empty routing key and no message properties.
                    model.BasicPublish(
                        exchange: LogsExchange,
                        routingKey: "",
                        basicProperties: null,
                        body: payload);

                    Console.WriteLine(" [x] Sent {0}", text);
                }

                // The channel is already disposed here; the connection stays open until Enter is pressed.
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// Builds the message text from the command-line arguments.
        /// </summary>
        /// <param name="args">Command-line arguments.</param>
        /// <returns>
        /// The arguments joined by single spaces, or "info: Hello World!" when none were given.
        /// </returns>
        private static string GetMessage(string[] args)
        {
            if (args.Length == 0)
            {
                return "info: Hello World!";
            }

            return string.Join(" ", args);
        }
    }
}
接收者代码:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace ReceiveLogs
{
    /// <summary>
    /// Subscriber side of the fanout example: prints every message delivered from the "logs" exchange.
    /// </summary>
    class Program
    {
        // Name of the exchange the queue is bound to.
        private const string LogsExchange = "logs";

        /// <summary>
        /// Connects to the broker on localhost, binds a freshly declared queue to the
        /// fanout exchange, and prints incoming messages until the user presses Enter.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            var connectionFactory = new ConnectionFactory() { HostName = "localhost" };

            using (var conn = connectionFactory.CreateConnection())
            using (var model = conn.CreateModel())
            {
                // Declare the exchange to subscribe to.
                model.ExchangeDeclare(exchange: LogsExchange, type: ExchangeType.Fanout);

                // Declare a queue with default arguments and use the name the call returns.
                var declaredQueue = model.QueueDeclare();
                var boundQueue = declaredQueue.QueueName;
                model.QueueBind(queue: boundQueue, exchange: LogsExchange, routingKey: "");

                Console.WriteLine(" [*] Waiting for logs.");

                var logConsumer = new EventingBasicConsumer(model);
                logConsumer.Received += PrintDelivery;

                // autoAck: true — deliveries are acknowledged automatically, no manual ack is sent.
                model.BasicConsume(queue: boundQueue, autoAck: true, consumer: logConsumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// Handles a delivery: decodes the body as UTF-8 and writes it to the console.
        /// </summary>
        /// <param name="sender">The object that raised the event (unused).</param>
        /// <param name="delivery">Delivery data carrying the message body.</param>
        private static void PrintDelivery(object sender, BasicDeliverEventArgs delivery)
        {
            var text = Encoding.UTF8.GetString(delivery.Body.ToArray());
            Console.WriteLine(" [x] {0}", text);
        }
    }
}
开启多个接收者后,每发布一条消息,它们都会收到同样的消息。