zoukankan      html  css  js  c++  java
  • RabbitMQ镜像队列初始化连接时的“优化”

    之前发过一篇帖子 应用.Net+Consul维护RabbitMq的高可用性,然后最近老大问我当初我这么搞是抽的什么想法- -然后顺便贴了两行C#代码:

             var factory = new ConnectionFactory()
                    {
                        UserName = "username",
                        Password = "password",
                        AutomaticRecoveryEnabled = true,
                        TopologyRecoveryEnabled = true
                    };
                    Connection = factory.CreateConnection(new string[3] { "ip1", "ip2", "ip3" });

    AutomaticRecoveryEnabled的作用就是断线重连,假如当前的连接断开了,连接不会释放

    TopologyRecoveryEnabled 重连后恢复当前的工作进程,比如channel、queue、发布的消息进度等。

    看上去其实却是比较方便的,为了狡辩我当场列出了我设计的方案的主要优点:

    1.rabbitmq网关设计的时候一方面是为了让客户端能够保证建立与master节点的连接,直连master性能较高。

    2.通过配置队列名+VirthHost可以获取队列信息,Consul可以起到配置中心的作用,可配置性高一些(其实此处是狡辩。。)

    3.RabbitMQ与Master队列建立的连接在发生故障时会第一时间查询网关获取新的Mater队列信息进行重连(当然代码内部也有重试机制,不会随便就更换连接的),相比于AutomaticRecoveryEnabled效率更高一些。

    4.客户端SDK内部实现定时更新队列连接,发现Master节点更换时重新建立连接

    看上去我设计的方案还是有点意义的(其实是真懒得改代码)

    不过此处有个问题就是既然创建连接时的参数可以提供多个IP的集合,假如RabbitMQ提供的客户端SDK内部实现的更好,那我狡辩什么不也完戏了吗。。。于是假装下载了下客户端sdk代码扫了扫,此处以C#代码为例,代码还是比较简单的。github地址

    直接定位ConnectionFactory类找CreateConnection()方法

           /// <summary>
            /// Create a connection using a list of hostnames using the configured port.
            /// By default each hostname is tried in a random order until a successful connection is
            /// found or the list is exhausted using the DefaultEndpointResolver.
            /// The selection behaviour can be overriden by configuring the EndpointResolverFactory.
            /// </summary>
            /// <param name="hostnames">
            /// List of hostnames to use for the initial
            /// connection and recovery.
            /// </param>
            /// <returns>Open connection</returns>
            /// <exception cref="BrokerUnreachableException">
            /// When no hostname was reachable.
            /// </exception>
            public IConnection CreateConnection(IList<string> hostnames)
            {
                return CreateConnection(hostnames, null);
            }
    
            /// <summary>
            /// Create a connection using a list of hostnames using the configured port.
            /// By default each endpoint is tried in a random order until a successful connection is
            /// found or the list is exhausted.
            /// The selection behaviour can be overriden by configuring the EndpointResolverFactory.
            /// </summary>
            /// <param name="hostnames">
            /// List of hostnames to use for the initial
            /// connection and recovery.
            /// </param>
            /// <param name="clientProvidedName">
            /// Application-specific connection name, will be displayed in the management UI
            /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
            /// be used as a connection identifier, e.g. in HTTP API requests.
            /// This value is supposed to be human-readable.
            /// </param>
            /// <returns>Open connection</returns>
            /// <exception cref="BrokerUnreachableException">
            /// When no hostname was reachable.
            /// </exception>
            public IConnection CreateConnection(IList<string> hostnames, String clientProvidedName)
            {
                var endpoints = hostnames.Select(h => new AmqpTcpEndpoint(h, this.Port, this.Ssl));
                return CreateConnection(new DefaultEndpointResolver(endpoints), clientProvidedName);
            }
    
            /// <summary>
            /// Create a connection using a list of endpoints. By default each endpoint will be tried
            /// in a random order until a successful connection is found or the list is exhausted.
            /// The selection behaviour can be overriden by configuring the EndpointResolverFactory.
            /// </summary>
            /// <param name="endpoints">
            /// List of endpoints to use for the initial
            /// connection and recovery.
            /// </param>
            /// <returns>Open connection</returns>
            /// <exception cref="BrokerUnreachableException">
            /// When no hostname was reachable.
            /// </exception>
            public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints)
            {
                return CreateConnection(new DefaultEndpointResolver(endpoints), null);
            }
    
            /// <summary>
            /// Create a connection using an IEndpointResolver.
            /// </summary>
            /// <param name="endpointResolver">
            /// The endpointResolver that returns the endpoints to use for the connection attempt.
            /// </param>
            /// <param name="clientProvidedName">
            /// Application-specific connection name, will be displayed in the management UI
            /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
            /// be used as a connection identifier, e.g. in HTTP API requests.
            /// This value is supposed to be human-readable.
            /// </param>
            /// <returns>Open connection</returns>
            /// <exception cref="BrokerUnreachableException">
            /// When no hostname was reachable.
            /// </exception>
            public IConnection CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
            {
                IConnection conn;
                try
                {
                    if (AutomaticRecoveryEnabled)
                    {
                        var autorecoveringConnection = new AutorecoveringConnection(this, clientProvidedName);
                        autorecoveringConnection.Init(endpointResolver);
                        conn = autorecoveringConnection;
                    }
                    else
                    {
                        IProtocol protocol = Protocols.DefaultProtocol;
                        conn = protocol.CreateConnection(this, false, endpointResolver.SelectOne(this.CreateFrameHandler), clientProvidedName);
                    }
                }
                catch (Exception e)
                {
                    throw new BrokerUnreachableException(e);
                }
    
                return conn;
            }
    View Code

    代码比较直观,第二个方法的时候把传入的字符串集合转换成了AmqpTcpEndpoint集合,AmqpTcpEndpoint里包含队列绑定信息,包含默认ip,端口,SSL配置信息,RabbitMQ的Amqp协议信息,初始化Socket连接的协议类型(一个AddressFamily枚举),当然都是可配置的(除了ip貌似都喜欢使用默认的)。

    然后就倒主要部分了。

                   if (AutomaticRecoveryEnabled)
                    {
                        var autorecoveringConnection = new AutorecoveringConnection(this, clientProvidedName);
                        autorecoveringConnection.Init(endpointResolver);
                        conn = autorecoveringConnection;
                    }
                    else
                    {
                        IProtocol protocol = Protocols.DefaultProtocol;
                        conn = protocol.CreateConnection(this, false, endpointResolver.SelectOne(this.CreateFrameHandler), clientProvidedName);
                    }             

    由于主要想看看传入多个连接是不是可以智能的选择master连接,所以此处直接看else里的代码就行了。。。(其实一样。。)

    protocol.CreateConnection方法调用时传入的第三个参数endpointResolver.SelectOne(this.CreateFrameHandler)作用其实就是选择连接然后扔到CreateFrameHandler委托里作为参数进行Socket初始化的。看一下SelectOne这个扩展方法即可。。看了两行终于可以继续狡辩了。。

             public static T SelectOne<T>(this IEndpointResolver resolver, Func<AmqpTcpEndpoint, T> selector)
            {
                var t = default(T);
                Exception exception = null;
                foreach(var ep in resolver.All())
                {
                    try
                    {
                        t = selector(ep);
                        if(t.Equals(default(T)) == false)
                        {
                            return t;
                        }
                    }
                    catch (Exception e)
                    {
                        exception = e;
                    }
                }
    
                if(Object.Equals(t, default(T)) && exception != null)
                {
                    throw exception;
                }
    
                return t;
            }

    这个foreach还是“比较好的”,能够建立一个Socket连接就愉快的返回了!,不过这个resolver.All()里是不是还有玄机呢!看了一眼。。终于放心了。。

         public class DefaultEndpointResolver : IEndpointResolver
        {
            private List<AmqpTcpEndpoint> endpoints;
            private Random rnd = new Random();
    
            public DefaultEndpointResolver (IEnumerable<AmqpTcpEndpoint> tcpEndpoints)
            {
               this.endpoints = tcpEndpoints.ToList();
            }
    
            public IEnumerable<AmqpTcpEndpoint> All()
            {
                return endpoints.OrderBy(item => rnd.Next());
            }
    View Code

    就是随机排序一下。。不过这么看自己实现下IEndpointResolver接口改个选择master队列的策略也是不错的。

  • 相关阅读:
    poj 2763 Housewife Wind
    hdu 3966 Aragorn's Story
    poj 1655 Balancing Act 求树的重心
    有上下界的网络流问题
    URAL 1277 Cops and Thieves 最小割 无向图点带权点连通度
    ZOJ 2532 Internship 网络流求关键边
    ZOJ 2760 How Many Shortest Path 最大流+floyd求最短路
    SGU 438 The Glorious Karlutka River =) 拆点+动态流+最大流
    怎么样仿写已知网址的网页?
    5-10 公路村村通 (30分)
  • 原文地址:https://www.cnblogs.com/ylsforever/p/6644758.html
Copyright © 2011-2022 走看看