zoukankan      html  css  js  c++  java
  • 消息中间件RabbitMQ(二)

    上篇文章主要说大概逻辑,这篇就文章用代码验证细节。

    1、当交换机为Direct类型,当多个相同路由key和队列连接时,发送消息时,队列是否都会收到消息? 答案是肯定的。

    发送端:

    using (IConnection con = conFactory.CreateConnection())//创建连接对象
    {
        using (IModel channel = con.CreateModel())//创建连接会话对象
        {
            channel.QueueDeclare(queue1, false, false, false, null);
            channel.QueueDeclare(queue2, false, false, false, null);
            channel.QueueDeclare(queue3, false, false, false, null);
    
            channel.ExchangeDeclare(exchange1, ExchangeType.Direct);
            
            channel.QueueBind(queue1, exchange1, routingKey1);
            channel.QueueBind(queue2, exchange1, routingKey2);
            channel.QueueBind(queue3, exchange1, routingKey2);
    
            while (true)
            {
                Console.WriteLine("消息内容:");
                var message = Console.ReadLine();
                byte[] body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange1, routingKey2, null, body);
                channel.BasicPublish(exchange1, routingKey1, null, body);
                Console.WriteLine("成功发送消息:" + message);
            }
        }
    }

    消费端:

    using (IConnection conn = connFactory.CreateConnection())
    {
        using (IModel channel = conn.CreateModel())
        {
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicConsume(queue1, false, consumer);
            channel.BasicConsume(queue2, false, consumer);
            channel.BasicConsume(queue3, false, consumer);
            consumer.Received += (model, ea) =>
            {
                byte[] message = ea.Body.ToArray();
                Console.WriteLine($"交换机:{ea.Exchange} 路由Key:{ea.RoutingKey} 接收到信息为:{Encoding.UTF8.GetString(message)}");
                channel.BasicAck(ea.DeliveryTag, false);
            };
            Console.ReadKey();
        }
    }

    结果:

     

    2、当交换器的类型是Fanout类型,与之连接多个队列,发送消息时,是否所有与之连接的队列都会收到? 答案是肯定的。

     

    发送端:

    using (IConnection con = conFactory.CreateConnection())//创建连接对象
    {
        using (IModel channel = con.CreateModel())//创建连接会话对象
        {
            channel.QueueDeclare(queue1, false, false, false, null);
            channel.QueueDeclare(queue2, false, false, false, null);
            channel.QueueDeclare(queue3, false, false, false, null);
    
            channel.ExchangeDeclare(exchange2, ExchangeType.Fanout);
            channel.QueueBind(queue1, exchange2, routingKey2);
            channel.QueueBind(queue2, exchange2, routingKey2);
            channel.QueueBind(queue3, exchange2, routingKey2);
    
            while (true)
            {
                Console.Write("消息内容:");
                var message = Console.ReadLine();
                byte[] body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange2, routingKey2, null, body);
                Console.WriteLine("成功发送消息:" + message);
            }
        }

    消费端 代码不变。

    结果:

     3、当一个队列有多个消费者,是否所有的消费者都能接收到消息? 答案是否定的。会轮询发送给监听的所有消费者

    发送端:代码不变

    消费端:添加一个消费者

    using (IConnection conn = connFactory.CreateConnection())
    {
        using (IModel channel = conn.CreateModel())
        {
            var consumer = new EventingBasicConsumer(channel);
    
            var consumer2=new EventingBasicConsumer(channel);
            channel.BasicConsume(queue1, false, consumer);
            channel.BasicConsume(queue2, false, consumer);
            channel.BasicConsume(queue3, false, consumer);
    
            channel.BasicConsume(queue1, false, consumer2);
            channel.BasicConsume(queue2, false, consumer2);
            channel.BasicConsume(queue3, false, consumer2);
    
            consumer.Received += (model, ea) =>
            {
                byte[] message = ea.Body.ToArray();
                Console.WriteLine($"消费者:consumer1  交换机:{ea.Exchange} 路由Key:{ea.RoutingKey} 接收到信息为:{Encoding.UTF8.GetString(message)}");
                channel.BasicAck(ea.DeliveryTag, false);
            };
    
            consumer2.Received += (model, ea) =>
            {
                byte[] message = ea.Body.ToArray();
                Console.WriteLine($"消费者:consumer2 交换机:{ea.Exchange} 路由Key:{ea.RoutingKey} 接收到信息为:{Encoding.UTF8.GetString(message)}");
                channel.BasicAck(ea.DeliveryTag, false);
            };
    
            Console.ReadKey();
        }
    }

    结果:

  • 相关阅读:
    用 PHP 和 MySQL 保存和输出图片
    windows XP+Apache+PHP5+MySQL的安装与配置方法
    PHP语法学习笔记
    wap开发环境搭建
    三款免费的PHP加速器:APC、eAccelerator、XCache比较
    window下apache与tomcat整合
    用PHP的ob_start();控制您的浏览器cache!
    一些经典常用的正则表达式
    windows下为apache配置多个站点
    windows下IIS与Tomcat共存的问题
  • 原文地址:https://www.cnblogs.com/MicroHeart/p/14688585.html
Copyright © 2011-2022 走看看