zoukankan      html  css  js  c++  java
  • rabbitmq 工作模式

    一、工作队列模式

     

     C1 C2轮流接收消息

    启动两个接受者

    channel.QueueDeclare(queue:"que",durable:false,exclusive:false,autoDelete:false,arguments:null);

    交换机设置为空字符串

    channel.BasicPublish(exchange: "",routingKey:"que",basicProperties:null,body:msg);

    二、发布订阅模式

     

    P发给一个交换机,这个交换机绑定两个队列,C1 C2各监听一个队列。 

    发布者

    static void Main(string[] args)
            {
                IConnectionFactory connectionFactory = new ConnectionFactory
                {
                    HostName = "192.168.35.122",
                    Port = 5672,
                    UserName="meng",
                    Password= "meng"
                };
                connectionFactory.VirtualHost = "/";
                //
                using (IConnection con = connectionFactory.CreateConnection())
                {
                    using (IModel channel = con.CreateModel())
                    {
                        //queue 表示队列名称
                        //durable 是否持久化,mq重启后队列还在
                        //exclusive 是否独占连接,关闭后会自动删除
                        //autoDelete 自动删除
                        //arguments 
                        channel.QueueDeclare(queue:"que1",durable: true, exclusive:false,autoDelete:false,arguments:null);
                        channel.QueueDeclare(queue: "que2", durable: true, exclusive: false, autoDelete: false, arguments: null);
    
                        //声明一个交换机
                        //exchang 交换机名称
                        //type 交换机类型,Fanout-对应发布订阅模式
                        channel.ExchangeDeclare("ex", ExchangeType.Fanout);
    
                        //交换机和队列绑定
                        //queue 要绑定的队列名称
                        //exchange 要绑定的交换机
                        //路由key, 发布订阅模式里用不到
                        channel.QueueBind("que1", "ex","");
                        channel.QueueBind("que2", "ex", "");
                        while (true)
                        {
                            Console.WriteLine("输入消息");
                            string str = Console.ReadLine();
                            byte[] msg = Encoding.UTF8.GetBytes(str);
                            //exchange mq的交换机
                            //routingKey 路由key, 交换机根据消息转发到指定队列。如果使用默认交换机,routingKey要使用队列名称
                            //basicProperties
                            //body 二进制数组
    
                            //发布订阅模式发送给交换机,路由key设置为空
                            channel.BasicPublish(exchange: "ex",routingKey:"",basicProperties:null,body:msg);
                            Console.WriteLine("发送" + str);
                        }
                    }
                }
    
            }

    消费者

    创建两个消费者

    static void Main(string[] args)
            {
                IConnectionFactory connectionFactory = new ConnectionFactory
                {
                    HostName = "192.168.35.122",
                    Port = 5672,
                    UserName = "meng",
                    Password = "meng"
                };
    
                using (IConnection con = connectionFactory.CreateConnection())
                {
                    using (IModel channel = con.CreateModel())
                    {
    
                        //声明一个队列
                        channel.QueueDeclare("que1",exclusive:false ,durable: true, autoDelete: false, arguments: null);
                        //声明一个队列
                        channel.QueueDeclare("que2", exclusive: false, durable: true, autoDelete: false, arguments: null);
    
                        //声明交换机
                        channel.ExchangeDeclare("ex", ExchangeType.Fanout);
                        //交换机与队列绑定
                        channel.QueueBind("que1", "ex", "");
                        channel.QueueBind("que2", "ex", "");
    
                        //创建消费者对象
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                           string message=Encoding.UTF8.GetString(ea.Body.Span);
                            Console.WriteLine("接收到信息为:" + message);
                        };
                        //消费者开启监听
                        channel.BasicConsume(queue: "que1", autoAck: true, consumer: consumer);
                        channel.BasicConsume(queue: "que2", autoAck: true, consumer: consumer);
                        Console.ReadKey();
                    }
                }
            }

    三、路由模式

    根据路由发送给注册了指定路由的队列 

    生产者 

                    using (IModel channel = con.CreateModel())
                    {
    
                        channel.QueueDeclare(queue:"que1",durable: true, exclusive:false,autoDelete:false,arguments:null);
                        channel.QueueDeclare(queue: "que2", durable: true, exclusive: false, autoDelete: false, arguments: null);
    
    
                        //Direct-对应路由模式
                        channel.ExchangeDeclare("ex", ExchangeType.Direct);
    
                        //设置路由
                        channel.QueueBind("que1", "ex","ok");
                        channel.QueueBind("que2", "ex", "error");
                        while (true)
                        {
                            Console.WriteLine("输入消息");
                            string str = Console.ReadLine();
                            byte[] msg = Encoding.UTF8.GetBytes(str);
    
                            //指定routingkey
                            channel.BasicPublish(exchange: "ex",routingKey:"error",basicProperties:null,body:msg);
                            Console.WriteLine("发送" + str);
                        }
                    }

    消费者,为了区别,开启了两个监听

    using (IModel channel = con.CreateModel())
                    {
    
                        //声明一个队列
                        channel.QueueDeclare("que1",exclusive:false ,durable: true, autoDelete: false, arguments: null);
                        //声明一个队列
                        channel.QueueDeclare("que2", exclusive: false, durable: true, autoDelete: false, arguments: null);
    
                        //声明交换机
                        channel.ExchangeDeclare("ex", ExchangeType.Direct);
                        //交换机与队列绑定
                        channel.QueueBind("que1", "ex", "ok");
                        channel.QueueBind("que2", "ex", "error");
    
                        //创建消费者对象
                        var consumer1 = new EventingBasicConsumer(channel);
                        var consumer2 = new EventingBasicConsumer(channel);
                        consumer1.Received += (model, ea) =>
                        {
                           string message=Encoding.UTF8.GetString(ea.Body.Span);
                            Console.WriteLine("que1接收到信息为:" + message);
                        };
                        consumer2.Received += (model, ea) =>
                        {
                            string message = Encoding.UTF8.GetString(ea.Body.Span);
                            Console.WriteLine("que2接收到信息为:" + message);
                        };
                        //消费者开启监听
                        channel.BasicConsume(queue: "que1", autoAck: true, consumer: consumer1);
                        channel.BasicConsume(queue: "que2", autoAck: true, consumer: consumer2);
                        Console.ReadKey();
                    }

    四、通配符模式

     

    匹配模式和路由模式的区别在于,路由模式用相等判断,匹配模式根据通配符判断。 

    * 代表任意一个单词

    例如

    生产者路由设置成  *.ok.*  ,表示这个路由有三个单词组成,中间一个单词必须是ok, hi.ok.mm

    # 代表任意多个单词

    例如

    生产者路由设置成  abc.# ,表示d可以是由abc开头的任意多个单词,abc.uu.mm.w

    生产者代码

                    using (IModel channel = con.CreateModel())
                    {
    
                        channel.QueueDeclare(queue:"que1",durable: true, exclusive:false,autoDelete:false,arguments:null);
                        channel.QueueDeclare(queue: "que2", durable: true, exclusive: false, autoDelete: false, arguments: null);
    
    
                        //Topic-匹配模式
                        channel.ExchangeDeclare("ex", ExchangeType.Topic);
    
                        //设置路由
                        channel.QueueBind("que1", "ex","*.ok.*");
                        channel.QueueBind("que2", "ex", "abc.#");
                        while (true)
                        {
                            Console.WriteLine("输入消息");
                            string str = Console.ReadLine();
                            byte[] msg = Encoding.UTF8.GetBytes(str);
    
                            //发给了que1
                            //channel.BasicPublish(exchange: "ex",routingKey:"hi.ok.mm",basicProperties:null,body:msg);
                            //发给了que2
                            //channel.BasicPublish(exchange: "ex", routingKey: "abc.kk", basicProperties: null, body: msg);
                            //同时发给了两个人
                            channel.BasicPublish(exchange: "ex", routingKey: "abc.ok.mm", basicProperties: null, body: msg);
                            Console.WriteLine("发送" + str);
                        }
                    }

    消费者代码

                    using (IModel channel = con.CreateModel())
                    {
                        channel.QueueDeclare("que1",exclusive:false ,durable: true, autoDelete: false, arguments: null);
                        channel.QueueDeclare("que2", exclusive: false, durable: true, autoDelete: false, arguments: null);
    
                        //声明交换机
                        channel.ExchangeDeclare("ex", ExchangeType.Topic);
                        //交换机与队列绑定
                        channel.QueueBind("que1", "ex", "*.ok.*");
                        channel.QueueBind("que2", "ex", "abc.#");
    
                        var consumer1 = new EventingBasicConsumer(channel);
                        var consumer2 = new EventingBasicConsumer(channel);
                        consumer1.Received += (model, ea) =>
                        {
                           string message=Encoding.UTF8.GetString(ea.Body.Span);
                            Console.WriteLine("que1接收到信息为:" + message);
                        };
                        consumer2.Received += (model, ea) =>
                        {
                            string message = Encoding.UTF8.GetString(ea.Body.Span);
                            Console.WriteLine("que2接收到信息为:" + message);
                        };
                        channel.BasicConsume(queue: "que1", autoAck: true, consumer: consumer1);
                        channel.BasicConsume(queue: "que2", autoAck: true, consumer: consumer2);
                        Console.ReadKey();
                    }

    五、远程调用模式

     

  • 相关阅读:
    iframe设置背景透明
    苹果新版QuickTime X启用新图标
    css命名规则
    视觉设计前瞻实用性研究(PNVD) 第二期
    Tab(选项卡)的产品设计原则及应用 [1]
    WiFi热点认证
    自画表格,微软报表,水晶报表初用初分析.
    Winform 打印类重温
    Winform 打印DataGrid View
    Winform 常用的.记住免得以后到处找.
  • 原文地址:https://www.cnblogs.com/buchizaodian/p/13665690.html
Copyright © 2011-2022 走看看