zoukankan      html  css  js  c++  java
  • RabbitMQ(dotnet基本使用)

    前言

         RabbitMQ环境环境搭建及基本配置,在此不讨论。网上一大堆。

         NET环境下,Rabbit库可以在官网或NUGET上查找得到。

    生产者

       

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory();//连接工厂
        factory.HostName = "127.0.0.1";//地址
        factory.UserName = "Test";//登录名
        factory.Password = "t123456!";//密码
     
        using (var connection = factory.CreateConnection())//创建连接
        {
            using (var channel = connection.CreateModel())//创建通道
            {
                //创建队列,第二个参数bool:是否队列持久化
                channel.QueueDeclare(
                   queue: "test", //消息队列名称
                   durable: false,//消息队列是否持久化
                   exclusive: false,//消息队列是否被本次连接connection独享。(本次连接 
                      //connection创建的信道可以共用).排外的queue在当前连接被断开的时候会 
                      //自动消失(清除)无论是否设置了持久化.
                   autoDelete: false,//消息队列是否自动删除。也就是说queue会清理自己,但 
                      是是在最后一个connection断开的时候。
                   arguments: null);//参数对
                
                 var properties = channel.CreateBasicProperties();//可以为null
                 properties.DeliveryMode = 2;//多个消费工作队列时,设置此属性
                 properties.SetPersistent(true);//消息持久化
    
                 string message = "Hello World";
                  var body = Encoding.UTF8.GetBytes(message);
                  //发布消息
                  //第一个参数:交换器,空默认为direct
                  //第二个参数:direct时,为队列名
                  //第三个参数:通道属性,可以是BasicProperties,也可以是属性接口
                  //第四个参数:消息正文
                  channel.BasicPublish("", "hello", properties, body);
                  Console.WriteLine(" set {0}", message);
            }
        }
    }

    消费者

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory();//连接工厂
        factory.HostName = "127.0.0.1";//地址
        factory.UserName = "Test";//登录名
        factory.Password = "t123456!";//密码
     
        using (var connection = factory.CreateConnection())//创建连接
        {
            using (var channel = connection.CreateModel())//创建通道
            {
                //创建队列,第二个参数bool:是否队列持久化
                channel.QueueDeclare(
                   queue: "test", //消息队列名称
                   durable: false,//消息队列是否持久化
                   exclusive: false,//消息队列是否被本次连接connection独享。(本次连接 
                      //connection创建的信道可以共用).排外的queue在当前连接被断开的时候会 
                      //自动消失(清除)无论是否设置了持久化.
                   autoDelete: false,//消息队列是否自动删除。也就是说queue会清理自己,但 
                      是是在最后一个connection断开的时候。
                   arguments: null);//参数对
                
                 //第一种接收方式:循环接收方式
                 var consumer = new QueueingBasicConsumer(channel);//消费者实例
                 channel.BasicConsume(
                     queue: "Test", //队列名称
                     autoAck: false, //是否开启收到消息自动回复
                     consumer: consumer//消费者
                 );
                 //多个消费者确宝公平,设置同一时间一个消费只能接收一个消息;此方法慎用
                 channel.BasicQos(0, 1, false); 
                 while (true)
                {
                    //队列消息对象
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    //设置睡眠时间,可模拟工作队列多个消息者模式,自行决定
                    Thread.Sleep(5* 1000);
                    //假如,前面未设置自动回复,则可以手动;
                    //响应给RabbitMQ服务:收到并处理了消息。        
                    channel.BasicAck(ea.DeliveryTag, false);
                    //遇到无法处理的消息,拒绝且此消息是否放回队列中,发送给其他消费者
                    channel.BasicReject(ea.DeliveryTag, false);
                    Console.WriteLine("Received {0}", message);
                    Console.WriteLine("Done");
                }
                //第二种接收方式:事件方式
                //实例化一个事件型消费者
                var consumer = new EventingBasicConsumer(channel);
                //订阅消费者接收消息的事件
                consumer.Received += (model, ea) =>
               {
                   //获取并解析数据
                  var body = ea.Body;
                  var message = Encoding.UTF8.GetString(body);
                  //响应给RabbitMQ服务:收到并处理了消息。        
                  channel.BasicAck(ea.DeliveryTag, false);
                  //遇到无法处理的消息,拒绝且此消息是否放回队列中,发送给其他消费者
                  channel.BasicReject(ea.DeliveryTag, false);
                  Console.WriteLine($"收到: {message}");
               };
            }
        }
    }
  • 相关阅读:
    给msde加装企业管理器
    InterBase 数据库与驱动 版本不同
    delphi 演示数据路径
    TNetHTTPClient 使用
    MYSQL之库操作
    MYSQL之数据操作
    MYSQL之表操作
    MYSQL之视图、触发器、存储过程、函数、事物、数据库锁和数据库备份
    数据库三范式详解
    MYSQL之索引原理与慢查询优化
  • 原文地址:https://www.cnblogs.com/xtxk110/p/13218554.html
Copyright © 2011-2022 走看看