zoukankan      html  css  js  c++  java
  • RabbitMQ入门学习系列(五) Exchange的Direct类型

    快速阅读

    利用Exchange的Direct类型,实现对队列的过滤,消费者启动以后,输入相应的key值,攻取该key值对应的在队列中的消息 。

    从一节知道Exchange有四种类型

    Direct,Topic,headers,fanout
    

    前面我们说了fanout类型,可以把消息发送给所有的消费者,

    在用Fanout类型的时候,我们绑定的时候是没有指定Routing key的【空值】

     channel.BasicPublish(exchange: "logs",
                                             routingKey: "",
                                             basicProperties: null,
                                             body: body);
    

    这次我们说一下Direct类型

    Exchange的Direct类型将与队列中的routing key进行精确的匹配。
    

    生产者代码

    1. 创建连接和信道
    2. 声明交换器名字和指定类型为direct
    3. 发送routingkey=rk1 和rk2的消息各五次
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "directType1", type: "direct");
            for (var i = 0; i < 5; i++)
            {
                string message = "Hello World!this rk1 message " + i;
                var body = Encoding.UTF8.GetBytes(message);
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
    
                channel.BasicPublish(exchange: "directType1",
                                     routingKey: "rk1",
                                     basicProperties: null,
                                     body: body);
    
                Console.WriteLine(" [x] Sent {0},id={1}", message,i);
                Thread.Sleep(1000);
            }
    
            for (var i = 0; i < 5; i++)
            {
                string message = "Hello World!this rk2 message " + i;
                var body = Encoding.UTF8.GetBytes(message);
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
    
                channel.BasicPublish(exchange: "directType1",
                                     routingKey: "rk2",
                                     basicProperties: null,
                                     body: body);
    
                Console.WriteLine(" [x] Sent {0},id={1}", message, i);
                Thread.Sleep(1000);
            }
    
        }
    
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    } 
    

    消费者代码

    1. 输入要查看的消息类型,支持rk1 和rk2
    2. 创建连接和信道
    3. 声明交换器名字和指定类型为direct
    4. 指定队列名称,并且把routingkey的值赋值给控制台手动需要输入的rk1或者rk2
    5. 接收消息并回馈,和fanout类型一样的代码了。
    static void Main(string[] args)
    {
        bool flag = true;
        string level = "";
        while (flag)
        {
            Console.WriteLine("请选择要查看的消息类型");
            level = Console.ReadLine();
            if (level == "rk1" || level == "rk2" )
                flag = false;
            else
                Console.Write("仅支持rk1与rk2");
        }
    
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "directType1", type: "direct");
                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName, exchange: "directType1", routingKey: level);
                //以下是区别生产者的
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, e) =>
                {
                    var body = e.Body;
                    var message = Encoding.UTF8.GetString(body);
                    var rk = e.RoutingKey;
                    Console.WriteLine("Received {0},routingKey:{1}", message, rk);
                    Thread.Sleep(3000);//模拟耗时任务 ,
                    Console.WriteLine("Received over");
                    channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                Console.WriteLine("");
                Console.ReadLine();
            }
    
        }
    

    查看结果

    我们看到生产者分别生产了五条rk1和五条rk2的消息

    消费者1输入只查看rk1的消息,成功获得了rk1的消息

    同样的

    消费者2输入只查看rk2的消息,成功获得了rk2的消息

    要注意的是先把先消费者启动起来

  • 相关阅读:
    Python3.x和Python2.x的区别
    urllib库python2和python3具体区别
    Oracle实现自增方式:序列+触发器
    菜单
    visual studio 2013连接Oracle 11g并获取数据:(二:实现)
    Oracle连接出现TNS:no listener或者ORA-12514: TNS:listener does not currently know
    visual studio 2013连接Oracle 11g并获取数据:(一:环境搭建)
    C#编程
    Oracle 11g 安装
    Android在Eclipse上的环境配置
  • 原文地址:https://www.cnblogs.com/hsapphire/p/11142193.html
Copyright © 2011-2022 走看看