zoukankan      html  css  js  c++  java
  • RabbitMQ(dotnet基本使用)

    前言

         RabbitMQ环境环境搭建及基本配置,在此不讨论。网上一大堆。

         NET环境下,Rabbit库可以在官网或NUGET上查找得到。

    生产者

       

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory();//连接工厂
        factory.HostName = "127.0.0.1";//地址
        factory.UserName = "Test";//登录名
        factory.Password = "t123456!";//密码
     
        using (var connection = factory.CreateConnection())//创建连接
        {
            using (var channel = connection.CreateModel())//创建通道
            {
                //创建队列,第二个参数bool:是否队列持久化
                channel.QueueDeclare(
                   queue: "test", //消息队列名称
                   durable: false,//消息队列是否持久化
                   exclusive: false,//消息队列是否被本次连接connection独享。(本次连接 
                      //connection创建的信道可以共用).排外的queue在当前连接被断开的时候会 
                      //自动消失(清除)无论是否设置了持久化.
                   autoDelete: false,//消息队列是否自动删除。也就是说queue会清理自己,但 
                      是是在最后一个connection断开的时候。
                   arguments: null);//参数对
                
                 var properties = channel.CreateBasicProperties();//可以为null
                 properties.DeliveryMode = 2;//多个消费工作队列时,设置此属性
                 properties.SetPersistent(true);//消息持久化
    
                 string message = "Hello World";
                  var body = Encoding.UTF8.GetBytes(message);
                  //发布消息
                  //第一个参数:交换器,空默认为direct
                  //第二个参数:direct时,为队列名
                  //第三个参数:通道属性,可以是BasicProperties,也可以是属性接口
                  //第四个参数:消息正文
                  channel.BasicPublish("", "hello", properties, body);
                  Console.WriteLine(" set {0}", message);
            }
        }
    }

    消费者

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory();//连接工厂
        factory.HostName = "127.0.0.1";//地址
        factory.UserName = "Test";//登录名
        factory.Password = "t123456!";//密码
     
        using (var connection = factory.CreateConnection())//创建连接
        {
            using (var channel = connection.CreateModel())//创建通道
            {
                //创建队列,第二个参数bool:是否队列持久化
                channel.QueueDeclare(
                   queue: "test", //消息队列名称
                   durable: false,//消息队列是否持久化
                   exclusive: false,//消息队列是否被本次连接connection独享。(本次连接 
                      //connection创建的信道可以共用).排外的queue在当前连接被断开的时候会 
                      //自动消失(清除)无论是否设置了持久化.
                   autoDelete: false,//消息队列是否自动删除。也就是说queue会清理自己,但 
                      是是在最后一个connection断开的时候。
                   arguments: null);//参数对
                
                 //第一种接收方式:循环接收方式
                 var consumer = new QueueingBasicConsumer(channel);//消费者实例
                 channel.BasicConsume(
                     queue: "Test", //队列名称
                     autoAck: false, //是否开启收到消息自动回复
                     consumer: consumer//消费者
                 );
                 //多个消费者确宝公平,设置同一时间一个消费只能接收一个消息;此方法慎用
                 channel.BasicQos(0, 1, false); 
                 while (true)
                {
                    //队列消息对象
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    //设置睡眠时间,可模拟工作队列多个消息者模式,自行决定
                    Thread.Sleep(5* 1000);
                    //假如,前面未设置自动回复,则可以手动;
                    //响应给RabbitMQ服务:收到并处理了消息。        
                    channel.BasicAck(ea.DeliveryTag, false);
                    //遇到无法处理的消息,拒绝且此消息是否放回队列中,发送给其他消费者
                    channel.BasicReject(ea.DeliveryTag, false);
                    Console.WriteLine("Received {0}", message);
                    Console.WriteLine("Done");
                }
                //第二种接收方式:事件方式
                //实例化一个事件型消费者
                var consumer = new EventingBasicConsumer(channel);
                //订阅消费者接收消息的事件
                consumer.Received += (model, ea) =>
               {
                   //获取并解析数据
                  var body = ea.Body;
                  var message = Encoding.UTF8.GetString(body);
                  //响应给RabbitMQ服务:收到并处理了消息。        
                  channel.BasicAck(ea.DeliveryTag, false);
                  //遇到无法处理的消息,拒绝且此消息是否放回队列中,发送给其他消费者
                  channel.BasicReject(ea.DeliveryTag, false);
                  Console.WriteLine($"收到: {message}");
               };
            }
        }
    }
  • 相关阅读:
    atitit.TokenService v3 qb1 token服务模块的设计 新特性.docx
    Atitit attilax在自然语言处理领域的成果
    Atitit 图像清晰度 模糊度 检测 识别 评价算法 原理
    Atitit (Sketch Filter)素描滤镜的实现  图像处理  attilax总结
    atitit。企业的价值观 员工第一 vs 客户第一.docx
    Atitit 实现java的linq 以及与stream api的比较
    Atitit dsl exer v3 qb3 新特性
    Atititi tesseract使用总结
    Atitit 修改密码的功能流程设计 attilax总结
    atitit.TokenService v3 qb1  token服务模块的设计 新特性.docx
  • 原文地址:https://www.cnblogs.com/xtxk110/p/13218554.html
Copyright © 2011-2022 走看看