zoukankan      html  css  js  c++  java
  • RabbitMQ 发布/订阅

    channel.ExchangeDeclare("logs", ExchangeType.Fanout);
    Fanout类型它只是将接收到的所有消息广播到它知道的所有队列中
    using RabbitMQ.Client;
    using System;
    using System.Text;
    namespace EmitLog
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory()
                {
                    HostName = "localhost"
                };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);//声明   Fanout 将消息推送到他知道的所有队列
                        var message = GetMessage(args);
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);//发布
                        Console.WriteLine(" [x] Sent {0}", message);
                    }
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
            private static string GetMessage(string[] args)
            {
                return args.Length > 0 ? string.Join(" ", args) : "info: Hello World!";
            }
        }
    }

    接收者代码:

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Text;
    namespace ReceiveLogs
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory()
                {
                    HostName = "localhost"
                };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);//声明订阅
                        var queueName = channel.QueueDeclare().QueueName;
                        channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
                        Console.WriteLine(" [*] Waiting for logs.");
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body.ToArray();
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine(" [x] {0}", message);
                        };
                        channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                }
            }
        }
    }

    开启多个接收者,发布消息时,他们都会收到同样的信息

    发布/订阅

  • 相关阅读:
    centos7下mysql双主+keepalived
    Nginx 性能优化有这篇就够了!
    mysql对自增主键ID进行重新排序
    nginx 配置文件 2019-12-20
    zabbix服务端接收的数据类型,便于编写脚本向服务端提交数据
    zabbix自动注册,实现自动添加机器,减少人工干预
    zabbix企业微信告警配置教程
    websocket 连接测试端口服务是否正常代码
    mongodb Sort排序能够支持的最大内存限制为32M Plan executor error during find: FAILURE
    rabbitmq 的安装配置使用
  • 原文地址:https://www.cnblogs.com/lbonet/p/14463632.html
Copyright © 2011-2022 走看看