zoukankan      html  css  js  c++  java
  • 解决消息总线里监听线程过多的问题

    在设计基于队列消息总线时,最好避免订阅者直接监听队列,而是再实现一套发布订阅模式,订阅者订阅的不是消息队列的信息,而是系统实现的总线。每次订阅时,判断此订阅消息(队列)是否存在,不存在则往【监听容器】里放,并且初始化一个默认的监听者监听此队列,这个监听者如收到消息,则对订阅者发送消息,这样可以避免直接监听队列造成的线程过多
     
    思路示例:
     public class Bus<TMessage> where TMessage : class
        {
            Dictionary<Type , List<object>> _handlers = new Dictionary<Type , List<object>>();
    
            public void Publish( TMessage msg )
            {
                string queueName = msg.GetType().FullName;
                XXXMQ mq = new XXXMQ( queueName );
                mq.Text = Newtonsoft.Json.JsonConvert.SerializeObject( mq );
                mq.Send();
            }
    
            public void Subscribe( TMessage msg , IHandler handler)
            {
                Type msgType = msg.GetType();
                if( _handlers.ContainsKey( msgType ) )
                {
                    List<object> handlers = _handlers[ msgType ];
                    handlers.Add( handler );
                }
                else
                {
                    List<object> handlers = new List<object>();
                    handlers.Add( handler );
                    _handlers.Add( msgType , handlers );
                    ListenMQ( msg );
                }
            }
    
            private void ListenMQ( TMessage msg )
            {
                Task.Factory.StartNew( () =>
                {
                    while( true )
                    {
                        string queueName = msg.GetType().FullName;
                        XXXMQ mq = new XXXMQ( queueName );
                        string text = mq.Receive(); //监听队列
    
                        TMessage message = Newtonsoft.Json.JsonConvert.DeserializeObject<TMessage>( text );
                        List<object> handlers = _handlers[ message.GetType() ];
                        foreach( var handler in handlers )
                        {
                            handler( message );
                        }
                    }
                } );
            }
    
        }

    不过这样也有一个问题,因为每一个订阅者并不是直接监听队列,如果发生问题没有保障,容错性降低了,所以需要在总线里实现一套容错机制(失败重试之类)

    最后还是推荐订阅者直接监听队列

  • 相关阅读:
    Redis分布式锁实现
    mysql索引命中规则
    spring注解原理
    img 标签访问图片返回403forbidden
    根据注解修改属性的值
    通过反射改变对象的属性
    利用反射获取类或者方法或者字段上的注解的值
    什么时候出现死锁,如何解决?mysql 引擎? 多个like or 查询sql如何优化?什么是常量池?for条件执行顺序
    jvm 基础
    为什么要用jvm .
  • 原文地址:https://www.cnblogs.com/Scissors/p/5891676.html
Copyright © 2011-2022 走看看