zoukankan      html  css  js  c++  java
  • RabbitMQ Direct交换机代码实现

    RabbitMQ交换机有四种类型:Direct,Fanout,Topic,Header

    先简单介绍Direct交换机的代码实现

    先创建连接

    public class MQHelper
        {
            public IConnection GetConnection()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";
                factory.UserName = "guest";
                factory.Password = "guest";
                return factory.CreateConnection();
            }
        }

    直流交换机代码:

    public class DirectExchange
        {      
            public void DirectPublish()
            {
                MQHelper mh = new MQHelper();
                using (var conn = mh.GetConnection())
                {
                    using(IModel channel = conn.CreateModel())
                    {
                        //声明队列
                        channel.QueueDeclare(queue: "DirectLogAll", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queue: "DirectLogError", durable: true, exclusive: false, autoDelete: false, arguments: null);
    
                        //声明交换机
                        channel.ExchangeDeclare(exchange: "DirectExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
    
                        //日志类型
                        string[] logtypes = new string[] { "debug", "info", "warn", "error" };
    
                        foreach(string log in logtypes)
                        {
                            //绑定交换机和队列(所有日志类型队列)
                            channel.QueueBind(queue: "DirectLogAll", exchange: "DirectExchange", routingKey: log);
                        }
    
                        //错误日志队列
                        channel.QueueBind(queue: "DirectLogError", exchange: "DirectExchange", routingKey: "error");
    
    
                        List<LogModel> list = new List<LogModel>();
                        //100条消息
                        for(int i = 1; i <= 100; i++)
                        {
                            if (i % 4 == 0)
                            {
                                list.Add(new LogModel() { LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug第{i}条消息") });
                            }else if(i % 4 == 1)
                            {
                                list.Add(new LogModel() { LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}条消息") });
                            }
                            else if (i % 4 == 2)
                            {
                                list.Add(new LogModel() { LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn第{i}条消息") });
                            }
                            else
                            {
                                list.Add(new LogModel() { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}条消息") });
                            }
                        }
    
                        //发送
                        foreach(var log in list)
                        {
                            channel.BasicPublish(exchange: "DirectExchange", routingKey: log.LogType, basicProperties: null, body: log.Msg);
                            //记录
                            Console.WriteLine($"{Encoding.UTF8.GetString(log.Msg)} 已发送");
                        }
    
                    }
                }
            }  
        }

    这是模拟发送100条日志信息到RabbitMQ,四种类型各25条,根据路由键routingKey分别传到两条队列。其中日志消息

    public class LogModel
        {
            public string LogType { get; set; }     //日志类型
    
            public byte[] Msg { get; set; }        //消息正文
        }

    最终在RabbitMQ的本地localhost:15672上可以看到

    这是发送消息到了RabbitMQ上待消费。

    添加Direct消费模块

    public class DirectExchangeConsumer
        {
            public void DirectConsume()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";
                factory.UserName = "guest";
                factory.Password = "guest";
                
                using(var conn= factory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        //声明队列
                        channel.QueueDeclare(queue: "DirectLogAll", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        // channel.QueueDeclare(queue: "DirectLogError", durable: true, exclusive: false, autoDelete: false, arguments: null);
    
                        //声明交换机
                        channel.ExchangeDeclare(exchange: "DirectExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
    
                        //日志类型
                        string[] logtypes = new string[] { "debug", "info", "warn", "error" };
    
                        foreach (string log in logtypes)
                        {
                            //绑定交换机和队列(所有日志类型队列)
                            channel.QueueBind(queue: "DirectLogAll", exchange: "DirectExchange", routingKey: log);
                        }
    
                        //错误日志队列
                        // channel.QueueBind(queue: "DirectLogError", exchange: "DirectExchange", routingKey: "error");
    
    
                        //消费消息
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"[{message}],写入文本");
                        };
                        //处理消息
                        channel.BasicConsume(queue: "DirectLogAll", autoAck: true, consumer: consumer);
                    }
                }
            }
        }

    虽然这里重复声明了交换机和队列,但由于连接一样,所以不会重复存在。

    调用方法之后可以看到消息全部都被消费

    记录编程的点滴,体会学习的乐趣
  • 相关阅读:
    [leetCode]剑指 Offer 43. 1~n整数中1出现的次数
    [leetCode]剑指 Offer 42. 连续子数组的最大和
    HDU
    HDU
    HDU
    HDU
    HDU
    HDU
    POJ
    POJ
  • 原文地址:https://www.cnblogs.com/AduBlog/p/14891574.html
Copyright © 2011-2022 走看看