zoukankan      html  css  js  c++  java
  • RabbitMQ 在.Net 中的使用

     

    RabbitMQHelper

     public static class RabbitMQHelper
        {
            // 定义 RabbitMQ 基本参数 
    
            private static string HostName = "127.0.0.1";
    
            private static int Port = 5672;
    
            private static string UserName = "admin";
    
            private static string Password = "admin";
    
            private static byte[] ToBytes(string str) => Encoding.UTF8.GetBytes(str);
    
            /// <summary>
            /// 设置交换机
            /// </summary>
            /// <param name="exchange">交换机名称</param>
            /// <param name="type">direct fanout headers topic</param>
            public static void SetExchange(string exchange, string type)
            {
                using (var channel = BuildChannel())
                {
                    channel.ExchangeDeclare(exchange, type, true, false);
                }
            }
    
    
            /// <summary>
            /// 设置队列
            /// </summary>
            /// <param name="queue">队列名称</param>
            /// <param name="exchange">交换机</param>
            public static void SetQueue(string queue, string exchange)
            {
                using (var channel = BuildChannel())
                {
                    //定义队列名称 持久化:是 
                    channel.QueueDeclare(queue, true, false, false, null);
    
                    // 绑定队列
                    channel.QueueBind(queue, exchange, queue, null);
    
                }
            }
    
            /// <summary>
            /// 设置队列
            /// </summary>
            /// <param name="queues">队列集合</param>
            /// <param name="exchange">交换机</param>
            public static void SetQueue(List<string> queues, string exchange)
            {
                using (var channel = BuildChannel())
                {
                    foreach (var queue in queues)
                    {
                        //定义队列名称 持久化:是 
                        channel.QueueDeclare(queue, true, false, false, null);
    
                        // 绑定队列
                        channel.QueueBind(queue, exchange, queue, null);
    
                    }
    
                }
            }
    
    
            /// <summary>
            /// 发布
            /// </summary>
            /// <param name="exchange">交换机</param>
            /// <param name="queue">队列</param>
            /// <param name="message">消息体</param>
            public static void Publish(string exchange, string queue, string message)
            {
                using (var channel = BuildChannel())
                {
                    channel.BasicPublish(exchange, queue, null, ToBytes(message));
                }
            }
    
    
            /// <summary>
            /// 订阅
            /// </summary>
            /// <param name="queue">队列</param>
            /// <param name="func">func</param>
            public static void Subscribe(string queue, Func<string, bool> func)
            {
                var channel = BuildChannel();
    
                //事件基本消费者
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    
                //接收到消息事件
                consumer.Received += (ch, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body);
    
                    func.Invoke(message);
    
                        //Task.Run( () => { func.Invoke(message); });
    
                        //确认该消息已被消费
                        channel.BasicAck(ea.DeliveryTag, false);
    
                };
    
                //启动消费者 设置为手动应答消息
                channel.BasicConsume(queue, false, consumer);
    
            }
    
            private static IModel BuildChannel()
            {
                return new ConnectionFactory()
                {
                    HostName = HostName,
                    Port = Port,
                    UserName = UserName,
                    Password = Password
    
                }.CreateConnection().CreateModel();
            }
    
        }

    具体使用

    class Program
        {
            static void Main(string[] args)
            {
                //初始化交换机
                RabbitMQHelper.SetExchange("ServerDirect", "direct");
    
                // 初始化队列
                RabbitMQHelper.SetQueue("LogCenter", "ServerDirect");
                RabbitMQHelper.SetQueue("SMSCenter", "ServerDirect");
                RabbitMQHelper.SetQueue("EmailCenter", "ServerDirect");
    
                //发布消息
    
                for (int i = 0; i < 100; i++)
                {
                    RabbitMQHelper.Publish("ServerDirect", "LogCenter", "log"+i);
                }  
    
                RabbitMQHelper.Publish("ServerDirect", "SMSCenter", "sms111111111");  
    
    
                Console.ReadKey();
            }
        }
  • 相关阅读:
    旋转数组的最小数字
    Redis常用方法
    用两个栈实现队列
    Spark1.4启动spark-shell时initializing失败
    从尾到头打印链表
    Hbase的安装(hadoop-2.6.0,hbase1.0)
    执行sh文件 进行MongoDB的业务逻辑导入
    Scala第二章学习笔记
    替换空格
    二维数组中的查找
  • 原文地址:https://www.cnblogs.com/myshowtime/p/11855088.html
Copyright © 2011-2022 走看看