zoukankan      html  css  js  c++  java
  • rabbitmq系列——(4 Exchange Type -- Direct)

      场景:收集日志后按日志级别分类,error 级别的日志需要单独处理(发送邮件给运维),其他级别的日志只做记录。

      直接交换机(Direct Exchange)的工作方式类似于单播:Exchange 会将消息发送到与 Routing_key 完全匹配的 Queue,消息通过 key 进行筛选。

    1. 生产者

    using RabbitMQMsgProducer.MessageProducer;
    using Microsoft.Extensions.Configuration;
    using System;
    using System.IO;
    using RabbitMQMsgProducer.ExchangeDemo;
    
    namespace RabbitMQMsgProducer
    {
        /// <summary>
        /// Console entry point for the producer side of the direct-exchange demo.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Runs the direct-exchange producer and then waits for a line on standard input.
            /// Any exception raised along the way is reported by printing its message.
            /// </summary>
            /// <param name="args">Command-line arguments; not used.</param>
            static void Main(string[] args)
            {
                try
                {
                    // ExchangeType: Direct.
                    // Scenario: collected logs are classified by level; "error" entries need
                    // separate handling (an e-mail to operations), the others are only recorded.
                    ProducerDirectExchange.Send();

                    // Block until a line is read so the console stays open.
                    Console.ReadLine();
                }
                catch (Exception failure)
                {
                    Console.WriteLine(failure.Message);
                }
            }
        }
    }
    using RabbitMQ.Client;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    
    namespace RabbitMQMsgProducer.ExchangeDemo
    {
        /// <summary>
        /// Producer for the direct-exchange demo. Declares the queues, the exchange and
        /// their bindings, then publishes a batch of sample log messages using each
        /// message's log level as the routing key.
        /// </summary>
        public class ProducerDirectExchange
        {
            /// <summary>
            /// Connects to the broker on localhost, declares "LogAllQueue" and
            /// "LogErrorQueue", binds them to the direct exchange "DirectExchange"
            /// and publishes the messages returned by <see cref="GetLogMsgList"/>.
            /// </summary>
            public static void Send()
            {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.HostName = "localhost"; // broker address
                connectionFactory.UserName = "guest"; // user name
                connectionFactory.Password = "guest"; // password
                string logAllQueueName = "LogAllQueue"; // queue that receives every log level
                string logErrorQueueName = "LogErrorQueue"; // queue that receives only "error" logs
                string exchangeName = "DirectExchange";
                using (IConnection connection = connectionFactory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        // Declare the queues.
                        channel.QueueDeclare(queue: logAllQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queue: logErrorQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                        // Declare the exchange.
                        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        // Bind the "all logs" queue once per log level ...
                        string[] logTypes = new string[] { "debug", "info", "warn", "error" };
                        foreach (string logType in logTypes)
                        {
                            channel.QueueBind(queue: logAllQueueName, exchange: exchangeName, routingKey: logType, arguments: null);
                        }
                        // ... and the error queue for the "error" key only.
                        channel.QueueBind(queue: logErrorQueueName, exchange: exchangeName, routingKey: "error", arguments: null);

                        // The queues and the exchange are declared durable, but a message is
                        // only kept across a broker restart when it is itself marked persistent.
                        // Publishing with null properties left the messages transient.
                        IBasicProperties persistentProperties = channel.CreateBasicProperties();
                        persistentProperties.Persistent = true;

                        Console.WriteLine("ProducerDirectExchange is ready, go.");
                        List<LogMsgModel> logMsgModels = GetLogMsgList();
                        foreach (var log in logMsgModels)
                        {
                            // The log level doubles as the routing key.
                            channel.BasicPublish(exchange: exchangeName, routingKey: log.LogType, basicProperties: persistentProperties, body: log.Msg);
                            Console.WriteLine($"{Encoding.UTF8.GetString(log.Msg)} is send.");
                        }
                    }
                }
            }

            /// <summary>
            /// Builds 100 sample log messages, numbered 1 to 100, cycling through the
            /// levels by <c>i % 4</c>: 0 = info, 1 = debug, 2 = warn, 3 = error.
            /// </summary>
            /// <returns>The generated messages in ascending order of their number.</returns>
            private static List<LogMsgModel> GetLogMsgList()
            {
                const int messageCount = 100;
                // Indexed by i % 4, so exactly one level applies to each number.
                string[] levelByRemainder = { "info", "debug", "warn", "error" };

                List<LogMsgModel> logList = new List<LogMsgModel>(messageCount);
                for (int i = 1; i <= messageCount; i++)
                {
                    string level = levelByRemainder[i % levelByRemainder.Length];
                    logList.Add(new LogMsgModel() { LogType = level, Msg = Encoding.UTF8.GetBytes($"{level}第{i}条信息") });
                }
                return logList;
            }
        }
        /// <summary>
        /// One log entry to publish: its level plus its encoded payload.
        /// </summary>
        public class LogMsgModel
        {
            /// <summary>Log level name; the producer passes it as the routing key.</summary>
            public string LogType { get; set; }

            /// <summary>Message body as raw bytes.</summary>
            public byte[] Msg { get; set; }
        }
    }

    2. 消费者

    // 消费者001 消费所有日志信息

    // 消费者002 消费error信息,并发送邮件给运维

    using RabbitMQMsgConsumer001.ExchangeDemo;
    using RabbitMQMsgConsumer001.MessageConsumer;
    using System;
    using System.Threading.Tasks;
    
    namespace RabbitMQMsgConsumer001
    {
        /// <summary>
        /// Console entry point for consumer 001 of the direct-exchange demo.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Starts the consumer and then waits for a line on standard input.
            /// Any exception raised along the way is reported by printing its message.
            /// </summary>
            /// <param name="args">Command-line arguments; not used.</param>
            static void Main(string[] args)
            {
                try
                {
                    // ExchangeType: Direct.
                    // Consumer 001 consumes every log message.
                    ConsumerDirectExchange.Receive();

                    // Block until a line is read so the console stays open.
                    Console.ReadLine();
                }
                catch (Exception failure)
                {
                    Console.WriteLine(failure.Message);
                }
            }
        }
    }
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Text;
    
    namespace RabbitMQMsgConsumer001.ExchangeDemo
    {
        /// <summary>
        /// Consumer 001 of the direct-exchange demo: reads every log message from
        /// "LogAllQueue" and prints it to the console.
        /// </summary>
        public class ConsumerDirectExchange
        {
            /// <summary>
            /// Connects to the broker on localhost, declares "LogAllQueue" and the direct
            /// exchange "DirectExchange", binds the queue for all four log levels, and
            /// consumes from it until a line is read from standard input.
            /// </summary>
            public static void Receive()
            {
                const string allLogsQueue = "LogAllQueue"; // queue that receives every log level
                const string directExchange = "DirectExchange";

                var factory = new ConnectionFactory
                {
                    HostName = "localhost", // broker address
                    UserName = "guest",     // user name
                    Password = "guest"      // password
                };

                using (IConnection connection = factory.CreateConnection())
                using (IModel channel = connection.CreateModel())
                {
                    // Declare the queue, then the exchange.
                    channel.QueueDeclare(queue: allLogsQueue, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.ExchangeDeclare(exchange: directExchange, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                    // One binding per log level, so this queue receives all of them.
                    foreach (string level in new[] { "debug", "info", "warn", "error" })
                    {
                        channel.QueueBind(queue: allLogsQueue, exchange: directExchange, routingKey: level, arguments: null);
                    }

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (sender, delivery) =>
                    {
                        // Decode the payload as UTF-8 and print it with its routing key.
                        string text = Encoding.UTF8.GetString(delivery.Body.ToArray());
                        Console.WriteLine($"logall:{delivery.RoutingKey}, {text} ,is receive.");
                    };

                    // Start consuming; autoAck is true, so this code sends no explicit acknowledgement.
                    channel.BasicConsume(queue: allLogsQueue, autoAck: true, consumer: consumer);

                    // Keep the channel and connection open until a line is read.
                    Console.ReadLine();
                }
            }
        }
    }
    using RabbitMQMsgConsumer002.ExchangeDemo;
    using System;
    
    namespace RabbitMQMsgConsumer002
    {
        /// <summary>
        /// Console entry point for consumer 002 of the direct-exchange demo.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Starts the consumer and then waits for a line on standard input.
            /// Any exception raised along the way is reported by printing its message.
            /// </summary>
            /// <param name="args">Command-line arguments; not used.</param>
            static void Main(string[] args)
            {
                try
                {
                    // ExchangeType: Direct.
                    // Consumer 002 consumes "error" messages and sends an e-mail to operations.
                    ConsumerDirectExchange.Receive();

                    // Block until a line is read so the console stays open.
                    Console.ReadLine();
                }
                catch (Exception failure)
                {
                    Console.WriteLine(failure.Message);
                }
            }
        }
    }
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Text;
    
    namespace RabbitMQMsgConsumer002.ExchangeDemo
    {
        /// <summary>
        /// Consumer 002 of the direct-exchange demo: reads "error" log messages from
        /// "LogErrorQueue" and prints a line for each one.
        /// </summary>
        public class ConsumerDirectExchange
        {
            /// <summary>
            /// Connects to the broker on localhost, declares "LogErrorQueue" and the direct
            /// exchange "DirectExchange", binds the queue for the "error" routing key, and
            /// consumes from it until a line is read from standard input.
            /// </summary>
            public static void Receive()
            {
                const string errorLogsQueue = "LogErrorQueue"; // queue that receives only "error" logs
                const string directExchange = "DirectExchange";

                var factory = new ConnectionFactory
                {
                    HostName = "localhost", // broker address
                    UserName = "guest",     // user name
                    Password = "guest"      // password
                };

                using (IConnection connection = factory.CreateConnection())
                using (IModel channel = connection.CreateModel())
                {
                    // Declare the queue, then the exchange.
                    channel.QueueDeclare(queue: errorLogsQueue, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.ExchangeDeclare(exchange: directExchange, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                    // A single binding: only the "error" routing key reaches this queue.
                    channel.QueueBind(queue: errorLogsQueue, exchange: directExchange, routingKey: "error", arguments: null);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (sender, delivery) =>
                    {
                        // Decode the payload as UTF-8. No e-mail is sent here; the handler
                        // only prints a line saying so.
                        string text = Encoding.UTF8.GetString(delivery.Body.ToArray());
                        Console.WriteLine($"logerror:{delivery.RoutingKey}, {text} , is receive , the email is send .");
                    };

                    // Start consuming; autoAck is true, so this code sends no explicit acknowledgement.
                    channel.BasicConsume(queue: errorLogsQueue, autoAck: true, consumer: consumer);

                    // Keep the channel and connection open until a line is read.
                    Console.ReadLine();
                }
            }
        }
    }

    3. 结果

  • 相关阅读:
    iOS UILable 自定义高度 用masony适配
    iOS上架所需图片大小明细
    GCD倒计时
    iOS 小知识汇总
    七、Swift 枚举 Enumerations
    C语言深度剖析笔记
    六、闭包 Closures
    经济学常识
    Mac小技巧
    五、函数 Functions
  • 原文地址:https://www.cnblogs.com/Fletcher/p/14174375.html
Copyright © 2011-2022 走看看