zoukankan      html  css  js  c++  java
  • RabbitMQ实战经验分享

    前言

    最近在忙一个高考项目,看着系统顺利完成了这次高考,终于可以松口气了。看到那些即将参加高考的学生,也想起当年高三的自己。

    下面分享下RabbitMQ实战经验,希望对大家有所帮助:


     

    一、生产消息

    关于RabbitMQ的基础使用,这里不再介绍了,项目中使用的是Exchange中的topic模式。

    先上发消息的代码

    private bool MarkErrorSend(string[] lstMsg)
            {
                try
                {
                    var factory = new ConnectionFactory()
                    {
                        UserName = "guest",//用户名
                        Password = "guest",//密码
                        HostName = "localhost",//ConfigurationManager.AppSettings["sHostName"],
                    };
                    //创建连接
                    var connection = factory.CreateConnection();
                    //创建通道
                    var channel = connection.CreateModel();
                    try
                    {
                        //定义一个Direct类型交换机
                        channel.ExchangeDeclare(
                            exchange: "TestTopicChange", //exchange名称
                            type: ExchangeType.Topic, //Topic模式,采用路由匹配
                            durable: true,//exchange持久化
                            autoDelete: false,//是否自动删除,一般设成false
                            arguments: null//一些结构化参数,比如:alternate-exchange
                            );
    
                        //定义测试队列
                        channel.QueueDeclare(
                            queue: "Test_Queue", //队列名称
                            durable: true, //队列磁盘持久化(要和消息持久化一起使用才有效)
                            exclusive: false,//是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除
                            autoDelete: false,//是否自动删除,一般设成false
                            arguments: null
                            );
    
                        //将队列绑定到交换机
                        string routeKey = "TestRouteKey.*";//*匹配一个单词
                        channel.QueueBind(
                            queue: "Test_Queue",
                            exchange: "TestTopicChange",
                            routingKey: routeKey,
                            arguments: null
                            );
    
                        //消息磁盘持久化,把DeliveryMode设成2(要和队列持久化一起使用才有效)
                        IBasicProperties properties = channel.CreateBasicProperties();
                        properties.DeliveryMode = 2;
    
                        channel.ConfirmSelect();//发送确认机制
                        foreach (var itemMsg in lstMsg)
                        {
                            byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg);
                            //发布消息
                            channel.BasicPublish(
                                exchange: "TestTopicChange",
                                routingKey: "TestRouteKey.one",
                                basicProperties: properties,
                                body: sendBytes
                                );
                        }
                        bool isAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均发送才返回true
                        return isAllPublished;
                    }
                    catch (Exception ex)
                    {
                        //写错误日志
                        return false;
                    }
                    finally
                    {
                        channel.Close();
                        connection.Close();
                    }
                }
                catch
                {
                    //RabbitMQ.Client.Exceptions.BrokerUnreachableException:
                    //When the configured hostname was not reachable.
                    return false;
                }
            }

            发消息没啥特别的。关于消息持久化的介绍这里也不再介绍,不懂的可以看上篇文章。发消息需要注意的地方是,可以选择多条消息一起发送,最后才确定消息发送成功,这样效率比较高;此外,需要尽量精简每条消息的长度(楼主在这里吃过亏),不然会因消息过长从而增加发送时间。在实际项目中一次发了4万多条数据没有出现问题。


    二、接收消息

           接下来说下消费消息的过程,我使用的是单个连接多个channel,每个channel每次只取一条消息方法。有人会问单个TCP连接,多个channel会不会影响通信效率。这个理论上肯定会有影响的,看影响大不大而已。我开的channel数一般去到30左右,并没有觉得影响效率,有可能是因为我每个channel是拿一条消息的原因。通过单个连接多个channel的方法,可以少开了很多连接。至于我为什么选每个channel每次只取一条消息,这是外界因素限制了,具体看自己需求。

           接下接收消息的过程,首先定义一个RabbitMQHelper类,里面有个全局的conn连接变量,此外还有创建连接、关闭连接和验证连接是否打开等方法。程序运行一个定时器,当

    检测到连接未打开的情况下,主动创建连接处理消息。

     public class RabbitMQHelper
        {
            public IConnection conn = null;
    
            /// <summary>
            /// 创建RabbitMQ消息中间件连接
            /// </summary>
            /// <returns>返回连接对象</returns>
            public IConnection RabbitConnection(string sHostName, ushort nChannelMax)
            {
                try
                {
                    if (conn == null)
                    {
                        var factory = new ConnectionFactory()
                        {
                            UserName = "guest",//用户名
                            Password = "guest",//密码
                            HostName = sHostName,//ConfigurationManager.AppSettings["MQIP"],
                            AutomaticRecoveryEnabled = false,//取消自动重连,改用定时器定时检测连接是否存在
                            RequestedConnectionTimeout = 10000,//请求超时时间设成10秒,默认的为30秒
                            RequestedChannelMax = nChannelMax//与开的线程数保持一致
                        };
                        //创建连接
                        conn = factory.CreateConnection();
                        Console.WriteLine("RabbitMQ连接已创建!");
                    }
    
                    return conn;
                }
                catch
                {
                    Console.WriteLine("创建连接失败,请检查RabbitMQ是否正常运行!");
                    return null;
                }
            }
    
            /// <summary>
            /// 关闭RabbitMQ连接
            /// </summary>
            public void Close()
            {
                try
                {
                    if (conn != null)
                    {
                        if (conn.IsOpen)
                            conn.Close();
                        conn = null;
                        Console.WriteLine("RabbitMQ连接已关闭!");
                    }
                }
                catch { }
            }
    
            /// <summary>
            /// 判断RabbitMQ连接是否打开
            /// </summary>
            /// <returns></returns>
            public bool IsOpen()
            {
                try
                {
                    if (conn != null)
                    {
                        if (conn.IsOpen)
                            return true;
                    }
                    return false;
                }
                catch
                {
                    return false;
                }
            }
        }

           接下来我们看具体如何接收消息。

    private static AutoResetEvent myEvent = new AutoResetEvent(false);
    private RabbitMQHelper rabbit = new RabbitMQHelper();
    private ushort nChannel = 10;//一个连接的最大通道数和所开的线程数一致

           首先初始化一个rabbit实例,然后通过RabbitConnection方法创建RabbitMQ连接。

           当连接打开时候,用线程池运行接收消息的方法。注意了,这里开的线程必须和开的channel数量一致,不然会有问题(具体问题是,设了RabbitMQ连接超时时间为10秒,有时候不管用,原因未查明。RabbitMQ创建连接默认超时时间为30秒,假如在这个时间内再去调用创建的话,就有可能得到两倍的channel;)

    /// <summary>
            /// 单个RabbitMQ连接开多个线程,每个线程开一个channel接受消息
            /// </summary>
            private void CreateConnecttion()
            {
                try
                {
                    rabbit.RabbitConnection("localhost", nChannel);
                    if (rabbit.conn != null)
                    {
                        ThreadPool.SetMinThreads(1, 1);
                        ThreadPool.SetMaxThreads(100, 100);
                        for (int i = 1; i <= nChannel; i++)
                        {
                            ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveMsg), "");
                        }
                        myEvent.WaitOne();//等待所有线程工作完成后,才能关闭连接
                        rabbit.Close();
                    }
                }
                catch (Exception ex)
                {
                    rabbit.Close();
                    Console.WriteLine(ex.Message);
                }
            }

           接着就是接收消息的方法,处理消息的过程省略了。

      /// <summary>
            /// 接收并处理消息,在一个连接中创建多个通道(channel),避免创建多个连接
            /// </summary>
            /// <param name="con">RabbitMQ连接</param>
            private void ReceiveMsg(object obj)
            {
                IModel channel = null;
                try
                {
                    #region 创建通道,定义中转站和队列
                    channel = rabbit.conn.CreateModel();
                    channel.ExchangeDeclare(
                        exchange: "TestTopicChange", //exchange名称
                        type: ExchangeType.Topic, //Topic模式,采用路由匹配
                        durable: true,//exchange持久化
                        autoDelete: false,//是否自动删除,一般设成false
                        arguments: null//一些结构化参数,比如:alternate-exchange
                        );
    
                    //定义阅卷队列
                    channel.QueueDeclare(
                        queue: "Test_Queue", //队列名称
                        durable: true, //队列磁盘持久化(要和消息持久化一起使用才有效)
                        exclusive: false,//是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除
                        autoDelete: false,
                        arguments: null
                        );
                    #endregion
                    channel.BasicQos(0, 1, false);//每次只接收一条消息
    
                    channel.QueueBind(queue: "Test_Queue",
                                          exchange: "TestTopicChange",
                                          routingKey: "TestRouteKey.*");
                    var consumer = new EventingBasicConsumer(channel);
    
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        //处理消息方法
                        try
                        {
                            bool isMark = AutoMark(message);
                            if (isMark)
                            {
                                //Function.writeMarkLog(message);
                                //确认该消息已被消费,发消息给RabbitMQ队列
                                channel.BasicAck(ea.DeliveryTag, false);
                            }
                            else
                            {
                                if (MarkErrorSend(message))//把错误消息推到错误消息队列
                                    channel.BasicReject(ea.DeliveryTag, false);
                                else
                                    //消费消息失败,拒绝此消息,重回队列,让它可以继续发送到其他消费者 
                                    channel.BasicReject(ea.DeliveryTag, true);
                            }
                        }
                        catch (Exception ex)
                        {
                            try
                            {
                                Console.WriteLine(ex.Message);
                                if (channel != null && channel.IsOpen)//处理RabbitMQ停止重启而自动评阅崩溃的问题
                                {
                                    //消费消息失败,拒绝此消息,重回队列,让它可以继续发送到其他消费者 
                                    channel.BasicReject(ea.DeliveryTag, true);
                                }
                            }
                            catch { }
                        }
                    };
                    //手动确认消息
                    channel.BasicConsume(queue: "Test_Queue",
                                         autoAck: false,
                                         consumer: consumer);
                }
                catch (Exception ex)
                {
                    try
                    {
                        Console.WriteLine("接收消息方法出错:" + ex.Message);
                        if (channel != null && channel.IsOpen)//关闭通道
                            channel.Close();
                        if (rabbit.conn != null)//处理RabbitMQ突然停止的问题
                            rabbit.Close();
                    }
                    catch { }
                }
            }

     

     

    三、处理错误消息

           把处理失败的消息放到“错误队列”,然后把原队列的消息删除(这里主要解决问题是,存在多个处理失败或处理不了的消息时,如果把这些消息都放回原队列,它们会继续分发到其他线程的channel,但结果还是处理不了,就会造成一个死循环,导致后面的消息无法处理)。把第一次处理不了的消息放到“错误队列”后,重新再开一个新的连接去处理“错误队列”的消息。

    /// <summary>
            /// 把处理错误的消息发送到“错误消息队列”
            /// </summary>
            /// <param name="msg"></param>
            /// <returns></returns>
            private bool MarkErrorSend(string msg)
            {
                RabbitMQHelper MQ = new RabbitMQHelper();
                MQ.RabbitConnection("localhost",1);
                //创建通道
                var channel = MQ.conn.CreateModel();
                try
                {
                    //定义一个Direct类型交换机
                    channel.ExchangeDeclare(
                        exchange: "ErrorTopicChange", //exchange名称
                        type: ExchangeType.Topic, //Topic模式,采用路由匹配
                        durable: true,//exchange持久化
                        autoDelete: false,//是否自动删除,一般设成false
                        arguments: null//一些结构化参数,比如:alternate-exchange
                        );
    
                    //定义阅卷队列
                    channel.QueueDeclare(
                        queue: "Error_Queue", //队列名称
                        durable: true, //队列磁盘持久化(要和消息持久化一起使用才有效)
                        exclusive: false,//是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除
                        autoDelete: false,//是否自动删除,一般设成false
                        arguments: null
                        );
    
                    //将队列绑定到交换机
                    string routeKey = "ErrorRouteKey.*";//*匹配一个单词
                    channel.QueueBind(
                        queue: "Error_Queue",
                        exchange: "ErrorTopicChange",
                        routingKey: routeKey,
                        arguments: null
                        );
    
                    //消息磁盘持久化,把DeliveryMode设成2(要和队列持久化一起使用才有效)
                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;
    
                    channel.ConfirmSelect();//发送确认机制
                    byte[] sendBytes = Encoding.UTF8.GetBytes(msg);
                    //发布消息
                    channel.BasicPublish(
                        exchange: "ErrorTopicChange",
                        routingKey: "ErrorRouteKey.one",
                        basicProperties: properties,
                        body: sendBytes
                        );
    
                    bool isAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均发送才返回true
                    return isAllPublished;
                }
                catch (Exception ex)
                {
                    //写错误日志
                    return false;
                }
                finally
                {
                    channel.Close();
                    MQ.conn.Close();
                }
            }

    总结:RabbitMQ本身已经很稳定了,而且性能也很好,所有不稳定的因素都在我们处理消息的过程,所以可以放心使用。

    Demo源码地址:https://github.com/Bingjian-Zhu/RabbitMQHelper

     

  • 相关阅读:
    [翻译]ASP.NET MVC 3 开发的20个秘诀(六)[20 Recipes for Programming MVC 3]:找回忘记的密码
    No.9425 [翻译]开发之前设计先行
    [翻译]ASP.NET MVC 3 开发的20个秘诀(五)[20 Recipes for Programming MVC 3]:发送欢迎邮件
    内联(inline)函数与虚函数(virtual)的讨论
    内联函数、模板、头文件杂记(转)
    gdb使用整理
    data structure and algorithm analysis上一点摘录
    lsof by example 例子
    linux core文件机制
    转一篇shared_ptr的小文
  • 原文地址:https://www.cnblogs.com/FireworksEasyCool/p/10562753.html
Copyright © 2011-2022 走看看