zoukankan      html  css  js  c++  java
  • RabbitMQ .NET消息队列使用入门(二)【多个队列间消息传输】

    孤独将会是人生中遇见的最大困难。

    实体类:

    DocumentType.cs

      /// <summary>
      /// Kind of document a <c>MessageModel</c> describes. The numeric values travel
      /// in the JSON payload (the "DocType" field), so they are assigned explicitly.
      /// </summary>
      public enum DocumentType
        {
            /// <summary>Journal / log entry.</summary>
            Journal = 1,

            /// <summary>Thesis / paper.</summary>
            Thesis = 2,

            /// <summary>Meeting document.</summary>
            Meeting = 3
        }
    

    MessageModel.cs

     /// <summary>
     /// Message payload exchanged between the queues: a document title, its author
     /// and the document type.
     /// </summary>
     public class MessageModel
        {
            /// <summary>Document title; also what <see cref="ToString"/> returns.</summary>
            public string Title { get; set; }

            /// <summary>Document author.</summary>
            public string Author { get; set; }

            /// <summary>Document category.</summary>
            public DocumentType DocType { get; set; }

            /// <summary>
            /// Validates the message: both Title and Author must be present.
            /// NOTE(review): the name is a misspelling of "IsValid"; it is kept because
            /// callers reference it under this name.
            /// </summary>
            /// <returns>
            /// <c>true</c> when neither Title nor Author is null, empty or whitespace-only.
            /// </returns>
            public bool IsVlid()
            {
                bool titleMissing = string.IsNullOrWhiteSpace(Title);
                bool authorMissing = string.IsNullOrWhiteSpace(Author);

                return !(titleMissing || authorMissing);
            }

            /// <summary>Returns the title, so a model can be concatenated directly into log output.</summary>
            public override string ToString()
            {
                string text = Title;
                return text;
            }
        }
    

    异常类 MessageException.cs(包含 MessageException 与 NoRPCConsumeException 两个异常类):

     /// <summary>
     /// Raised when a message cannot be parsed or fails validation. The consumers in
     /// this sample reject such messages without requeueing them.
     /// </summary>
     public class MessageException : Exception
        {
            /// <summary>Creates the exception with a description of the failure.</summary>
            /// <param name="message">Human-readable error text.</param>
            public MessageException(string message)
                : base(message)
            {
            }

            /// <summary>Creates the exception, preserving the underlying cause.</summary>
            /// <param name="message">Human-readable error text.</param>
            /// <param name="innerException">The exception that triggered this one.</param>
            public MessageException(string message, Exception innerException)
                : base(message, innerException)
            {
            }
        }
    
        /// <summary>
        /// Exception type reserved for the RPC (synchronous) consumption scenario.
        /// Nothing in the visible sample code throws it.
        /// </summary>
        public class NoRPCConsumeException : Exception
        {
            /// <summary>Creates the exception with a description of the failure.</summary>
            /// <param name="message">Human-readable error text.</param>
            public NoRPCConsumeException(string message)
                : base(message)
            {
            }

            /// <summary>Creates the exception, preserving the underlying cause.</summary>
            /// <param name="message">Human-readable error text.</param>
            /// <param name="innerException">The exception that triggered this one.</param>
            public NoRPCConsumeException(string message, Exception innerException)
                : base(message, innerException)
            {
            }
        }
    

    ①第一个控制台应用程序

    namespace PCApp
    {
        class Program
        {
            private static IConnection _senderConnection;
            private static IModel _channel;
            private static int _interval = 1; //消息发送间隔
            private static bool isExit;
    
            /// <summary>
            /// Entry point of the producer: connects to the broker, sends an initial
            /// batch of messages to extractQueue, then reads console commands until exit.
            /// </summary>
            /// <param name="args">Command-line arguments (unused).</param>
            private static void Main(string[] args)
            {
                Setup();

                // Setup() sets isExit when the broker is unreachable. In that case there
                // is no channel, so sending would fail with a NullReferenceException.
                if (!isExit)
                {
                    Console.WriteLine("准备发送消息到extractQueue队列:");

                    Send();
                }

                WaitCommand();
            }
    
            /// <summary>
            /// Connects to the local RabbitMQ broker, opens the publishing channel and
            /// declares extractQueue. When the broker cannot be reached, an error is
            /// printed and <c>isExit</c> is set so the command loop ends immediately.
            /// </summary>
            private static void Setup()
            {
                var connectionFactory = new ConnectionFactory();
                connectionFactory.HostName = "localhost";
                connectionFactory.UserName = "guest";
                connectionFactory.Password = "guest";
                // No VirtualHost is set, so the client default ("/") is used. A virtual host
                // acts as a namespace: several users may share one, but exchanges, queues and
                // bindings in different virtual hosts cannot see each other. The original
                // sample had an alternative, VirtualHost = "test", commented out here.
                connectionFactory.AutomaticRecoveryEnabled = true;
                connectionFactory.TopologyRecoveryEnabled = true;

                try
                {
                    _senderConnection = connectionFactory.CreateConnection();
                    _senderConnection.ConnectionShutdown += _senderConnection_ConnectionShutdown;

                    _channel = _senderConnection.CreateModel();
                    // Arguments after the name: durable, exclusive, autoDelete, arguments.
                    _channel.QueueDeclare("extractQueue", false, false, false, null);
                }
                catch (BrokerUnreachableException)
                {
                    Console.WriteLine("ERROR: RabbitMQ服务器未启动!");
                    // Leave the message on screen briefly before the program winds down.
                    Thread.Sleep(2000);
                    isExit = true;
                }
            }
    
            /// <summary>
            /// Connection-shutdown handler: prints a notice followed by the reply text
            /// carried in the shutdown event.
            /// </summary>
            /// <param name="sender">Event source (unused).</param>
            /// <param name="e">Shutdown details; only <c>ReplyText</c> is read.</param>
            private static void _senderConnection_ConnectionShutdown(object sender, ShutdownEventArgs e)
            {
                string notice = string.Concat("连接已关闭. ", e.ReplyText);
                Console.WriteLine(notice);
            }
    
            /// <summary>
            /// Reads commands from the console until <c>isExit</c> becomes true.
            /// Supported commands (case-insensitive):
            /// <c>exit</c> closes the connection and quits;
            /// <c>go [count]</c> sends <c>count</c> messages (10 when omitted);
            /// <c>interval &lt;seconds&gt;</c> sets the pause between messages;
            /// <c>clear</c> clears the console. Anything else is ignored.
            /// </summary>
            private static void WaitCommand()
            {
                while (!isExit)
                {
                    string input = Console.ReadLine();
                    if (input == null)
                    {
                        // Standard input was closed (EOF / redirected input exhausted).
                        // Treat it like "exit" instead of dereferencing null.
                        Close();
                        isExit = true;
                        break;
                    }

                    // ToLowerInvariant keeps command matching independent of the user's
                    // culture; RemoveEmptyEntries tolerates repeated spaces.
                    string[] arr = input.Trim().ToLowerInvariant()
                        .Split(new[] {' '}, StringSplitOptions.RemoveEmptyEntries);
                    if (arr.Length == 0)
                    {
                        continue;
                    }

                    switch (arr[0])
                    {
                        case "exit":
                            Close();
                            isExit = true;
                            break;
                        case "go":
                            int count = 10;
                            if (arr.Length > 1 && !int.TryParse(arr[1], out count))
                            {
                                // A failed TryParse would leave count at 0 and silently send nothing.
                                Console.WriteLine("用法: go [消息数量]");
                                break;
                            }

                            Send(count);
                            break;
                        case "interval":
                            int seconds;
                            if (arr.Length > 1 && int.TryParse(arr[1], out seconds))
                            {
                                _interval = seconds;
                            }
                            else
                            {
                                // Missing or non-numeric argument: keep the current interval.
                                Console.WriteLine("用法: interval <秒数>");
                            }

                            break;
                        case "clear":
                            Console.Clear();
                            break;
                        default:
                            break;
                    }
                }

                Console.WriteLine("Goodbye!");
            }
    
            /// <summary>
            /// Publishes <paramref name="msgCount"/> generated test messages to
            /// extractQueue through the default exchange, pausing <c>_interval</c>
            /// seconds after each one when the interval is positive.
            /// </summary>
            /// <param name="msgCount">Number of messages to send; defaults to 10.</param>
            public static void Send(int msgCount = 10)
            {
                if (_channel == null)
                {
                    // Setup() failed (or was never run), so there is nothing to publish on.
                    Console.WriteLine("ERROR: 信道未初始化,无法发送消息");
                    return;
                }

                // Doubled braces are literal braces for string.Format; the inner quotes
                // must be escaped for the literal to compile.
                const string jsonFormat = "{{\"Title\":\"{0}\",\"Author\":\"{1}\",\"DocType\":{2}}}";

                Console.WriteLine("---------- 开始发送------------");

                for (int i = 1; i <= msgCount; i++)
                {
                    string title = "测试文档" + i;
                    string author = "lexworld" + i;
                    int docType = i%2 + 1; // alternates between 2 and 1
                    string message = string.Format(jsonFormat, title, author, docType);
                    byte[] body = Encoding.UTF8.GetBytes(message);
                    try
                    {
                        _channel.BasicPublish("", "extractQueue", null, body);
                    }
                    catch (AlreadyClosedException ex)
                    {
                        // The channel/connection is gone; stop this batch.
                        Console.WriteLine("ERROR: " + ex.Message);
                        break;
                    }

                    Console.WriteLine("Time:" + DateTime.Now + " MSG:" + title);

                    if (_interval > 0)
                    {
                        Thread.Sleep(_interval*1000);
                    }
                }

                Console.WriteLine("---------- 结束 ------------");
            }
    
            /// <summary>
            /// Builds a JSON message from the process command line, which is expected
            /// to carry <c>Title,Author,DocType</c> (comma-separated) after the
            /// program name.
            /// </summary>
            /// <returns>JSON text with "Title", "Author" and "DocType" fields.</returns>
            /// <exception cref="ArgumentException">
            /// Fewer than three comma-separated values were supplied, or DocType is not an integer.
            /// </exception>
            private static string GetMessage()
            {
                // Element 0 is the program name. Skip it by index instead of searching for
                // the first space, which breaks when the executable path contains a space.
                string[] commandLine = Environment.GetCommandLineArgs();
                string args = commandLine.Length > 1
                    ? string.Join(" ", commandLine, 1, commandLine.Length - 1)
                    : string.Empty;
                Console.WriteLine("args:" + args);

                string[] arr = args.Split(new[] {','});
                int docType;
                if (arr.Length < 3 || !int.TryParse(arr[2].Trim(), out docType))
                {
                    throw new ArgumentException(
                        "Expected command-line arguments in the form <Title>,<Author>,<DocType> with an integer DocType.");
                }

                // Doubled braces are literal braces for string.Format; the inner quotes
                // must be escaped for the literal to compile.
                const string jsonFormat = "{{\"Title\":\"{0}\",\"Author\":\"{1}\",\"DocType\":{2}}}";

                return string.Format(jsonFormat, EscapeJsonString(arr[0]), EscapeJsonString(arr[1]), docType);
            }

            // Escapes backslashes and double quotes so user-supplied text cannot break out of a JSON string.
            private static string EscapeJsonString(string value)
            {
                return value.Replace("\\", "\\\\").Replace("\"", "\\\"");
            }
    
            /// <summary>
            /// Closes the publishing channel and then the connection, skipping any that
            /// was never created or is no longer open.
            /// </summary>
            private static void Close()
            {
                bool channelIsOpen = _channel != null && _channel.IsOpen;
                if (channelIsOpen)
                {
                    _channel.Close();
                }

                bool connectionIsOpen = _senderConnection != null && _senderConnection.IsOpen;
                if (connectionIsOpen)
                {
                    _senderConnection.Close();
                }
            }
        }
    }
    

    ②第二个控制台应用程序

      class Program
        {
            private static IConnection _senderConn;
            private static IConnection _recvConn;
            //private static IModel _senderChannel; 多线程情况下,每个线程需要独立的channel来发送消息
            private static IModel _recvChannel;
            private static bool isExit;
    
    
            /// <summary>
            /// Entry point: starts consuming extractQueue (forwarding to checkQueue) and
            /// then reads console commands until exit.
            /// </summary>
            /// <param name="args">Command-line arguments (unused).</param>
            private static void Main(string[] args)
            {
                Setup();

                // Setup() sets isExit when the broker is unreachable; do not announce
                // consumption that never started.
                if (!isExit)
                {
                    Console.WriteLine("开始消费extractQueue队列里的消息(同时推送到checkQueue检查队列):");
                }

                WaitCommand();
            }
    
            /// <summary>
            /// Opens the receiving connection and starts a manually-acknowledged consumer
            /// on extractQueue, then opens the sending connection and declares checkQueue.
            /// When the broker cannot be reached, an error is printed and
            /// <c>isExit</c> is set.
            /// </summary>
            private static void Setup()
            {
                var connectionFactory = new ConnectionFactory();
                connectionFactory.HostName = "localhost";
                connectionFactory.UserName = "guest";
                connectionFactory.Password = "guest";
                // VirtualHost is left at the client default (an alternative, "test", was
                // commented out in the original sample).
                // TopologyRecoveryEnabled is not set; per the original author's note it
                // defaults to true, and setting it to false would stop exchanges, queues
                // and bindings from being re-created after a reconnect.
                connectionFactory.AutomaticRecoveryEnabled = true; // reconnect automatically

                try
                {
                    _recvConn = connectionFactory.CreateConnection();
                    _recvConn.ConnectionShutdown += ConnectionShutdown;

                    _recvChannel = _recvConn.CreateModel();
                    _recvChannel.QueueDeclare("extractQueue", false, false, false, null);
                    // Prefetch count 10: at most ten unacknowledged deliveries at a time.
                    _recvChannel.BasicQos(0, 10, false);

                    var extractConsumer = new EventingBasicConsumer(_recvChannel);
                    extractConsumer.Received += consumer_Received;
                    // Second argument false: deliveries are acknowledged explicitly in HandlingMessage.
                    _recvChannel.BasicConsume("extractQueue", false, extractConsumer);

                    _senderConn = connectionFactory.CreateConnection();
                    // This channel is deliberately never closed. Per the original author's
                    // note, automatic recovery re-creates checkQueue through the channel
                    // that declared it, so closing it would prevent the queue from being restored.
                    IModel declaringChannel = _senderConn.CreateModel();
                    declaringChannel.QueueDeclare("checkQueue", false, false, false, null);
                }
                catch (BrokerUnreachableException)
                {
                    Console.WriteLine("ERROR: RabbitMQ服务器未启动!");
                    Thread.Sleep(2000);
                    isExit = true;
                }
            }
    
            /// <summary>
            /// Connection-shutdown handler for the receiving connection: prints a fixed notice.
            /// </summary>
            /// <param name="sender">Event source (unused).</param>
            /// <param name="e">Shutdown details (unused).</param>
            private static void ConnectionShutdown(object sender, ShutdownEventArgs e)
            {
                const string notice = "Connection has already closed.";
                Console.WriteLine(notice);
            }
    
            /// <summary>
            /// Reads commands from the console until <c>isExit</c> becomes true.
            /// <c>exit</c> closes all connections and quits; <c>clear</c> clears the
            /// console; anything else is ignored. Commands are case-insensitive.
            /// </summary>
            private static void WaitCommand()
            {
                while (!isExit)
                {
                    string input = Console.ReadLine();
                    if (input == null)
                    {
                        // Standard input was closed (EOF); treat it like "exit" instead
                        // of dereferencing null.
                        Close();
                        isExit = true;
                        break;
                    }

                    // ToLowerInvariant keeps command matching independent of the user's culture.
                    string line = input.Trim().ToLowerInvariant();
                    switch (line)
                    {
                        case "exit":
                            Close();
                            isExit = true;
                            break;
                        case "clear":
                            Console.Clear();
                            break;
                        default:
                            break;
                    }
                }

                Console.WriteLine("Goodbye!");
            }
    
            /// <summary>
            /// Shuts down, in order, the receiving channel, the receiving connection and
            /// the sending connection, skipping any that is null or already closed.
            /// </summary>
            private static void Close()
            {
                bool recvChannelOpen = _recvChannel != null && _recvChannel.IsOpen;
                if (recvChannelOpen)
                {
                    _recvChannel.Close();
                }

                bool recvConnOpen = _recvConn != null && _recvConn.IsOpen;
                if (recvConnOpen)
                {
                    _recvConn.Close();
                }

                bool senderConnOpen = _senderConn != null && _senderConn.IsOpen;
                if (senderConnOpen)
                {
                    _senderConn.Close();
                }
            }
    
            #region 异步消息处理,客户端发送完消息后不再等待
    
            /// <summary>
            /// Delivery handler: captures the message body and hands the delivery to
            /// <c>HandlingMessage</c> on the thread pool, so several messages can be
            /// processed at the same time.
            /// </summary>
            /// <param name="sender">The consumer that raised the event (unused).</param>
            /// <param name="e">The delivery, including body and delivery tag.</param>
            private static void consumer_Received(object sender, BasicDeliverEventArgs e)
            {
                // Read the body on the consumer thread, before the work is queued.
                byte[] payload = e.Body;

                Task.Run(() => HandlingMessage(payload, e));
            }
    
            // Serializes every use of _recvChannel from this handler. Acks and rejects are
            // issued from thread-pool tasks, and a RabbitMQ channel (IModel) must not be
            // used by several threads at the same time.
            private static readonly object _recvChannelLock = new object();

            // One shared generator for the simulated delay and failures. System.Random is
            // not thread-safe, so every draw happens under a lock on this instance.
            private static readonly Random _random = new Random();

            /// <summary>
            /// Processes one delivery from extractQueue. The body is decoded and validated,
            /// a random processing delay and random failures are simulated, and then:
            /// on success the body is forwarded to checkQueue and the delivery is acked;
            /// on an invalid or unparseable message the delivery is rejected without requeue;
            /// on any other processing failure it is rejected with requeue.
            /// </summary>
            /// <param name="body">Raw message body (UTF-8 encoded JSON).</param>
            /// <param name="e">Delivery metadata; its <c>DeliveryTag</c> is used to ack or reject.</param>
            /// <returns>
            /// A task that completes once the delivery has been settled. It returns
            /// <c>Task</c> rather than <c>void</c> so an unexpected exception cannot
            /// tear down the process.
            /// </returns>
            private static async Task HandlingMessage(byte[] body, BasicDeliverEventArgs e)
            {
                bool isSuccess = false;
                string message = Encoding.UTF8.GetString(body);

                try
                {
                    MessageModel msgModel;
                    try
                    {
                        msgModel = JsonConvert.DeserializeObject<MessageModel>(message);
                    }
                    catch (JsonException jsonEx)
                    {
                        // Malformed JSON can never succeed on redelivery. Classify it as an
                        // invalid message so it is dropped instead of being requeued forever.
                        throw new MessageException("消息解析失败", jsonEx);
                    }

                    if (msgModel == null || !msgModel.IsVlid()) // unparseable or incomplete: refuse it
                    {
                        throw new MessageException("消息解析失败");
                    }

                    int num;
                    bool simulateProcessingFailure;
                    bool simulateParseFailure;
                    lock (_random)
                    {
                        num = _random.Next(0, 4); // simulated processing time, 0-3 seconds
                        simulateProcessingFailure = _random.Next(0, 11) == 4;
                        simulateParseFailure = _random.Next(0, 11) == 8;
                    }

                    // Simulated processing failure (requeued below).
                    if (simulateProcessingFailure)
                    {
                        throw new InvalidOperationException("处理失败");
                    }

                    // Simulated parse failure (dropped below).
                    if (simulateParseFailure)
                    {
                        throw new MessageException("消息解析失败");
                    }

                    await Task.Delay(num*1000);

                    // "Processing" here is simply printing the message.
                    Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " Used: " + num + "s MSG:" + msgModel);

                    isSuccess = true;
                }
                catch (MessageException msgEx)
                {
                    Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " ERROR:" + msgEx.Message + " MSG:" + message);
                    RejectDelivery(e.DeliveryTag, false); // do not redeliver
                    return;
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " ERROR:" + ex.Message + " MSG:" + message);
                }

                if (!isSuccess)
                {
                    RejectDelivery(e.DeliveryTag, true); // processing failed: redeliver
                    return;
                }

                try
                {
                    // Each task publishes on its own short-lived channel; "using" guarantees
                    // the channel is released on every path.
                    using (IModel senderChannel = _senderConn.CreateModel())
                    {
                        senderChannel.BasicPublish("", "checkQueue", null, body); // forward to the check queue
                    }

                    lock (_recvChannelLock)
                    {
                        _recvChannel.BasicAck(e.DeliveryTag, false); // confirm successful processing
                    }
                }
                catch (AlreadyClosedException)
                {
                    Console.WriteLine("ERROR:连接已关闭");
                }
                catch (Exception ex)
                {
                    // The delivery stays unacknowledged and is left for the broker to redeliver.
                    Console.WriteLine("ERROR: " + ex.Message);
                }
            }

            // Rejects a delivery on the shared receive channel; a closed connection is reported, not thrown.
            private static void RejectDelivery(ulong deliveryTag, bool requeue)
            {
                try
                {
                    lock (_recvChannelLock)
                    {
                        _recvChannel.BasicReject(deliveryTag, requeue);
                    }
                }
                catch (AlreadyClosedException)
                {
                    Console.WriteLine("ERROR:连接已关闭");
                }
            }
    
            #endregion
    
            #region 同步消息处理(RPC)
    
            #endregion
        }
    

    ③第三个控制台应用程序

       class Program
        {
            private static IConnection _recvConn;
            private static IConnection _senderConn;
            private static IModel _recvChannel;
            private static bool isExit;
    
            /// <summary>
            /// Entry point: starts consuming checkQueue (forwarding to reportQueue) and
            /// then reads console commands until exit.
            /// </summary>
            /// <param name="args">Command-line arguments (unused).</param>
            private static void Main(string[] args)
            {
                Setup();

                // Setup() sets isExit when the broker is unreachable; do not announce
                // consumption that never started.
                if (!isExit)
                {
                    Console.WriteLine("开始使用checkQueue队列里的消息(同时推送到reportQueue报告队列):");
                }

                WaitCommand();
            }
    
            /// <summary>
            /// Opens the receiving connection and starts a manually-acknowledged consumer
            /// on checkQueue, then opens the sending connection and declares reportQueue.
            /// When the broker cannot be reached, an error is printed and
            /// <c>isExit</c> is set.
            /// </summary>
            private static void Setup()
            {
                var connectionFactory = new ConnectionFactory();
                connectionFactory.HostName = "localhost";
                connectionFactory.UserName = "guest";
                connectionFactory.Password = "guest";
                // VirtualHost is left at the client default (an alternative, "test", was
                // commented out in the original sample).
                connectionFactory.TopologyRecoveryEnabled = true;
                connectionFactory.AutomaticRecoveryEnabled = true;

                try
                {
                    _recvConn = connectionFactory.CreateConnection();
                    _recvConn.ConnectionShutdown += ConnectionShutdown;

                    _recvChannel = _recvConn.CreateModel();
                    _recvChannel.QueueDeclare("checkQueue", false, false, false, null);
                    // Prefetch count 10: at most ten unacknowledged deliveries at a time.
                    _recvChannel.BasicQos(0, 10, false);

                    var checkConsumer = new EventingBasicConsumer(_recvChannel);
                    checkConsumer.Received += consumer_Received;
                    // Second argument false: deliveries are acknowledged explicitly in HandlingMessage.
                    _recvChannel.BasicConsume("checkQueue", false, checkConsumer);

                    _senderConn = connectionFactory.CreateConnection();
                    // This channel is deliberately never closed. Per the original author's
                    // note, automatic recovery re-creates reportQueue through the channel
                    // that declared it, so closing it would prevent the queue from being restored.
                    IModel declaringChannel = _senderConn.CreateModel();
                    declaringChannel.QueueDeclare("reportQueue", false, false, false, null);
                }
                catch (BrokerUnreachableException)
                {
                    Console.WriteLine("ERROR: RabbitMQ服务器未启动!");
                    Thread.Sleep(2000);
                    isExit = true;
                }
            }
    
            /// <summary>
            /// Connection-shutdown handler for the receiving connection: prints a fixed notice.
            /// </summary>
            /// <param name="sender">Event source (unused).</param>
            /// <param name="e">Shutdown details (unused).</param>
            private static void ConnectionShutdown(object sender, ShutdownEventArgs e)
            {
                const string notice = "连接已关闭.";
                Console.WriteLine(notice);
            }
    
            /// <summary>
            /// Reads commands from the console until <c>isExit</c> becomes true.
            /// <c>exit</c> closes all connections and quits; <c>clear</c> clears the
            /// console; anything else is ignored. Commands are case-insensitive.
            /// </summary>
            private static void WaitCommand()
            {
                while (!isExit)
                {
                    string input = Console.ReadLine();
                    if (input == null)
                    {
                        // Standard input was closed (EOF); treat it like "exit" instead
                        // of dereferencing null.
                        Close();
                        isExit = true;
                        break;
                    }

                    // ToLowerInvariant keeps command matching independent of the user's culture.
                    string line = input.Trim().ToLowerInvariant();
                    switch (line)
                    {
                        case "exit":
                            Close();
                            isExit = true;
                            break;
                        case "clear":
                            Console.Clear();
                            break;
                        default:
                            break;
                    }
                }

                Console.WriteLine("Goodbye!");
            }
    
            /// <summary>
            /// Shuts down, in order, the receiving channel, the receiving connection and
            /// the sending connection, skipping any that is null or already closed.
            /// </summary>
            private static void Close()
            {
                bool recvChannelOpen = _recvChannel != null && _recvChannel.IsOpen;
                if (recvChannelOpen)
                {
                    _recvChannel.Close();
                }

                bool recvConnOpen = _recvConn != null && _recvConn.IsOpen;
                if (recvConnOpen)
                {
                    _recvConn.Close();
                }

                bool senderConnOpen = _senderConn != null && _senderConn.IsOpen;
                if (senderConnOpen)
                {
                    _senderConn.Close();
                }
            }
    
            #region 异步消息处理,客户端发送完消息后不再等待
    
            /// <summary>
            /// Delivery handler: captures the message body and hands the delivery to
            /// <c>HandlingMessage</c> on the thread pool, so several messages can be
            /// processed at the same time.
            /// </summary>
            /// <param name="sender">The consumer that raised the event (unused).</param>
            /// <param name="e">The delivery, including body and delivery tag.</param>
            private static void consumer_Received(object sender, BasicDeliverEventArgs e)
            {
                // Read the body on the consumer thread, before the work is queued.
                byte[] payload = e.Body;

                Task.Run(() => HandlingMessage(payload, e));
            }
    
            // Serializes every use of _recvChannel from this handler. Acks and rejects are
            // issued from thread-pool tasks, and a RabbitMQ channel (IModel) must not be
            // used by several threads at the same time.
            private static readonly object _recvChannelLock = new object();

            // One shared generator for the simulated delay and failures. System.Random is
            // not thread-safe, so every draw happens under a lock on this instance.
            private static readonly Random _random = new Random();

            /// <summary>
            /// Processes one delivery from checkQueue. The body is decoded and validated,
            /// a random processing delay and random failures are simulated, and then:
            /// on success the body is forwarded to reportQueue and the delivery is acked;
            /// on an invalid or unparseable message the delivery is rejected without requeue;
            /// on any other processing failure it is rejected with requeue.
            /// </summary>
            /// <param name="body">Raw message body (UTF-8 encoded JSON).</param>
            /// <param name="e">Delivery metadata; its <c>DeliveryTag</c> is used to ack or reject.</param>
            /// <returns>
            /// A task that completes once the delivery has been settled. It returns
            /// <c>Task</c> rather than <c>void</c> so an unexpected exception cannot
            /// tear down the process.
            /// </returns>
            private static async Task HandlingMessage(byte[] body, BasicDeliverEventArgs e)
            {
                bool isSuccess = false;
                string message = Encoding.UTF8.GetString(body);

                try
                {
                    MessageModel msgModel;
                    try
                    {
                        msgModel = JsonConvert.DeserializeObject<MessageModel>(message);
                    }
                    catch (JsonException jsonEx)
                    {
                        // Malformed JSON can never succeed on redelivery. Classify it as an
                        // invalid message so it is dropped instead of being requeued forever.
                        throw new MessageException("消息解析失败", jsonEx);
                    }

                    if (msgModel == null || !msgModel.IsVlid()) // unparseable or incomplete: refuse it
                    {
                        throw new MessageException("消息解析失败");
                    }

                    int num;
                    bool simulateProcessingFailure;
                    bool simulateParseFailure;
                    lock (_random)
                    {
                        num = _random.Next(0, 4); // simulated processing time, 0-3 seconds
                        simulateProcessingFailure = _random.Next(0, 11) == 4;
                        simulateParseFailure = _random.Next(0, 11) == 8;
                    }

                    // Simulated processing failure (requeued below).
                    if (simulateProcessingFailure)
                    {
                        throw new InvalidOperationException("处理失败");
                    }

                    // Simulated parse failure (dropped below).
                    if (simulateParseFailure)
                    {
                        throw new MessageException("消息解析失败");
                    }

                    await Task.Delay(num*1000);

                    // "Processing" here is simply printing the message.
                    Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId +
                                      " Used: " + num + "s MSG:" + msgModel);

                    isSuccess = true;
                }
                catch (MessageException msgEx)
                {
                    Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " ERROR:" + msgEx.Message + " MSG:" + message);
                    RejectDelivery(e.DeliveryTag, false); // do not redeliver
                    return;
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " ERROR:" + ex.Message + " MSG:" + message);
                }

                if (!isSuccess)
                {
                    RejectDelivery(e.DeliveryTag, true); // processing failed: redeliver
                    return;
                }

                try
                {
                    // Each task publishes on its own short-lived channel; "using" guarantees
                    // the channel is released on every path.
                    using (IModel senderChannel = _senderConn.CreateModel())
                    {
                        senderChannel.BasicPublish("", "reportQueue", null, body); // forward to the report queue
                    }

                    lock (_recvChannelLock)
                    {
                        _recvChannel.BasicAck(e.DeliveryTag, false); // confirm successful processing
                    }
                }
                catch (AlreadyClosedException)
                {
                    Console.WriteLine("ERROR:连接已关闭");
                }
                catch (Exception ex)
                {
                    // The delivery stays unacknowledged and is left for the broker to redeliver.
                    Console.WriteLine("ERROR: " + ex.Message);
                }
            }

            // Rejects a delivery on the shared receive channel; a closed connection is reported, not thrown.
            private static void RejectDelivery(ulong deliveryTag, bool requeue)
            {
                try
                {
                    lock (_recvChannelLock)
                    {
                        _recvChannel.BasicReject(deliveryTag, requeue);
                    }
                }
                catch (AlreadyClosedException)
                {
                    Console.WriteLine("ERROR:连接已关闭");
                }
            }
    
            #endregion
        }
    

    ④第四个控制台应用程序

     class Program
        {
            private static IConnection _recvConn;
            private static IModel _recvChannel;
            private static bool isExit;
            //private static IConnection _receiverConn; //同步处理(RPC)时使用
    
            /// <summary>
            /// Entry point: starts consuming reportQueue and then reads console
            /// commands until exit.
            /// </summary>
            /// <param name="args">Command-line arguments (unused).</param>
            private static void Main(string[] args)
            {
                Setup();

                // Setup() sets isExit when the broker is unreachable; do not announce
                // consumption that never started.
                if (!isExit)
                {
                    Console.WriteLine("开始消费reportQueue队列里的消息:");
                }

                WaitCommand();
            }
    
            /// <summary>
            /// Opens the receiving connection and starts a manually-acknowledged consumer
            /// on reportQueue. When the broker cannot be reached, an error is printed
            /// and <c>isExit</c> is set.
            /// </summary>
            private static void Setup()
            {
                var factory = new ConnectionFactory
                {
                    HostName = "localhost",
                    // Use the same account and (default) virtual host as the other three
                    // applications. Queues in different virtual hosts are isolated from each
                    // other, so connecting to a separate "test" virtual host would give this
                    // consumer its own empty reportQueue that the upstream app never publishes to.
                    UserName = "guest",
                    Password = "guest",
                    TopologyRecoveryEnabled = true,
                    AutomaticRecoveryEnabled = true
                };

                try
                {
                    _recvConn = factory.CreateConnection();
                    _recvConn.ConnectionShutdown += ConnectionShutdown;
                    _recvChannel = _recvConn.CreateModel();
                    _recvChannel.QueueDeclare("reportQueue", false, false, false, null);
                    // Prefetch count 10: at most ten unacknowledged deliveries at a time.
                    _recvChannel.BasicQos(0, 10, false);
                    var consumer = new EventingBasicConsumer(_recvChannel);
                    consumer.Received += consumer_Received;
                    // Second argument false: deliveries are acknowledged explicitly in HandlingMessage.
                    _recvChannel.BasicConsume("reportQueue", false, consumer);
                }
                catch (BrokerUnreachableException)
                {
                    Console.WriteLine("ERROR: RabbitMQ服务器未启动!");
                    Thread.Sleep(2000);
                    isExit = true;
                }
            }
    
            /// <summary>
            /// Connection-shutdown handler for the receiving connection: prints a fixed notice.
            /// </summary>
            /// <param name="sender">Event source (unused).</param>
            /// <param name="e">Shutdown details (unused).</param>
            private static void ConnectionShutdown(object sender, ShutdownEventArgs e)
            {
                const string notice = "连接已关闭.";
                Console.WriteLine(notice);
            }
    
            /// <summary>
            /// Reads commands from the console until <c>isExit</c> becomes true.
            /// <c>exit</c> closes the connection and quits; <c>clear</c> clears the
            /// console; anything else is ignored. Commands are case-insensitive.
            /// </summary>
            private static void WaitCommand()
            {
                while (!isExit)
                {
                    string input = Console.ReadLine();
                    if (input == null)
                    {
                        // Standard input was closed (EOF); treat it like "exit" instead
                        // of dereferencing null.
                        Close();
                        isExit = true;
                        break;
                    }

                    // ToLowerInvariant keeps command matching independent of the user's culture.
                    string line = input.Trim().ToLowerInvariant();
                    switch (line)
                    {
                        case "exit":
                            Close();
                            isExit = true;
                            break;
                        case "clear":
                            Console.Clear();
                            break;
                        default:
                            break;
                    }
                }

                Console.WriteLine("Goodbye!");
            }
    
            /// <summary>
            /// Shuts down the receiving channel and then the receiving connection,
            /// skipping any that is null or already closed.
            /// </summary>
            private static void Close()
            {
                bool channelOpen = _recvChannel != null && _recvChannel.IsOpen;
                if (channelOpen)
                {
                    _recvChannel.Close();
                }

                bool connectionOpen = _recvConn != null && _recvConn.IsOpen;
                if (connectionOpen)
                {
                    _recvConn.Close();
                }
            }
    
            #region 异步消息处理,客户端发送完消息后不再等待
    
            /// <summary>
            /// Delivery handler: captures the message body and hands the delivery to
            /// <c>HandlingMessage</c> on the thread pool, so several messages can be
            /// processed at the same time.
            /// </summary>
            /// <param name="sender">The consumer that raised the event (unused).</param>
            /// <param name="e">The delivery, including body and delivery tag.</param>
            private static void consumer_Received(object sender, BasicDeliverEventArgs e)
            {
                // Read the body on the consumer thread, before the work is queued.
                byte[] payload = e.Body;

                Task.Run(() => HandlingMessage(payload, e));
            }
    
            // Serializes every use of _recvChannel from this handler. Acks and rejects are
            // issued from thread-pool tasks, and a RabbitMQ channel (IModel) must not be
            // used by several threads at the same time.
            private static readonly object _recvChannelLock = new object();

            // One shared generator for the simulated delay and failures. System.Random is
            // not thread-safe, so every draw happens under a lock on this instance.
            private static readonly Random _random = new Random();

            /// <summary>
            /// Processes one delivery from reportQueue, the last stage of the pipeline.
            /// The body is decoded and validated, a random processing delay and random
            /// failures are simulated, and then: on success the delivery is acked; on an
            /// invalid or unparseable message it is rejected without requeue; on any
            /// other processing failure it is rejected with requeue.
            /// </summary>
            /// <param name="body">Raw message body (UTF-8 encoded JSON).</param>
            /// <param name="e">Delivery metadata; its <c>DeliveryTag</c> is used to ack or reject.</param>
            /// <returns>
            /// A task that completes once the delivery has been settled. It returns
            /// <c>Task</c> rather than <c>void</c> so an unexpected exception cannot
            /// tear down the process.
            /// </returns>
            private static async Task HandlingMessage(byte[] body, BasicDeliverEventArgs e)
            {
                bool isSuccess = false;
                string message = Encoding.UTF8.GetString(body);

                try
                {
                    MessageModel msgModel;
                    try
                    {
                        msgModel = JsonConvert.DeserializeObject<MessageModel>(message);
                    }
                    catch (JsonException jsonEx)
                    {
                        // Malformed JSON can never succeed on redelivery. Classify it as an
                        // invalid message so it is dropped instead of being requeued forever.
                        throw new MessageException("消息解析失败", jsonEx);
                    }

                    if (msgModel == null || !msgModel.IsVlid()) // unparseable or incomplete: refuse it
                    {
                        throw new MessageException("消息解析失败");
                    }

                    int num;
                    bool simulateProcessingFailure;
                    bool simulateParseFailure;
                    lock (_random)
                    {
                        num = _random.Next(0, 4); // simulated processing time, 0-3 seconds
                        simulateProcessingFailure = _random.Next(0, 11) == 4;
                        simulateParseFailure = _random.Next(0, 11) == 8;
                    }

                    // Simulated processing failure (requeued below).
                    if (simulateProcessingFailure)
                    {
                        throw new InvalidOperationException("处理失败");
                    }

                    // Simulated parse failure (dropped below).
                    if (simulateParseFailure)
                    {
                        throw new MessageException("消息解析失败");
                    }

                    await Task.Delay(num*1000);

                    // "Processing" here is simply printing the message.
                    Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId +
                                      " Used: " + num + "s MSG:" + msgModel);

                    isSuccess = true;
                }
                catch (MessageException msgEx)
                {
                    Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId +
                                      " ERROR:" + msgEx.Message + " MSG:" + message);
                    RejectDelivery(e.DeliveryTag, false); // do not redeliver
                    return;
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId +
                                      " ERROR:" + ex.Message + " MSG:" + message);
                }

                if (!isSuccess)
                {
                    RejectDelivery(e.DeliveryTag, true); // processing failed: redeliver
                    return;
                }

                try
                {
                    lock (_recvChannelLock)
                    {
                        _recvChannel.BasicAck(e.DeliveryTag, false); // confirm successful processing
                    }
                }
                catch (AlreadyClosedException)
                {
                    Console.WriteLine("ERROR:连接已关闭");
                }
                catch (Exception ex)
                {
                    // The delivery stays unacknowledged and is left for the broker to redeliver.
                    Console.WriteLine("ERROR: " + ex.Message);
                }
            }

            // Rejects a delivery on the shared receive channel; a closed connection is reported, not thrown.
            private static void RejectDelivery(ulong deliveryTag, bool requeue)
            {
                try
                {
                    lock (_recvChannelLock)
                    {
                        _recvChannel.BasicReject(deliveryTag, requeue);
                    }
                }
                catch (AlreadyClosedException)
                {
                    Console.WriteLine("ERROR:连接已关闭");
                }
            }
    
            #endregion
        }
    

    运行结果如图:

    这里写图片描述

    这里写图片描述

    这里写图片描述

    这里写图片描述

  • 相关阅读:
    SQL 生成可配置流水号
    安卓程序进入后台和前台的判断
    Android代码故事第一回,平均间隔的按钮
    安卓冷知识:LayoutParams
    初识Android NDK
    搬家
    LaTeX表格紧跟文字 (不影响下方文本对齐)
    FlagCounter被封杀?自己实现一个简单的多国访客计数器
    Python+OpenCV竖版古籍文字分割
    Ubuntu18.04 显卡驱动+Cuda安装踩坑记录 以及Ubuntu虚拟内存的添加
  • 原文地址:https://www.cnblogs.com/Wulex/p/6965043.html
Copyright © 2011-2022 走看看