zoukankan      html  css  js  c++  java
  • 解决消息总线里监听线程过多的问题

    在设计基于队列消息总线时,最好避免订阅者直接监听队列,而是再实现一套发布订阅模式,订阅者订阅的不是消息队列的信息,而是系统实现的总线。每次订阅时,判断此订阅消息(队列)是否存在,不存在则往【监听容器】里放,并且初始化一个默认的监听者监听此队列,这个监听者如收到消息,则对订阅者发送消息,这样可以避免直接监听队列造成的线程过多
     
    思路示例:
     public class Bus<TMessage> where TMessage : class
        {
            Dictionary<Type , List<object>> _handlers = new Dictionary<Type , List<object>>();
    
            public void Publish( TMessage msg )
            {
                string queueName = msg.GetType().FullName;
                XXXMQ mq = new XXXMQ( queueName );
                mq.Text = Newtonsoft.Json.JsonConvert.SerializeObject( mq );
                mq.Send();
            }
    
            public void Subscribe( TMessage msg , IHandler handler)
            {
                Type msgType = msg.GetType();
                if( _handlers.ContainsKey( msgType ) )
                {
                    List<object> handlers = _handlers[ msgType ];
                    handlers.Add( handler );
                }
                else
                {
                    List<object> handlers = new List<object>();
                    handlers.Add( handler );
                    _handlers.Add( msgType , handlers );
                    ListenMQ( msg );
                }
            }
    
            private void ListenMQ( TMessage msg )
            {
                Task.Factory.StartNew( () =>
                {
                    while( true )
                    {
                        string queueName = msg.GetType().FullName;
                        XXXMQ mq = new XXXMQ( queueName );
                        string text = mq.Receive(); //监听队列
    
                        TMessage message = Newtonsoft.Json.JsonConvert.DeserializeObject<TMessage>( text );
                        List<object> handlers = _handlers[ message.GetType() ];
                        foreach( var handler in handlers )
                        {
                            handler( message );
                        }
                    }
                } );
            }
    
        }

    不过这样也有一个问题,因为每一个订阅者并不是直接监听队列,如果发生问题没有保障,容错性降低了,所以需要在总线里实现一套容错机制(失败重试之类)

    最后还是推荐订阅者直接监听队列

  • 相关阅读:
    第五周学习进度
    第四周学习进度
    四则运算三
    第三周学习进度
    软件工程个人作业—四则运算2
    软件工程个人作业01
    软件工程概论—用户登录界面
    构建之法读书笔记06
    构建之法读书笔记05
    构建之法读书笔记04
  • 原文地址:https://www.cnblogs.com/Scissors/p/5891676.html
Copyright © 2011-2022 走看看