zoukankan      html  css  js  c++  java
  • RabbitMQ的发送和消费示例

    官网:https://www.rabbitmq.com/dotnet-api-guide.html

    C# rabbitMQ示例

    生产者:推送消息

        public class RabbitMqPulish
        {
            /// <summary>
            /// 推送
            /// </summary>
            /// <param name="value"></param>
            public void Producer(int value)
            {
                try
                {
                    var queueName = "test01";
                    var exchangeName = "changeName01";
                    var exchangeType = ExchangeType.Fanout; //fanout、topic、fanout direct
                    var routingKey = "*";
                    var uri = new Uri("amqp://127.0.0.1:5672/");
                    var factory = new ConnectionFactory
                    {
                        UserName = "admin",
                        Password = "admin",
                        RequestedHeartbeat = TimeSpan.FromMilliseconds(0),
                        Endpoint = new AmqpTcpEndpoint(uri)
                    };
    
                    using (var connection = factory.CreateConnection())
                    {
                        using (var channel = connection.CreateModel())
                        {
                            //设置交换器的类型
                            channel.ExchangeDeclare(exchangeName, exchangeType);
                            //声明一个队列,设置队列是否持久化,排他性,与自动删除
                            channel.QueueDeclare(queueName, true, false, false, null);
                            //绑定消息队列,交换器,routingkey
                            channel.QueueBind(queueName, exchangeName, routingKey);
                            //channel.QueueBind("test02", exchangeName, routingKey); //绑定多个队列,多端接收
                            channel.ConfirmSelect();
                            channel.BasicAcks += (sender ,e)=>{
                                Console.WriteLine($"BasicAcks..Pulish..{e.DeliveryTag}");
                            };
                            var properties = channel.CreateBasicProperties();
                            //队列持久化
                            properties.Persistent = true;
                            properties.DeliveryMode = 2; //发送具有传递模式 2(持久性)
                            //properties.Expiration = "36000000";
                            for (int i = 0; i < value; i++)
                            {
                                var pubMessage = new MqMessage()
                                {
                                    MsgId = Guid.NewGuid().ToString(),
                                    PushTime = DateTime.Now,
                                    SendSortNo = i,
                                    Message = "测试发送消息 " + i + ""
                                };
                                //JsonConvert.DeserializeObject
                                var body = Encoding.UTF8.GetBytes(System.Text.Json.JsonSerializer.Serialize(pubMessage));
                                //发送信息
                                channel.BasicPublish(exchangeName, routingKey, properties, body);
    
                                Console.WriteLine($"sending...{i}..." + System.Text.Json.JsonSerializer.Serialize(pubMessage));
                                System.Threading.Thread.Sleep(1000);
                            }
    
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
    
            /// <summary>
            /// 推送Demo
            /// </summary>
            /// <param name="value"></param>
            public void PublishDemo(int value = 100)
            {
                #region 使用端点列表
    
                //var endpoints = new System.Collections.Generic.List<AmqpTcpEndpoint> {
                //  new AmqpTcpEndpoint("hostname"),
                //  new AmqpTcpEndpoint("localhost")
                //};
                //IConnection conn = factory.CreateConnection(endpoints);
    
                #endregion
    
                var queueName = "test02";
                var exchangeName = "changeName02";
                //var exchangeType = ExchangeType.Fanout;// fanout、topic、fanout direct
                var routingKey = "*";
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.VirtualHost = "/";
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.Uri = new Uri("amqp://127.0.0.1:5672/");
                //心跳超时值:RabbitMQ 建议的默认值为60。 零值表示对等​​体建议完全禁用心跳
                //将超时设置为 60 秒
                factory.RequestedHeartbeat = TimeSpan.FromSeconds(60);
                //连接工厂属性ConnectionFactory.ClientProvidedName,如果设置了该属性, 则控制该工厂打开的所有新连接的客户端提供的连接名称。
                factory.ClientProvidedName = "app:audit component:event-consumer";
                IConnection conn = factory.CreateConnection();
                //像连接一样,通道应该是长期存在的。为每个操作打开一个新通道会非常低效,并且非常不鼓励。通道的寿命可能比连接短.
                //每次操作关闭和打开新通道通常是不必要的,但可能是适当的
                IModel channel = conn.CreateModel();
    
                channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                ////x-max-priority属性必须设置,否则消息优先级不生效
                //IDictionary<string, object> dicQueue = new Dictionary<string, object>();
                //dicQueue.Add("x-max-priority", 50);
                //dicQueue.Add("x-message-ttl", 60000); //队列超时
                ///// new Dictionary<string, object> { { "x-max-priority", 50 }
                //channel.QueueDeclare(queueName, true, false, false, dicQueue);
    
                channel.QueueDeclare(queueName, true, false, false, null);
                ////声明一个队列并指示服务器不发送任何响应
                //channel.QueueDeclareNoWait(queueName, true, false, false, null);
                channel.QueueBind(queueName, exchangeName, routingKey, null);
                ////推送的确认
                //channel.ConfirmSelect();
                //channel.BasicAcks += (sender, e) => {
                //    Console.WriteLine($"BasicAcks..Pulish..{e.DeliveryTag}");
                //};
                ////客户端可以订阅 IModel.BasicReturn事件。如果没有附加到事件的侦听器,则返回的消息将被静默丢弃。
                //channel.BasicReturn += (sender, e) => {
                //    Console.WriteLine($"Pulish..处理不可路由的消息..{e.Exchange}");
                //}; 
    
                //IModel#QueueDeclarePassive和IModel#ExchangeDeclarePassive是用于被动声明的方法
                var response = channel.QueueDeclarePassive(queueName);
                // 返回队列中处于就绪状态的消息数
                var messageCount = response.MessageCount;
                // 返回队列拥有的消费者数量
                var consumerCount = response.ConsumerCount;
                Console.WriteLine($"队列:{queueName},就绪状态的消息数:{messageCount},消费者数量:{consumerCount}");
    
                var props = channel.CreateBasicProperties();
                //队列持久化
                props.Persistent = true;
                //props.ContentType = "text/plain";
                props.DeliveryMode = 2;   //发送具有传递模式 2(持久性)
                //props.Expiration = "36000000";   //每条消息超时
                props.Priority = (byte)10;//设置消息优先级
    
                for (int i = 0; i < value; i++)
                {
                    //Publishing Messages 方式1:
                    //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
                    //channel.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);
                    //精细控制,您可以使用重载变体来指定强制标志,或指定消息属性
                    var strMsg = $"Hello, world! ....{i}....{DateTime.Now}";
                    byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(strMsg);
                    channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
                    Console.WriteLine($"sending...{i}...{strMsg}" );
                    System.Threading.Thread.Sleep(1000);
                }
    
                //要断开连接,只需关闭通道和连接:
                channel.Close();
                conn.Close();
    
                #region 一些设置
    
                //type:可选项为,fanout,direct,topic,headers。区别如下:
                //fanout:发送到所有与当前Exchange绑定的Queue中
                //direct:发送到与消息的routeKey相同的Rueue中
                //topic:fanout的模糊版本
                //headers:发送到与消息的header属性相同的Queue中
    
                //durable:持久化
                //autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。
                //exclusive:如果为true,则queue只在channel存在时存在,channel关闭则queue消失
    
                ////显式删除队列或交换:
                //channel.QueueDelete("queue-name", false, false);
                ////只有当队列为空时才可以删除队列
                //channel.QueueDelete("queue-name", false, true);
                ////或者如果它没有被使用(没有任何消费者)
                //channel.QueueDelete("queue-name", true, false);
                ////可以清除队列(删除其所有消息):
                //channel.QueuePurge("队列名称");
    
                ////带有自定义标头的消息
                //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
    
                //IBasicProperties props = channel.CreateBasicProperties();
                //props.ContentType = "text/plain";
                //props.DeliveryMode = 2;
                //发布带有自定义标头的消息:Header模式
                //props.Headers = new Dictionary<string, object>();
                //props.Headers.Add("纬度", 51.5252949);
                //props.Headers.Add("longitude", -0.0905493);
    
                //channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
                //设置过期时间,两种方式来设置,1.队列 2.消息
                ////设置消息过期:
                //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
    
                //IBasicProperties props = channel.CreateBasicProperties();
                //props.ContentType = "text/plain";
                //props.DeliveryMode = 2;
                //props.Expiration = " 36000000 "; //每条消息超时
    
                //channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
    
                ////启用自动连接恢复
                //factory.AutomaticRecoveryEnabled = true;
                //// 每 10 秒尝试恢复一次,默认为5秒
                //factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
                //https://www.cnblogs.com/chenyishi/p/10242162.html
                ////设置优先级:x-max-priority属性必须设置,否则消息优先级不生效
                //IDictionary<string, object> dicQueue = new Dictionary<string, object>();
                //dicQueue.Add("x-max-priority", 50);
                //dicQueue.Add("x-expires",10000); //设置当前队列的过期时间为10000毫秒
                //channel.QueueDeclare(queueName, true, false, false, dicQueue);
                //properties.Priority = (byte)10;//设置消息优先级
    
                //死信的产生有三种
                //1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue = false);
                //2.当前队列中的消息数量已经超过最大长度。
                //3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的TTL(Time To Live)时间;
                //arguments = new Dictionary<string, object> {
                //{ "x-dead-letter-exchange",exchangeD}, //设置当前队列的DLX
                //{ "x-dead-letter-routing-key",routeD}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
                //{ "x-message-ttl",10000} //设置消息的存活时间,即过期时间
                //};
    
    
                #endregion
            }
    
            /// <summary>
            /// 设置优先级
            /// </summary>
            /// <param name="value"></param>
            public void ProducerPriority(int value = 100)
            {
                try
                {
                    var queueName = "test03";
                    var exchangeName = "changeName03";
                    //var exchangeType = ExchangeType.Fanout;// fanout、topic、fanout direct
                    var routingKey = "*";
    
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.VirtualHost = "/";
                    factory.UserName = "admin";
                    factory.Password = "admin";
                    factory.Uri = new Uri("amqp://127.0.0.1:5672/");
                    //心跳超时值:RabbitMQ 建议的默认值为60。 零值表示对等​​体建议完全禁用心跳
                    //将超时设置为 60 秒
                    factory.RequestedHeartbeat = TimeSpan.FromSeconds(0);
                    //连接工厂属性ConnectionFactory.ClientProvidedName,如果设置了该属性, 则控制该工厂打开的所有新连接的客户端提供的连接名称。
                    factory.ClientProvidedName = "app:audit component:event-consumer";
                    IConnection conn = factory.CreateConnection();
                    //像连接一样,通道应该是长期存在的。为每个操作打开一个新通道会非常低效,并且非常不鼓励。通道的寿命可能比连接短.
                    //每次操作关闭和打开新通道通常是不必要的,但可能是适当的
                    IModel channel = conn.CreateModel();
    
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                    //x-max-priority属性必须设置,否则消息优先级不生效
                    IDictionary<string, object> dicQueue = new Dictionary<string, object>();
                    dicQueue.Add("x-max-priority", 10); //推送和消费一致
                    //dicQueue.Add("x-message-ttl", 60000); //队列超时
                    ///new Dictionary<string, object> { { "x-max-priority", 50 }
                    channel.QueueDeclare(queueName, true, false, false, dicQueue);
                    ////声明一个队列并指示服务器不发送任何响应
                    //channel.QueueDeclareNoWait(queueName, true, false, false, null);
                    channel.QueueBind(queueName, exchangeName, routingKey, null);
                    
                    for (int i = 0; i < value; i++)
                    {
                        var props = channel.CreateBasicProperties();
                        //队列持久化
                        props.Persistent = true;
                        //props.ContentType = "text/plain";
                        props.DeliveryMode = 2;
                        //props.Expiration = "36000000";   //每条消息超时
                        props.Priority = (byte)(i % 10);//设置消息优先级
    
                        //Publishing Messages 方式1:
                        //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
                        //channel.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);
                        //精细控制,您可以使用重载变体来指定强制标志,或指定消息属性
                        var strMsg = $"Hello, world! ....{i}....{DateTime.Now}";
                        byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(strMsg);
                        
                        channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
                        Console.WriteLine($"sending...{i}...{strMsg}");
                        //System.Threading.Thread.Sleep(1000);
                    }
    
                    //要断开连接,只需关闭通道和连接:
                    channel.Close();
                    conn.Close();
    
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
    
            /// <summary>
            /// 死信队列
            /// </summary>
            /// <param name="value"></param>
            public void ProducerDeadLetter(int value = 100)
            {
                try
                {
                    var queueNameDead = "queueDead";
                    var exchangeNameDead = "changeNameDead";
                    var routingKeyDead = "routingKeyDead";
    
                    var queueName = "test04";
                    var exchangeName = "changeName04";
                    //var exchangeType = ExchangeType.Fanout;// fanout、topic、fanout direct
                    var routingKey = "*";
    
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.VirtualHost = "/";
                    factory.UserName = "admin";
                    factory.Password = "admin";
                    factory.Uri = new Uri("amqp://127.0.0.1:5672/");
                    //心跳超时值:RabbitMQ 建议的默认值为60。 零值表示对等​​体建议完全禁用心跳
                    //将超时设置为 60 秒
                    factory.RequestedHeartbeat = TimeSpan.FromSeconds(0);
                    //连接工厂属性ConnectionFactory.ClientProvidedName,如果设置了该属性, 则控制该工厂打开的所有新连接的客户端提供的连接名称。
                    factory.ClientProvidedName = "app:audit component:event-consumer";
                    IConnection conn = factory.CreateConnection();
                    //像连接一样,通道应该是长期存在的。为每个操作打开一个新通道会非常低效,并且非常不鼓励。通道的寿命可能比连接短.
                    //每次操作关闭和打开新通道通常是不必要的,但可能是适当的
                    IModel channel = conn.CreateModel();
                    //绑定死信队列
                    channel.ExchangeDeclare(exchangeNameDead, ExchangeType.Topic);
                    channel.QueueDeclare(queueNameDead, true, false, false, null);
                    channel.QueueBind(queueNameDead, exchangeNameDead, routingKeyDead, null);
                    //死信的产生有三种
                    //1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue = false);
                    //2.当前队列中的消息数量已经超过最大长度。
                    //3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的TTL(Time To Live)时间;
                    //正常队列
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                    //x-max-priority属性必须设置,否则消息优先级不生效
                    IDictionary<string, object> dicQueue = new Dictionary<string, object>();
                    dicQueue.Add("x-dead-letter-exchange", exchangeNameDead); //设置当前队列的DLX
                    dicQueue.Add("x-dead-letter-routing-key", routingKeyDead); //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
                    dicQueue.Add("x-message-ttl", 10000); //队列超时
                    ///new Dictionary<string, object> { { "x-max-priority", 50 }
                    channel.QueueDeclare(queueName, true, false, false, dicQueue);
                    ////声明一个队列并指示服务器不发送任何响应
                    //channel.QueueDeclareNoWait(queueName, true, false, false, null);
                    channel.QueueBind(queueName, exchangeName, routingKey, null);
    
                    for (int i = 0; i < value; i++)
                    {
                        var props = channel.CreateBasicProperties();
                        //队列持久化
                        props.Persistent = true;
                        //props.ContentType = "text/plain";
                        props.DeliveryMode = 2; //发送具有传递模式 2(持久性)
                        //props.Expiration = "36000000";   //每条消息超时
                        //props.Priority = (byte)(i % 10);//设置消息优先级
    
                        //Publishing Messages 方式1:
                        //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
                        //channel.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);
                        //精细控制,您可以使用重载变体来指定强制标志,或指定消息属性
                        var strMsg = $"Hello, world! ....{i}....{DateTime.Now}";
                        byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(strMsg);
    
                        channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
                        Console.WriteLine($"sending...{i}...{strMsg}");
                        //System.Threading.Thread.Sleep(1000);
                    }
    
                    //要断开连接,只需关闭通道和连接:
                    channel.Close();
                    conn.Close();
    
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
    
            private void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e)
            {
                throw new NotImplementedException();
            }
    
            private void Channel_BasicAcks(object sender, RabbitMQ.Client.Events.BasicAckEventArgs e)
            {
                throw new NotImplementedException();
            }
        }
    
        /// <summary>
        /// 消息对象
        /// </summary>
        public class MqMessage
        {
            public string MsgId { get; set; }
    
            public string Type { get; set; }
            public int SendSortNo { get; set; }
    
            public string Message { get; set; }
    
            public object Data { get; set; }
    
            public DateTime PushTime { get; set; }
        }
    Pulish

    消费者:接收消息

            /// <summary>
            /// 消费Demo
            /// </summary>
            public void ConsumerDemo()
            {
                #region 使用端点列表
    
                //var endpoints = new System.Collections.Generic.List<AmqpTcpEndpoint> {
                //  new AmqpTcpEndpoint("hostname"),
                //  new AmqpTcpEndpoint("localhost")
                //};
                //IConnection conn = factory.CreateConnection(endpoints);
    
                #endregion
    
                var queueName = "test02";
                var exchangeName = "changeName02";
                //var exchangeType = ExchangeType.Fanout;// fanout、topic、fanout direct
                var routingKey = "*";
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.VirtualHost = "/";
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.Uri = new Uri("amqp://127.0.0.1:5672/");
                //心跳超时值:RabbitMQ 建议的默认值为60。 零值表示对等​​体建议完全禁用心跳
                // 将超时设置为 60 秒
                factory.RequestedHeartbeat = TimeSpan.FromSeconds(0);
                //连接工厂属性ConnectionFactory.ClientProvidedName,如果设置了该属性, 则控制该工厂打开的所有新连接的客户端提供的连接名称。
                factory.ClientProvidedName = "app:audit component:event-consumer";
                ////面向异步的消费者调度实现。此调度程序只能用于异步消费者,即IAsyncBasicConsumer实现
                //factory.DispatchConsumersAsync = true;
                IConnection conn = factory.CreateConnection();
                //像连接一样,通道应该是长期存在的。为每个操作打开一个新通道会非常低效,并且非常不鼓励。通道的寿命可能比连接短.
                //每次操作关闭和打开新通道通常是不必要的,但可能是适当的
                IModel channel = conn.CreateModel();
    
                channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                channel.QueueDeclare(queueName, true, false, false, null); //  优先级要设置"x-max-priority"
                ////声明一个队列并指示服务器不发送任何响应
                //channel.QueueDeclareNoWait(queueName, true, false, false, null);
                channel.QueueBind(queueName, exchangeName, routingKey, null);
                ////输入1 如果接收一个消息,但是没有应答,客户端不会接收下一个消息
                //prefetch_count false对于基于当前Channel创建的消费者生效
                //prefetch_count true值在当前Channel的所有消费者共享
                channel.BasicQos(0, 1, false);  //信道预取设置 (QoS), 0:对这个消费者没有限制
                Console.WriteLine("Listenging....");
    
                //IModel#QueueDeclarePassive和IModel#ExchangeDeclarePassive是用于被动声明的方法
                var response = channel.QueueDeclarePassive(queueName);
                // 返回队列中处于就绪状态的消息数
                var messageCount =  response.MessageCount;
                // 返回队列拥有的消费者数量
                var consumerCount = response.ConsumerCount;
                Console.WriteLine($"队列:{queueName},就绪状态的消息数:{messageCount},消费者数量:{consumerCount}");
                //Retrieving Messages By Subscription("push API")
                //接收消息的推荐和最方便的方法是使用IBasicConsumer接口设置订阅 。然后,消息将在到达时自动传送,而不必主动请求。
                //实现消费者的一种方法是使用便利类EventingBasicConsumer,它将交付和其他消费者生命周期事件作为 C# 事件分派
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"EventingBasicConsumering....{message}....");
                    // 肯定确认单次投递,消息将被丢弃
                    //false :positively acknowledge a single delivery, the message will be discarded
                    //积极确认所有交付这个交货标签
                    //true :positively acknowledge all deliveries up to this delivery tag
                    channel.BasicAck(ea.DeliveryTag, false); //自动应答不需要 ,DeliveryTag:当前消息被处理的次序数
                    Console.WriteLine($"BasicAcking....end...次序:{ea.DeliveryTag},Task:{System.Threading.Thread.CurrentThread.ManagedThreadId}....{DateTime.Now}");
                    //if (true)
                    //{
                    //    //让失败的消息回到队列中,消息被处理之后又放到了队尾
                    //    channel.BasicReject(ea.DeliveryTag, true);
                    //}
                };
                // this consumer tag identifies the subscription, when it has to be cancelled
                //false为手动应答,true为自动应答
                string consumerTag = channel.BasicConsume(queueName, false, consumer);
                ////basic.consume方法中的x-priority参数 设置为整数值。没有指定值的消费者优先级为0。数字越大表示优先级越高,正数和负数都可以使用
                //Dictionary<string, object> dicParam = new Dictionary<string, object>();
                //dicParam["x-priority"] = 10;
                //string consumerTag2 = channel.BasicConsume(queueName, false, "", dicParam, consumer);
                //Console.WriteLine($"BasicConsume....{consumerTag2}");
    
                Console.WriteLine($"BasicConsume....{consumerTag}");
    
                //取消活动使用者
                //channel.BasicCancel(consumerTag);
                ////要断开连接,只需关闭通道和连接:
                //channel.Close();
                //conn.Close();
    
                #region MyRegion
    
                ////https://www.rabbitmq.com/dotnet-api-guide.html
    
                ////显式删除队列或交换:
                //channel.QueueDelete("queue-name", false, false);
                ////只有当队列为空时才可以删除队列
                //channel.QueueDelete("queue-name", false, true);
                ////或者如果它没有被使用(没有任何消费者)
                //channel.QueueDelete("queue-name", true, false);
                ////可以清除队列(删除其所有消息):
                //channel.QueuePurge("队列名称");
    
                ////带有自定义标头的消息
                //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
    
                //IBasicProperties props = channel.CreateBasicProperties();
                //props.ContentType = "text/plain";
                //props.DeliveryMode = 2;
                //props.Headers = new Dictionary<string, object>();
                //props.Headers.Add("纬度", 51.5252949);
                //props.Headers.Add("longitude", -0.0905493);
    
                //channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
                //设置过期时间,两种方式来设置,1.队列 2.消息
                ////设置消息过期:
                //byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
    
                //IBasicProperties props = channel.CreateBasicProperties();
                //props.ContentType = "text/plain";
                //props.DeliveryMode = 2;
                //props.Expiration = " 36000000 ";
    
                //channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
    
                //取消活动使用者
                //channel.BasicCancel(consumerTag);
    
                ////面向异步的消费者调度实现。此调度程序只能用于异步消费者,即IAsyncBasicConsumer实现
                //factory.DispatchConsumersAsync = true; 
    
                //var consumer = new AsyncEventingBasicConsumer(channel);
                //consumer.Received += async (ch, ea) =>
                //{
                //    var body = ea.Body.ToArray();
                //    // copy or deserialise the payload
                //    // and process the message
                //    // ...
    
                //    channel.BasicAck(ea.DeliveryTag, false);
                //    await Task.Yield();
    
                //};
                //// this consumer tag identifies the subscription
                //// when it has to be cancelled
                //string consumerTag = channel.BasicConsume(queueName, false, consumer);
                //// ensure we get a delivery
                //bool waitRes = latch.WaitOne(2000);
    
                ////“拉”一条消息,请使用IModel.BasicGet方法
                //bool autoAck = false; //手动确认(autoAck = false)
                //BasicGetResult result = channel.BasicGet(queueName, autoAck);
                //if (result == null)
                //{
                //    // No message available at this time.
                //}
                //else
                //{
                //    IBasicProperties props = result.BasicProperties;
                //    ReadOnlyMemory<byte> body = result.Body;
                //}
    
                //// 确认收到消息
                //channel.BasicAck(result.DeliveryTag, false); //使用此 API 获取消息的效率相对较低
    
                //basic.consume方法中的x-priority参数 设置为整数值。没有指定值的消费者优先级为0。数字越大表示优先级越高,正数和负数都可以使用
    
    
                #endregion
    
            }
    
            /// <summary>
            /// 自动应答
            /// </summary>
            public void ConsumerAutomatic()
            {
                try
                {
                    var queueName = "test01";
                    var exchangeName = "changeName01";
                    var exchangeType = ExchangeType.Fanout;// fanout、topic、fanout direct
                    var routingKey = "*";
                    var uri = new Uri("amqp://127.0.0.1:5672/");
                    var factory = new ConnectionFactory
                    {
                        UserName = "admin",
                        Password = "admin",
                        RequestedHeartbeat = TimeSpan.FromMilliseconds(30),
                        Endpoint = new AmqpTcpEndpoint(uri)
                    };
    
                    var connection = factory.CreateConnection();
                    var channel = connection.CreateModel();
    
                    //设置交换器的类型
                    channel.ExchangeDeclare(exchangeName, exchangeType);
                    //声明一个队列,设置队列是否持久化,排他性,与自动删除,名称相同不会重复创建
                    channel.QueueDeclare(queueName, true, false, false, null);
                    //绑定消息队列,交换器,routingkey
                    channel.QueueBind(queueName, exchangeName, routingKey);
                    ////输入1 如果接收一个消息,但是没有应答,客户端不会接收下一个消息
                    //prefetch_count false对于基于当前Channel创建的消费者生效
                    //prefetch_count true值在当前Channel的所有消费者共享
                    channel.BasicQos(0, 1, false);  //信道预取设置 (QoS)
    
                    Console.WriteLine("Listenging....");
    
                    //定义这个队列的消费者
                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);    //QueueingBasicConsumer 已过时
                    consumer.Received += (ch, ea) =>
                    {
                        try
                        {
                            var body = ea.Body.ToArray();
                            // copy or deserialise the payload
                            // and process the message
                            var message = Encoding.UTF8.GetString(body);
    
                            var conMessage = System.Text.Json.JsonSerializer.Deserialize<MqMessage>(message);
    
                            Console.WriteLine($"EventingBasicConsumering....{conMessage.SendSortNo}....{conMessage.PushTime}");
                            // 肯定确认单次投递,消息将被丢弃
                            //false :positively acknowledge a single delivery, the message will be discarded
                            //积极确认所有交付这个交货标签
                            //true :positively acknowledge all deliveries up to this delivery tag
                            //channel.BasicAck(ea.DeliveryTag, false); //自动应答不需要
                            Console.WriteLine($"BasicAcking....{conMessage.SendSortNo }");
                            //if (conMessage.SendSortNo % 2 == 1)
                            //{
                            //    channel.BasicAck(ea.DeliveryTag, false);
                            //    Console.WriteLine($"BasicAcking....{conMessage.SendSortNo }");
                            //}
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine($"异常BasicAcking....{ex.Message }");
                            //重新排队所有未确认的交付,一次拒绝或重新排队多条消息
                            channel.BasicNack(ea.DeliveryTag, true, true);
                            //requeue重新回到队列
                            channel.BasicNack(ea.DeliveryTag, false, false);
                            //requeue重新排队交货:true,false丢弃
                            channel.BasicReject(ea.DeliveryTag, true);
    
                        }
    
                        //Console.WriteLine($"EventingBasicConsumering....end");
                        //System.Threading.Thread.Sleep(1000);
                    };
                    // this consumer tag identifies the subscription
                    // when it has to be cancelled
                    //false为手动应答,true为自动应答
                    string consumerTag = channel.BasicConsume(queueName, true, consumer);
    
                    Console.WriteLine($"BasicConsume....{consumerTag}");
    
                    System.Threading.Thread.Sleep(1000);
    
                    ////while (true)
                    ////{
                    ////    //阻塞函数 获取队列中的消息
                    ////    BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    ////    byte[] bytes = ea.Body;
                    ////    var messageStr = Encoding.UTF8.GetString(bytes);
                    ////    var message = DoJson.JsonToModel<QMessage>(messageStr);
                    ////    Console.WriteLine("Receive a Message, DateTime:" + message.DateTime.ToString("yyyy-MM-dd HH:mm:ss") + " Title:" + message.Title);
                    ////    //如果是自动应答,下下面这句代码不用写啦。
                    ////    if ((Convert.ToInt32(message.Title) % 2) == 1)
                    ////    {
                    ////        channel.BasicAck(ea.DeliveryTag, false);
                    ////    }
                    ////    System.Threading.Thread.Sleep(1000);
                    ////}
    
    
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
    
            /// <summary>
            /// 手动应答模式
            /// </summary>
            public void ConsumerManual()
            {
                try
                {
                    var queueName = "test01";
                    var exchangeName = "changeName01";
                    var exchangeType = ExchangeType.Fanout;// fanout、topic、fanout direct
                    var routingKey = "*";
                    var uri = new Uri("amqp://127.0.0.1:5672/");
                    var factory = new ConnectionFactory
                    {
                        UserName = "admin",
                        Password = "admin",
                        RequestedHeartbeat = TimeSpan.FromMilliseconds(0),
                        Endpoint = new AmqpTcpEndpoint(uri)
                    };
    
                    var connection = factory.CreateConnection();
                    var channel = connection.CreateModel();
    
                    //设置交换器的类型
                    channel.ExchangeDeclare(exchangeName, exchangeType);
                    //声明一个队列,设置队列是否持久化,排他性,与自动删除,名称相同不会重复创建
                    channel.QueueDeclare(queueName, true, false, false, null);
                    //绑定消息队列,交换器,routingkey
                    channel.QueueBind(queueName, exchangeName, routingKey);
                    ////输入1 如果接收一个消息,但是没有应答,客户端不会接收下一个消息
                    //prefetch_count false对于基于当前Channel创建的消费者生效
                    //prefetch_count true值在当前Channel的所有消费者共享
                    channel.BasicQos(0, 1, true);  //
    
                    Console.WriteLine("Listenging....");
    
                    //定义这个队列的消费者
                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);    //QueueingBasicConsumer 已过时
                    consumer.Received += (ch, ea) =>
                    {
                        try
                        {
                            var body = ea.Body.ToArray();
                            // copy or deserialise the payload
                            // and process the message
                            var message = Encoding.UTF8.GetString(body);
    
                            var conMessage = System.Text.Json.JsonSerializer.Deserialize<MqMessage>(message);
    
                            Console.WriteLine($"EventingBasicConsumering....{conMessage.SendSortNo}....{conMessage.PushTime}");
                            // 肯定确认单次投递,消息将被丢弃
                            //false :positively acknowledge a single delivery, the message will be discarded
                            //积极确认所有交付这个交货标签
                            //true :positively acknowledge all deliveries up to this delivery tag
                            channel.BasicAck(ea.DeliveryTag, false);
                            Console.WriteLine($"BasicAcking....{conMessage.SendSortNo }");
                            //if (conMessage.SendSortNo % 2 == 1)
                            //{
                            //    channel.BasicAck(ea.DeliveryTag, false);
                            //    Console.WriteLine($"BasicAcking....{conMessage.SendSortNo }");
                            //}
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine($"异常BasicAcking....{ex.Message }");
                            //重新排队所有未确认的交付
                            channel.BasicNack(ea.DeliveryTag, true, true);
                            //重新回到队列
                            channel.BasicNack(ea.DeliveryTag, false, false);
                            //重新排队交货:true,false丢弃
                            channel.BasicReject(ea.DeliveryTag, true);
                        }
    
                        //Console.WriteLine($"EventingBasicConsumering....end");
                        //System.Threading.Thread.Sleep(1000);
                    };
                    // this consumer tag identifies the subscription
                    // when it has to be cancelled
                    //false为手动应答,true为自动应答
                    string consumerTag = channel.BasicConsume(queueName, false, consumer);
    
                    Console.WriteLine($"BasicConsume....{consumerTag}");
    
                    System.Threading.Thread.Sleep(1000);
    
                    ////while (true)
                    ////{
                    ////    //阻塞函数 获取队列中的消息
                    ////    BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    ////    byte[] bytes = ea.Body;
                    ////    var messageStr = Encoding.UTF8.GetString(bytes);
                    ////    var message = DoJson.JsonToModel<QMessage>(messageStr);
                    ////    Console.WriteLine("Receive a Message, DateTime:" + message.DateTime.ToString("yyyy-MM-dd HH:mm:ss") + " Title:" + message.Title);
                    ////    //如果是自动应答,下下面这句代码不用写啦。
                    ////    if ((Convert.ToInt32(message.Title) % 2) == 1)
                    ////    {
                    ////        channel.BasicAck(ea.DeliveryTag, false);
                    ////    }
                    ////    System.Threading.Thread.Sleep(1000);
                    ////}
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
    
            /// <summary>
            /// 设置优先级
            /// </summary>
            public void ConsumerPriority()
            {
                var queueName = "test03";
                var exchangeName = "changeName03";
                //var exchangeType = ExchangeType.Fanout;// fanout、topic、fanout direct
                var routingKey = "*";
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.VirtualHost = "/";
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.Uri = new Uri("amqp://127.0.0.1:5672/");
                //心跳超时值:RabbitMQ 建议的默认值为60。 零值表示对等​​体建议完全禁用心跳
                // 将超时设置为 60 秒
                factory.RequestedHeartbeat = TimeSpan.FromSeconds(0);
                //连接工厂属性ConnectionFactory.ClientProvidedName,如果设置了该属性, 则控制该工厂打开的所有新连接的客户端提供的连接名称。
                factory.ClientProvidedName = "app:audit component:event-consumer";
                //面向异步的消费者调度实现。此调度程序只能用于异步消费者,即IAsyncBasicConsumer实现
                factory.DispatchConsumersAsync = true;
                IConnection conn = factory.CreateConnection();
                //像连接一样,通道应该是长期存在的。为每个操作打开一个新通道会非常低效,并且非常不鼓励。通道的寿命可能比连接短.
                //每次操作关闭和打开新通道通常是不必要的,但可能是适当的
                IModel channel = conn.CreateModel();
    
                channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                Dictionary<string, object> dicParam = new Dictionary<string, object>();
                dicParam["x-max-priority"] = 10;
                channel.QueueDeclare(queueName, true, false, false, dicParam); //  优先级要设置"x-max-priority"
                ////声明一个队列并指示服务器不发送任何响应
                //channel.QueueDeclareNoWait(queueName, true, false, false, null);
                channel.QueueBind(queueName, exchangeName, routingKey, null);
                ////输入1 如果接收一个消息,但是没有应答,客户端不会接收下一个消息
                //prefetch_count false对于基于当前Channel创建的消费者生效
                //prefetch_count true值在当前Channel的所有消费者共享
                channel.BasicQos(0, 10, false);  //信道预取设置 (QoS), 0:对这个消费者没有限制
                Console.WriteLine("Listenging....");
    
                //IModel#QueueDeclarePassive和IModel#ExchangeDeclarePassive是用于被动声明的方法
                var response = channel.QueueDeclarePassive(queueName);
                // 返回队列中处于就绪状态的消息数
                var messageCount = response.MessageCount;
                // 返回队列拥有的消费者数量
                var consumerCount = response.ConsumerCount;
                Console.WriteLine($"队列:{queueName},就绪状态的消息数:{messageCount},消费者数量:{consumerCount}");
                //Retrieving Messages By Subscription("push API")
                //接收消息的推荐和最方便的方法是使用IBasicConsumer接口设置订阅 。然后,消息将在到达时自动传送,而不必主动请求。
                //实现消费者的一种方法是使用便利类EventingBasicConsumer,它将交付和其他消费者生命周期事件作为 C# 事件分派
                var consumer = new AsyncEventingBasicConsumer(channel);
                consumer.Received += async (ch, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"EventingBasicConsumering....{message}....");
                    // 肯定确认单次投递,消息将被丢弃
                    //false :positively acknowledge a single delivery, the message will be discarded
                    //积极确认所有交付这个交货标签
                    //true :positively acknowledge all deliveries up to this delivery tag
                    channel.BasicAck(ea.DeliveryTag, false); //自动应答不需要
                    Console.WriteLine($"BasicAcking....end...{DateTime.Now}");
                    await Task.Delay(2000);
                    await Task.Yield();
                };
                // this consumer tag identifies the subscription
                // when it has to be cancelled
                string consumerTag = channel.BasicConsume(queueName, false, consumer);
                // ensure we get a delivery
                
    
                ////basic.consume方法中的x-priority参数 设置为整数值。没有指定值的消费者优先级为0。数字越大表示优先级越高,正数和负数都可以使用
                //Dictionary<string, object> dicParam = new Dictionary<string, object>();
                //dicParam["x-priority"] = 10;
                //string consumerTag2 = channel.BasicConsume(queueName, false, "", dicParam, consumer);
                //Console.WriteLine($"BasicConsume....{consumerTag2}");
    
                Console.WriteLine($"BasicConsume....{consumerTag}");
            }
    
            /// <summary>
            /// 创建多个消费者
            /// </summary>
            /// <param name="consumerCnt"></param>
            public void BatchConsumer(int consumerCnt)
            {
                try
                {
                    var queueName = "test01";
                    var exchangeName = "changeName01";
                    var exchangeType = ExchangeType.Fanout; //fanout、topic、fanout direct
                    var routingKey = "*";
                    var uri = new Uri("amqp://127.0.0.1:5672/");
                    var factory = new ConnectionFactory
                    {
                        UserName = "admin",
                        Password = "admin",
                        RequestedHeartbeat = TimeSpan.FromMilliseconds(0),
                        Endpoint = new AmqpTcpEndpoint(uri)
                    };
    
                    var connection = factory.CreateConnection();
                    var channel = connection.CreateModel();
                    //设置交换器的类型
                    channel.ExchangeDeclare(exchangeName, exchangeType);
                    //声明一个队列,设置队列是否持久化,排他性,与自动删除,名称相同不会重复创建
                    channel.QueueDeclare(queueName, true, false, false, null);
                    //绑定消息队列,交换器,routingkey
                    channel.QueueBind(queueName, exchangeName, routingKey);
                    ////输入1 如果接收一个消息,但是没有应答,客户端不会接收下一个消息
                    //prefetch_count false对于基于当前Channel创建的消费者生效
                    //prefetch_count true值在当前Channel的所有消费者共享
                    channel.BasicQos(0, 1, false);  //
    
                    Console.WriteLine("Listenging....");
    
                    System.Threading.Thread.Sleep(1000);
    
                    //int consumerCnt = 10; //消费者个数
                    for (int i = 0; i < consumerCnt; i++)
                    {
                        //定义这个队列的消费者
                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);    //QueueingBasicConsumer 已过时
    
                        // this consumer tag identifies the subscription
                        // when it has to be cancelled
                        //false为手动应答,true为自动应答
                        string consumerTag = channel.BasicConsume(queueName, false, consumer);
    
                        //Console.WriteLine($"BasicConsume....消费者:{i}....{consumerTag}");
    
                        #region MyRegion
    
                        Action<int> action = (consumerNo) =>
                        {
                            //定义这个队列的消费者
                            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);    //QueueingBasicConsumer 已过时
    
                            // this consumer tag identifies the subscription
                            // when it has to be cancelled
                            //false为手动应答,true为自动应答
                            string consumerTag = channel.BasicConsume(queueName, false, consumer);
    
                            Console.WriteLine($"BasicConsume....消费者:{consumerNo}....{consumerTag}");
                            consumer.Received += (ch, ea) =>
                            {
                                try
                                {
                                    var body = ea.Body.ToArray();
                                    // copy or deserialise the payload
                                    // and process the message
                                    var message = Encoding.UTF8.GetString(body);
    
                                    var conMessage = System.Text.Json.JsonSerializer.Deserialize<MqMessage>(message);
    
                                    Console.WriteLine($"EventingBasicConsumering....消费者:{consumerNo}....{conMessage.SendSortNo}....{conMessage.PushTime}");
                                    System.Threading.Thread.Sleep(1000);
    
                                    channel.BasicAck(ea.DeliveryTag, false);
                                    Console.WriteLine($"BasicAcking....{conMessage.SendSortNo }");
                                    //if (conMessage.SendSortNo % 2 == 1)
                                    //{
                                    //    channel.BasicAck(ea.DeliveryTag, false);
                                    //    Console.WriteLine($"BasicAcking....{conMessage.SendSortNo }");
                                    //}
    
                                    Console.WriteLine($"EventingBasicConsumering....end");
                                }
                                catch (Exception ex)
                                {
                                    Console.WriteLine($"异常BasicAcking....{ex.Message }");
                                    //重新排队所有未确认的交付
                                    channel.BasicNack(ea.DeliveryTag, true, true);
                                    //重新回到队列
                                    channel.BasicNack(ea.DeliveryTag, false, false);
                                    //重新排队交货:true,false丢弃
                                    channel.BasicReject(ea.DeliveryTag, true);
                                }
                                
                            };
    
                        };
                        Task.Run(() =>
                        {
                            action(i);
                        });
    
                        #endregion
                    }
    
    
                    //while (true)
                    //{
                    //    //阻塞函数 获取队列中的消息
                    //    BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    //    byte[] bytes = ea.Body;
                    //    var messageStr = Encoding.UTF8.GetString(bytes);
                    //    var message = System.Text.Json.JsonSerializer.Deserialize<MqMessage>(messageStr);
                    //    Console.WriteLine("Receive a Message, DateTime:" + message.DateTime.ToString("yyyy-MM-dd HH:mm:ss") + " Title:" + message.Title);
                    //    //如果是自动应答,下下面这句代码不用写啦。
                    //    if ((Convert.ToInt32(message.Title) % 2) == 1)
                    //    {
                    //        channel.BasicAck(ea.DeliveryTag, false);
                    //    }
                    //}
    
    
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
    Consumer

     简洁

            static void Main(string[] args)
            {
                var factory = new ConnectionFactory();
    
                factory.UserName = "test";
                factory.Password = "test";
                factory.VirtualHost = "/";
                factory.HostName = "127.0.0.1";
                factory.Port = 5672;
    
                IConnection conn = factory.CreateConnection();
    
                IModel channel = conn.CreateModel();
    
                channel.ExchangeDeclare("TestExchange", ExchangeType.Topic, durable: true); //定义交换机
    
                // 传输的消息
                string message = "你好,世界";
                // 指定字符串中的所有字符编码为一个字节序列
                var body = Encoding.UTF8.GetBytes(message);
               // channel.ConfirmSelect();
                // 往Test队列中发送消息
                channel.BasicPublish("TestExchange", //交换机
                    "TestRouteKey",//路由
                    null,
                    body);
                Console.WriteLine("已发送: {0}", message);
                // 关闭信道
                channel.Close();
                conn.Close();
                Console.ReadLine();
            }
    
    
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory(); //创建连接工厂
    
                factory.UserName = "test";
                factory.Password = "test";
                factory.VirtualHost = "/";
                factory.HostName = "127.0.0.1";
                factory.Port = 5672;
    
                IConnection conn = factory.CreateConnection(); //创建连接
    
                IModel channel = conn.CreateModel(); //创建通道
    
                channel.QueueDeclare("TestQueue", durable: true,exclusive:false,autoDelete:false);
                channel.QueueBind("TestQueue", //队列名称
                    "TestExchange", //交换机名称
                    "TestRouteKey", //路由名称
                    null); // 通道队列绑定交换机和路由
    
                var consumer = new EventingBasicConsumer(channel);//创建通道消费者
                channel.BasicConsume("TestQueue", 
                    false,//不自动ack
                    consumer);//通道开始基本的消费
                consumer.Received += (model, ea) => //消息接收到事件
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(message);
                    if (true) //如果处理成功 ack
                    {
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                    else // 如果处理不成功 nack 重新放回队列
                    {
    
                        channel.BasicNack(ea.DeliveryTag, false,true);
                    }
                    
                };
                Console.ReadKey();
            }
    View Code
  • 相关阅读:
    Selenium库的使用
    Win10 常用快捷键
    503.下一个更大元素 II
    456.132模式
    201.数字范围按位与
    78.子集
    299.猜数字游戏
    49.字母异位词分组
    36.有效的数独
    290.单词规律
  • 原文地址:https://www.cnblogs.com/love201314/p/15498948.html
Copyright © 2011-2022 走看看