  • Ribbon Source Code: The Client Module

      The core job of the client module is to provide a unified interface for issuing user requests.

    Interface definitions

      The heart of the client module is the IClient interface, which defines the method used to execute a client network request.

    public interface IClient<S extends ClientRequest, T extends IResponse> {
        public T execute(S request, IClientConfig requestConfig) throws Exception; 
    }

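      ClientRequest is the request object defined by the client module. It stores the request URI, the load balancer key, whether the request is retriable, and an override configuration.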

    public class ClientRequest implements Cloneable {
        protected URI uri;
        protected Object loadBalancerKey = null;
        protected Boolean isRetriable = null;
        protected IClientConfig overrideConfig;
    }

      IResponse is the interface for response content defined by the client module.

    public interface IResponse extends Closeable
    {
       public Object getPayload() throws ClientException;
       public boolean hasPayload();
       public boolean isSuccess();
       public URI getRequestedURI();
       public Map<String, ?> getHeaders();   
    }

      IClientConfigAware defines the method through which an IClient that needs an IClientConfig is initialized.

    public interface IClientConfigAware {
        public abstract void initWithNiwsConfig(IClientConfig clientConfig);
    }
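
      To make these contracts concrete, here is a minimal sketch of a client that implements an IClient-shaped interface. SimpleRequest, SimpleResponse, SimpleClient and EchoClient are hypothetical stand-ins declared locally so that the example compiles without Ribbon on the classpath; they are not Ribbon classes and only mirror the shape of the interfaces above.

```java
import java.net.URI;

// Entry point of the sketch: sends one request through the toy client.
class IClientSketch {
    public static void main(String[] args) {
        EchoClient client = new EchoClient();
        SimpleResponse response =
                client.execute(new SimpleRequest(URI.create("http://localhost:8080/hello")));
        System.out.println(response.payload);
    }
}

// Hypothetical stand-in for ClientRequest: carries only the target URI.
class SimpleRequest {
    final URI uri;
    SimpleRequest(URI uri) { this.uri = uri; }
}

// Hypothetical stand-in for IResponse: payload plus the requested URI.
class SimpleResponse {
    final URI requestedUri;
    final Object payload;
    SimpleResponse(URI requestedUri, Object payload) {
        this.requestedUri = requestedUri;
        this.payload = payload;
    }
    boolean hasPayload() { return payload != null; }
}

// Hypothetical stand-in for IClient: one generic execute method.
interface SimpleClient<S extends SimpleRequest, T extends SimpleResponse> {
    T execute(S request) throws Exception;
}

// Toy implementation that answers every request with the request path.
class EchoClient implements SimpleClient<SimpleRequest, SimpleResponse> {
    @Override
    public SimpleResponse execute(SimpleRequest request) {
        return new SimpleResponse(request.uri, "echo:" + request.uri.getPath());
    }
}
```

      The generic bounds are what let a concrete client, such as an HTTP client, narrow the request and response types while callers still program against a single execute method.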

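      For HTTP, Ribbon provides the following classes and interfaces: HttpRequest represents an HTTP request, HttpResponse represents the result of an HTTP request, and RestClient is an IClient implementation based on Jersey.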

    Class diagram

    Client factory class

      The client module provides a client factory class (ClientFactory) that creates IClient instances and load balancer (ILoadBalancer) instances from configuration.

    // Returns the IClient for the given name; DefaultClientConfigImpl is the default config class.
    public static synchronized IClient getNamedClient(String name) {
        return getNamedClient(name, DefaultClientConfigImpl.class);
    }
    public static synchronized IClient getNamedClient(String name, Class<? extends IClientConfig> configClass) {
        ...
        return createNamedClient(name, configClass);
        ...
    }
    public static synchronized IClient createNamedClient(String name, Class<? extends IClientConfig> configClass) throws ClientException {
        IClientConfig config = getNamedConfig(name, configClass); // instantiate the config class
        return registerClientFromProperties(name, config); // create the IClient from configuration
    }
    public static synchronized ILoadBalancer getNamedLoadBalancer(String name) {
        return getNamedLoadBalancer(name, DefaultClientConfigImpl.class);
    }
    public static synchronized ILoadBalancer getNamedLoadBalancer(String name, Class<? extends IClientConfig> configClass) {
        ...
        lb = registerNamedLoadBalancerFromProperties(name, configClass);
        ...
    }
    public static synchronized IClient<?, ?> registerClientFromProperties(String restClientName, IClientConfig clientConfig) throws ClientException {
        IClient<?, ?> client = null;
        ILoadBalancer loadBalancer = null;
        ...
        // Read the IClient implementation class name from configuration.
        String clientClassName = (String) clientConfig.getProperty(CommonClientConfigKey.ClientClassName);
        // Create the client instance from configuration.
        client = (IClient<?, ?>) instantiateInstanceWithClientConfig(clientClassName, clientConfig);
        boolean initializeNFLoadBalancer = Boolean.parseBoolean(clientConfig.getProperty(
                CommonClientConfigKey.InitializeNFLoadBalancer, DefaultClientConfigImpl.DEFAULT_ENABLE_LOADBALANCER).toString());
        if (initializeNFLoadBalancer) { // if a load balancer is required, create one from configuration
            loadBalancer = registerNamedLoadBalancerFromclientConfig(restClientName, clientConfig);
        }
        if (client instanceof AbstractLoadBalancerAwareClient) { // inject the load balancer into load-balancer-aware clients
            ((AbstractLoadBalancerAwareClient) client).setLoadBalancer(loadBalancer);
        }
        ...
        return client;
    }
    public static ILoadBalancer registerNamedLoadBalancerFromclientConfig(String name, IClientConfig clientConfig) throws ClientException {
        ...
        ILoadBalancer lb = null;
        ...
        // Read the load balancer implementation class name from configuration.
        String loadBalancerClassName = (String) clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerClassName);
        lb = (ILoadBalancer) ClientFactory.instantiateInstanceWithClientConfig(loadBalancerClassName, clientConfig);
        ...
        return lb;
        ...
    }
    // Instantiates the class with the given name.
    public static Object instantiateInstanceWithClientConfig(String className, IClientConfig clientConfig)
            throws InstantiationException, IllegalAccessException, ClassNotFoundException {
        Class clazz = Class.forName(className);
        // If the class implements IClientConfigAware, ClientFactory initializes it with the IClientConfig.
        if (IClientConfigAware.class.isAssignableFrom(clazz)) {
            IClientConfigAware obj = (IClientConfigAware) clazz.newInstance();
            obj.initWithNiwsConfig(clientConfig);
            return obj;
        } else {
            // Otherwise try a constructor that takes an IClientConfig.
            try {
                if (clazz.getConstructor(IClientConfig.class) != null) {
                    return clazz.getConstructor(IClientConfig.class).newInstance(clientConfig);
                }
            } catch (Throwable e) { // NOPMD
            }
        }
        // Fall back to the no-argument constructor.
        return clazz.newInstance();
    }
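
      The instantiation order used above (config-aware class first, then a constructor that accepts the config, then the no-argument constructor) can be tried out in isolation. The sketch below is a simplified stand-in, not Ribbon code: Config, ConfigAware and the three client classes are hypothetical, and it uses getDeclaredConstructor().newInstance() instead of the deprecated Class.newInstance().

```java
// Simplified stand-in for the reflective instantiation order; all names are hypothetical.
class InstantiationSketch {
    interface Config { String get(String key); }
    interface ConfigAware { void initWithConfig(Config config); }

    // Case 1: initialized through the config-aware callback.
    static class AwareClient implements ConfigAware {
        String name;
        @Override public void initWithConfig(Config config) { name = config.get("name"); }
    }
    // Case 2: initialized through a public constructor that takes the config.
    static class CtorClient {
        final String name;
        public CtorClient(Config config) { name = config.get("name"); }
    }
    // Case 3: no config support at all, only a no-argument constructor.
    static class PlainClient {
        public PlainClient() { }
    }

    static Object instantiate(String className, Config config) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(className);
        if (ConfigAware.class.isAssignableFrom(clazz)) {
            ConfigAware obj = (ConfigAware) clazz.getDeclaredConstructor().newInstance();
            obj.initWithConfig(config);
            return obj;
        }
        try {
            return clazz.getConstructor(Config.class).newInstance(config);
        } catch (NoSuchMethodException e) {
            // No config constructor: fall back to the no-argument constructor.
            return clazz.getDeclaredConstructor().newInstance();
        }
    }

    public static void main(String[] args) throws Exception {
        Config config = key -> "sample-client";
        System.out.println(instantiate(AwareClient.class.getName(), config).getClass().getSimpleName());
        System.out.println(instantiate(CtorClient.class.getName(), config).getClass().getSimpleName());
        System.out.println(instantiate(PlainClient.class.getName(), config).getClass().getSimpleName());
    }
}
```

      One difference worth noting: the original swallows every Throwable from the constructor lookup, whereas this sketch only falls back when the config constructor is missing, so a failing constructor surfaces as an exception.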

      Configuration properties used by the client factory class (ClientFactory):

    Property                                    Description                                       Default
    clientname.ribbon.ClientClassName           IClient implementation class used by the client   com.netflix.niws.client.http.RestClient
    clientname.ribbon.InitializeNFLoadBalancer  Whether to initialize the load balancer           true
    clientname.ribbon.NFLoadBalancerClassName   Load balancer implementation class                com.netflix.loadbalancer.ZoneAwareLoadBalancer
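
      As a rough illustration of how these keys and defaults combine, the sketch below resolves per-client properties from a plain java.util.Properties object. This is only an assumption made for illustration: Ribbon reads its settings through IClientConfig, not through this helper, and both resolve and the client name sample-client are hypothetical.

```java
import java.util.Properties;

// Illustrative lookup of "<clientName>.ribbon.<key>" with a default; not Ribbon's own mechanism.
class RibbonPropertySketch {
    // Default values copied from the table above.
    static final String DEFAULT_CLIENT_CLASS = "com.netflix.niws.client.http.RestClient";
    static final String DEFAULT_ENABLE_LB = "true";
    static final String DEFAULT_LB_CLASS = "com.netflix.loadbalancer.ZoneAwareLoadBalancer";

    // Hypothetical helper: builds the namespaced key and falls back to the default value.
    static String resolve(Properties props, String clientName, String key, String defaultValue) {
        return props.getProperty(clientName + ".ribbon." + key, defaultValue);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Only one property is overridden; the others fall back to their defaults.
        props.setProperty("sample-client.ribbon.InitializeNFLoadBalancer", "false");

        System.out.println(resolve(props, "sample-client", "ClientClassName", DEFAULT_CLIENT_CLASS));
        System.out.println(resolve(props, "sample-client", "InitializeNFLoadBalancer", DEFAULT_ENABLE_LB));
        System.out.println(resolve(props, "sample-client", "NFLoadBalancerClassName", DEFAULT_LB_CLASS));
    }
}
```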

    Class diagram

    Client implementation classes

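      AbstractLoadBalancerAwareClient implements request execution through a load balancer. LoadBalancerCommand acts as a template for the load balancer interaction and takes a callback that performs the actual request.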

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
        ...
        return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri); // set the final target URI
                        try {
                            // call execute() to perform the actual request
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        ...
    }
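
      The step that turns a request URI into a server-specific URI can be illustrated with java.net.URI alone. The sketch below is a simplified stand-in for what reconstructURIWithServer is used for here, not its actual implementation; withServer and the sample host and port are hypothetical, and defaulting the scheme to http is an assumption.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Simplified stand-in: rewrite a request URI so that it targets the chosen server.
class UriRewriteSketch {
    // Hypothetical helper: keeps path, query and fragment, replaces host and port.
    static URI withServer(URI original, String host, int port) {
        // Assumption: requests without a scheme are sent over plain http.
        String scheme = original.getScheme() != null ? original.getScheme() : "http";
        try {
            return new URI(scheme, original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + original, e);
        }
    }

    public static void main(String[] args) {
        // A partial URI, as a caller would pass when the load balancer picks the host.
        URI partial = URI.create("/api/users?id=1");
        System.out.println(withServer(partial, "10.0.0.5", 8080));
    }
}
```

      This is why callers can pass a URI without host information to a load-balancer-aware client: the host and port are filled in only after a server has been selected.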

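      LoadBalancerCommand asks the load balancer for a server and then invokes the callback to execute the request. It also provides listeners at each key stage of execution and a retry mechanism for failures.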

    public Observable<T> submit(final ServerOperation<T> operation) {
            final ExecutionInfoContext context = new ExecutionInfoContext();
            if (listenerInvoker != null) { // notify the execution listeners that execution is starting
                try {
                    listenerInvoker.onExecutionStart();
                } catch (AbortExecutionException e) {
                    return Observable.error(e);
                }
            }
            final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
            final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
            Observable<T> o = 
                    (server == null ? selectServer() : Observable.just(server)) // ask the load balancer for the target server
                    .concatMap(new Func1<Server, Observable<T>>() {
                        ...
                        return operation.call(server).doOnEach(new Observer<T>() {
                                                ...
                                            });
                        ...
                            if (maxRetrysSame > 0) 
                                o = o.retry(retryPolicy(maxRetrysSame, true));
                            return o;
                        }
                    });
                
            if (maxRetrysNext > 0 && server == null) 
                o = o.retry(retryPolicy(maxRetrysNext, false));
            
            return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
                @Override
                public Observable<T> call(Throwable e) {
                    if (context.getAttemptCount() > 0) {
                        if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                            e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                    "Number of retries on next server exceeded max " + maxRetrysNext
                                    + " retries, while making a call for: " + context.getServer(), e);
                        }
                        else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                            e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                    "Number of retries exceeded max " + maxRetrysSame
                                    + " retries, while making a call for: " + context.getServer(), e);
                        }
                    }
                    if (listenerInvoker != null) {
                        listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                    }
                    return Observable.error(e);
                }
            });
        }
    private Observable<Server> selectServer() {
            return Observable.create(new OnSubscribe<Server>() {
                @Override
                public void call(Subscriber<? super Server> next) {
                    try {
                        Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                        next.onNext(server);
                        next.onCompleted();
                    } catch (Exception e) {
                        next.onError(e);
                    }
                }
            });
        }
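
      The two retry levels in submit (retry on the same server, then move on to the next server) can be easier to follow without RxJava. The sketch below is a plain-loop approximation under stated assumptions, not the LoadBalancerCommand implementation: servers are plain strings, every RuntimeException counts as retriable, and listeners are left out. The names RetrySketch and submit are hypothetical.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-loop approximation of "retry same server, then retry next server".
class RetrySketch {
    static <T> T submit(Supplier<String> serverChooser, Function<String, T> operation,
                        int maxRetriesSame, int maxRetriesNext) {
        RuntimeException last = null;
        // Outer loop: one initial server plus up to maxRetriesNext further servers.
        for (int serverAttempt = 0; serverAttempt <= maxRetriesNext; serverAttempt++) {
            String server = serverChooser.get();
            // Inner loop: one initial call plus up to maxRetriesSame retries on this server.
            for (int attempt = 0; attempt <= maxRetriesSame; attempt++) {
                try {
                    return operation.apply(server);
                } catch (RuntimeException e) {
                    last = e; // assumption: every failure is treated as retriable
                }
            }
        }
        throw new IllegalStateException("Retries exhausted", last);
    }

    public static void main(String[] args) {
        // Stand-in for the load balancer: hands out "a" first, then "b".
        Iterator<String> servers = List.of("a", "b").iterator();
        String result = submit(servers::next, server -> {
            if (server.equals("a")) {
                throw new RuntimeException(server + " is down");
            }
            return "ok from " + server;
        }, 1, 1);
        System.out.println(result);
    }
}
```

      With one retry on the same server and one retry on the next server, the operation runs at most (1 + 1) * (1 + 1) = 4 times; in the usage above it fails twice on "a" and succeeds on the first call to "b".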

      The subclass HttpRequest is used for HTTP requests. It defines the individual parts of an HTTP request and is constructed with the builder pattern.

    public class HttpRequest extends ClientRequest {
        protected CaseInsensitiveMultiMap httpHeaders = new CaseInsensitiveMultiMap(); // request headers
        protected Multimap<String, String> queryParams = ArrayListMultimap.create(); // query parameters
        private Object entity; // message body
        protected Verb verb; // HTTP method: POST, GET, HEAD, DELETE, etc.; defaults to GET
    }
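
      As an illustration of the builder pattern mentioned above, here is a small self-contained sketch. SketchRequest and its Builder are hypothetical stand-ins rather than Ribbon's HttpRequest, and a TreeMap with String.CASE_INSENSITIVE_ORDER is used as an assumed substitute for CaseInsensitiveMultiMap.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Entry point of the sketch: builds a request and reads a header case-insensitively.
class BuilderSketch {
    public static void main(String[] args) {
        SketchRequest request = SketchRequest.newBuilder()
                .uri(URI.create("/users"))
                .header("Accept", "application/json")
                .queryParam("page", "1")
                .build();
        System.out.println(request.verb + " " + request.uri + " " + request.headers.get("accept"));
    }
}

// Hypothetical request type whose fields are fixed once build() returns.
class SketchRequest {
    enum Verb { GET, POST, PUT, DELETE, HEAD }

    final URI uri;
    final Verb verb;
    final Map<String, List<String>> headers;
    final Map<String, List<String>> queryParams;

    private SketchRequest(Builder b) {
        this.uri = b.uri;
        this.verb = b.verb;
        this.headers = b.headers;
        this.queryParams = b.queryParams;
    }

    static Builder newBuilder() { return new Builder(); }

    static class Builder {
        private URI uri;
        private Verb verb = Verb.GET; // GET is the default, as in the description above
        private final Map<String, List<String>> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        private final Map<String, List<String>> queryParams = new TreeMap<>();

        Builder uri(URI uri) { this.uri = uri; return this; }
        Builder verb(Verb verb) { this.verb = verb; return this; }
        Builder header(String name, String value) {
            headers.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
            return this;
        }
        Builder queryParam(String name, String value) {
            queryParams.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
            return this;
        }
        SketchRequest build() { return new SketchRequest(this); }
    }
}
```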

       

    Class diagram

  • Original article: https://www.cnblogs.com/zhangwanhua/p/8328283.html