zoukankan      html  css  js  c++  java
  • RabbitMQ笔记-Demo(C#)

    创建Connection,推荐使用长连接,长连接基础上创建多个通道

            public void CreateConnection()
            {
                this.ConnectionFactory = new ConnectionFactory
                {
                    HostName = "xx.xx.xx.xx",
                    //Port = 5672,
                    UserName = "admin",
                    Password = "admin",
                    VirtualHost = "my_vhost"
                };
                this.connection = this.ConnectionFactory.CreateConnection();
    
                //方式2
                //ConnectionFactory factory = new ConnectionFactory();
                //factory.Uri = new Uri("amqp://user:pass@hostName:port/vhost");
                //this.connection = factory.CreateConnection();
            }
    

    Direct交换机案例

    private void btnRouteKeyPublish_Click(object sender, EventArgs e)
            {
                string exchangeName = "myexchange1";
                string queueName_logElse = "log_else";
                string queueName_logError = "log_error";
    
                //2:创建channel
                using (var channel = connection.CreateModel())
                {
                    //创建交换机
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
                    //创建队列
                    channel.QueueDeclare(queueName_logElse, true, false, false, null);
                    channel.QueueDeclare(queueName_logError, true, false, false, null);
                    //绑定
                    channel.QueueBind(queueName_logElse, exchangeName, "info", null);
                    channel.QueueBind(queueName_logElse, exchangeName, "debug", null);
                    channel.QueueBind(queueName_logElse, exchangeName, "warn", null);
                    channel.QueueBind(queueName_logError, exchangeName, "error", null);
    
                    //发布info消息
                    for (int i = 0; i < 10; i++)
                    {
                        var msg = Encoding.UTF8.GetBytes($"{i}:haha-info");
                        channel.BasicPublish(exchangeName, "info", null, msg);
                    }
                    //发布debug消息
                    for (int i = 0; i < 10; i++)
                    {
                        var msg = Encoding.UTF8.GetBytes($"{i}:haha-debug");
                        channel.BasicPublish(exchangeName, "debug", null, msg);
                    }
                    //发布warn消息
                    for (int i = 0; i < 10; i++)
                    {
                        var msg = Encoding.UTF8.GetBytes($"{i}:haha-warn");
                        channel.BasicPublish(exchangeName, "warn", null, msg);
                    }
                    //发布error消息
                    for (int i = 0; i < 10; i++)
                    {
                        var msg = Encoding.UTF8.GetBytes($"{i}:haha-error");
                        channel.BasicPublish(exchangeName, "error", null, msg);
                    }
                }
            }
    

    Topic交换机案例

           private void btnTopicPublish_Click(object sender, EventArgs e)
            {
                string exchangeName = "myTopicExchange1";
                string queueName1 = "topic_queue1";
                string queueName2 = "topic_queue2";
    
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare("myTopicExchange1", ExchangeType.Topic, true, false, null);
                    channel.QueueDeclare(queueName1, true, false, false, null);
                    channel.QueueDeclare(queueName2, true, false, false, null);
                    channel.QueueBind(queueName1, exchangeName, "#.cn", null);
                    channel.QueueBind(queueName2, exchangeName, "*.cn", null);
    
                    //发布info消息(消息会发送到两个队列,因为都匹配)
                    for (int i = 0; i < 100; i++)
                    {
                        var msg = Encoding.UTF8.GetBytes($"{i}:haha");
                        channel.BasicPublish(exchangeName, "fan.cn", null, msg);
                    }
                }
            }
    

    Header交换机案例

            private void btnHeadersPublish_Click(object sender, EventArgs e)
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare("myHeadersExchange1", ExchangeType.Headers, true, false, null);
                    channel.QueueDeclare("headers_queue1", true, false, false, null);
                    channel.QueueBind("headers_queue1", "myHeadersExchange1", string.Empty, new Dictionary<string, object>() {
                            {"x-match","all" },//any
                            { "username","fan"},
                            { "password","123456"}
                        });
                    //properties
                    var properties = channel.CreateBasicProperties();
                    properties.Headers = new Dictionary<string, object>();
                    properties.Headers.Add("username", "fan");
                    properties.Headers.Add("password", "123456");
                    properties.Persistent = true;//消息持久化
                                                 //发布info消息
                    for (int i = 0; i < 100; i++)
                    {
                        var msg = Encoding.UTF8.GetBytes($"{i}:haha");
                        channel.BasicPublish("myHeadersExchange1", string.Empty, properties, msg);
                    }
                }
            }
    

    Demo1.使用默认交换机收发消息(省略了创建交换机,实际使用的是默认交换机,所有创建的队列都隐式绑定默认交换机)

    //发消息
       using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "hello",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    channel.BasicPublish(exchange: "",
                                                 routingKey: "hello",
                                                 basicProperties: null,
                                                 body: Encoding.UTF8.GetBytes("Hello World!"));//默认交换机的名字就是""
                }
    //收消息
    using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "hello",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    var result = channel.BasicGet("hello", true);
                    string msg = Encoding.UTF8.GetString(result.Body);
                }
    

    Default Exchange:
    Default Exchange 其实是AMQP中预先声明的,属于Direct类型,Default Exchange 的名是 "";
    他有一个特殊的属性,当你手动创建一个队列时,MQ会自动将这个队列绑定到Default Exchange 上,绑定时 RoutingKey 与队列名称相同
    默认交换器隐式绑定到每个队列,路由键等于队列名称。不能显式地绑定到默认交换器,也不能从默认交换器解除绑定。它也不能被删除。

    Demo2.消费消息两种方式

            /// <summary>
            /// 手动拉消息
            /// </summary>
            /// <param name="queueName"></param>
            private void ConsumePullQueue(string queueName)
            {
                var channel = connection.CreateModel();
                var result = channel.BasicGet(queueName, false);//autoACK:false 开启手动确认
                try
                {
                    //处理消息
                    MessageBox.Show(Encoding.UTF8.GetString(result.Body));
                    //手动确认
                    channel.BasicAck(result.DeliveryTag, false);
                }
                catch
                {
                    //退回
                    channel.BasicRecover(true);
                }
                //直接扔了
                //channel.BasicReject(result.DeliveryTag, true);
                //否认确认
                //channel.BasicNack()
            }
            /// <summary>
            /// 自动推消息
            /// 如果多个消费者订阅一个队列,将轮询获取消息
            /// </summary>
            /// <param name="queueNames"></param>
            private void ConsumeEventQueue(string queueName)
            {
                var channel = connection.CreateModel();
                //开启QOS并行限制,每次发送一条,ack后再发送一条
                channel.BasicQos(0, 1, false);
                //通过事件订阅方式消费队列
                EventingBasicConsumer consumer1 = new EventingBasicConsumer(channel);
                consumer1.Received += (sender1, e1) =>
                {
                    MessageBox.Show(Encoding.UTF8.GetString(e1.Body));
                    channel.BasicAck(e1.DeliveryTag, false);//手动确认
                };
                channel.BasicConsume(queueName, false, consumer1);//开始消费.(autoACK:false 手动确认)
            }
    

    Demo3.生产端消息确认、事务执行

    //confirm确认(推荐)
                using (var channel = connection.CreateModel())
                {
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;//消息持久化
                    channel.ConfirmSelect();//将信道设置成confirm模式
                                            //5:发布消息
                    for (int i = 0; i < 100; i++)
                    {
                        var msg = Encoding.UTF8.GetBytes($"{i}:haha");
                        channel.BasicPublish(exchangeName, routingKey, properties, msg);
                    }
                    bool isSuccess = channel.WaitForConfirms();//是否发布成功
                }
    
    //事务执行(低效,不推荐)
                using (var channel = connection.CreateModel())
                {
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;//消息持久化
                    channel.TxSelect();//将信道设置成事务模式
                    try
                    {
                        for (int i = 0; i < 100; i++)
                        {
                            var msg = Encoding.UTF8.GetBytes($"{i}:haha");
                            channel.BasicPublish(exchangeName, routingKey, properties, msg);
                        }
                        channel.TxCommit();
                    }
                    catch
                    {
                        channel.TxRollback();
                    }
                }
    

    参考:
    https://www.rabbitmq.com/getstarted.html

  • 相关阅读:
    CICD : 存代码部署(精简版)
    CICD:通过Shell 将打包后的代码部署到各环境
    linux:curl 取得HTTP返回的状态码
    闭包简单的了解
    javascript正则表达式了解
    搭建PHP开发环境(四)-PHP操作MySQL
    搭建PHP开发环境(三)-MySQL安装配置
    搭建PHP开发环境(二)-PHP安装
    搭建PHP开发环境(一)-Apache安装配置
    生成简单验证码文字
  • 原文地址:https://www.cnblogs.com/fanfan-90/p/13369531.html
Copyright © 2011-2022 走看看