zoukankan      html  css  js  c++  java
  • 第三课 Dubbo设计中的设计模式

    责任链模式                                                                                                             

    责任链模式在Dubbo中发挥的作用举足轻重,就像是Dubbo框架的骨架。Dubbo的调用链组织是用责任链模式串连起来的。

    责任链中的每个节点实现Filter接口,然后由ProtocolFilterWrapper,将所有Filter串连起来。

    Dubbo的许多功能都是通过Filter扩展实现的,比如监控、日志、缓存、安全、telnet以及RPC本身都是。

    如果把Dubbo比作一列火车,责任链就像是火车的各车厢,每个车厢的功能不同。

    如果需要加入新的功能,增加车厢就可以了,非常容易扩展。

    最经典的实现链式Filter代码。采用匿名内部类来实现,一定要DEBUG进去看看。

     private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    
           Invoker<T> last = invoker;
    
           List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    
           if (filters.size() > 0) {
    
               for (int i = filters.size() - 1; i >= 0; i --) {
    
                   final Filter filter = filters.get(i);
    
                   final Invoker<T> next = last;
    
                   last = new Invoker<T>() {
    
     
    
                       public Class<T> getInterface() {
    
                           return invoker.getInterface();
    
                       }
    
     
    
                       public URL getUrl() {
    
                           return invoker.getUrl();
    
                       }
    
     
    
                       public boolean isAvailable() {
    
                           return invoker.isAvailable();
    
                       }
    
     
    
                       public Result invoke(Invocation invocation) throws RpcException {
    
                           return filter.invoke(next, invocation);
    
                       }
    
     
    
                       public void destroy() {
    
                           invoker.destroy();
    
                       }
    
     
    
                       @Override
    
                       public String toString() {
    
                           return invoker.toString();
    
                       }
    
                   };
    
               }
    
           }
    
           return last;
    
       }
    

     

    至少有2个典型案例:

    • dubbo filter链式调用

    • dubbo handler链式调用

    Dubbo的Filter类似于 serlvet filter.可以搞一些非业务的工作,如限流,超时,访问日志记录,trace等。 dubbo服务,进行refer或export时,会build filter. 采用了匿名机制。

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    
           if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
    
               return protocol.export(invoker);
    
           }
    
           return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    
       }
    
     
    
       public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    
           if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    
               return protocol.refer(type, url);
    
           }
    
           return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    
       }
    
     
    
       public void destroy() {
    
           protocol.destroy();
    
       }
    
     
    
       private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    
           Invoker<T> last = invoker;
    
           List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    
           if (filters.size() > 0) {
    
               for (int i = filters.size() - 1; i >= 0; i --) {
    
                   final Filter filter = filters.get(i);
    
                   final Invoker<T> next = last;
    
                   last = new Invoker<T>() {
    
     
    
                       public Class<T> getInterface() {
    
                           return invoker.getInterface();
    
                       }
    
     
    
                       public URL getUrl() {
    
                           return invoker.getUrl();
    
                       }
    
     
    
                       public boolean isAvailable() {
    
                           return invoker.isAvailable();
    
                       }
    
     
    
                       public Result invoke(Invocation invocation) throws RpcException {
    
                           return filter.invoke(next, invocation);
    
                       }
    
     
    
                       public void destroy() {
    
                           invoker.destroy();
    
                       }
    
     
    
                       @Override
    
                       public String toString() {
    
                           return invoker.toString();
    
                       }
    
                   };
    
               }
    
           }
    
           return last;
    
       }
    

      

    dubbo handler采用的也是链式模式。 链式模型是通信系统中的经典模式,也叫做pipeline模式。 应用数据通过协议层,传输层,序列化后,和管道非常类似,在每一层,都会进行相应的业务处理,然后传到下一层

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    
           return new MultiMessageHandler(new HeartbeatHandler(((Dispatcher)ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()).dispatch(handler, url)));
    
       }

    最里层那个hanlder也是一层一层套的handler.分别是DecodeHandler,HeaderExchangeHandler,DubboProtocol(handler)

    这几个handler关系比较复杂

    观察者模式                                                                                                              

    Dubbo中使用观察者模式最典型的例子是RegistryService

    消费者在初始化的时候回调用subscribe方法,注册一个观察者,如果观察者引用的服务地址列表发生改变,就会通过NotifyListener通知消费者。

    此外,Dubbo的InvokerListenerExporterListener 也实现了观察者模式,只要实现该接口,并注册,

    就可以接收到consumer端调用refer和provider端调用export的通知。Dubbo的注册/订阅模型和观察者模式就是天生一对。

    节点export或refer的时候,都会订阅感兴趣的节点。

    public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
    
           final RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker);
    
           final Registry registry = this.getRegistry(originInvoker);
    
           final URL registedProviderUrl = this.getRegistedProviderUrl(originInvoker);
    
           registry.register(registedProviderUrl);
    
           final URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(registedProviderUrl);
    
           final RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl);
    
           this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    
           registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    
    }
    
     
    
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    
       RegistryDirectory<T> directory = new RegistryDirectory(type, url);
    
       directory.setRegistry(registry);
    
       directory.setProtocol(this.protocol);
    
       URL subscribeUrl = new URL("consumer", NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    
       if (!"*".equals(url.getServiceInterface()) && url.getParameter("register", true)) {
    
           registry.register(subscribeUrl.addParameters(new String[]{"category", "consumers", "check", String.valueOf(false)}));
    
       }
    
     
    
       directory.subscribe(subscribeUrl.addParameter("category", "providers,configurators,routers"));
    
       return cluster.join(directory);
    
    }
    
     
    
    public void subscribe(URL url) {
    
       this.setConsumerUrl(url);
    
       this.registry.subscribe(url, this);
    
    }
    

      

    上面,订阅节点信息的时候,把自己的this引用传进去了。这样,当节点有变化的时候,会通过this上下文,修改Invoker列表。

    ChildListener zkListener = (ChildListener)listeners.get(listener);
    
                       if (zkListener == null) {
    
                           listeners.putIfAbsent(listener, new ChildListener() {
    
                               public void childChanged(String parentPath, List<String> currentChilds) {
    
                                   ZookeeperRegistry.this.notify(url, listener, ZookeeperRegistry.this.toUrlsWithEmpty(url, parentPath, currentChilds));
    
                               }
    
                           });
    
                           zkListener = (ChildListener)listeners.get(listener);
    
     }
    

      

    一路进来,刷新invoker.注意,invoker会产生竟态条件,所以需要加锁。

     public synchronized void notify(List<URL> urls) {
    
           List<URL> invokerUrls = new ArrayList();
    
           List<URL> routerUrls = new ArrayList();
    
           List<URL> configuratorUrls = new ArrayList();
    
           Iterator i$ = urls.iterator();
    
     
    
           while(true) {
    
               while(true) {
    
                   while(i$.hasNext()) {
    
                       URL url = (URL)i$.next();
    
                       String protocol = url.getProtocol();
    
                       String category = url.getParameter("category", "providers");
    
     
    
                   this.refreshInvoker(invokerUrls);
    
                   return;
    
                   }
    
               }
    
       }
    

       

    修饰器模式                                                                                                               

    Dubbo中还大量用到了修饰器模式。比如ProtocolFilterWrapper类是对Protocol类的修饰。在export和refer方法中,配合责任链模式,

    把Filter组装成责任链,实现对Protocol功能的修饰。其他还有ProtocolListenerWrapper、 ListenerInvokerWrapperInvokerWrapper等。

    个人感觉,修饰器模式是一把双刃剑,一方面用它可以方便地扩展类的功能,而且对用户无感,

    但另一方面,过多地使用修饰器模式不利于理解,因为一个类可能经过层层修饰,最终的行为已经和原始行为偏离较大。

    工厂方法模式                                                                                                            

    CacheFactory的实现采用的是工厂方法模式。CacheFactory接口定义getCache方法,

    然后定义一个AbstractCacheFactory抽象类实现CacheFactory

    并将实际创建cache的createCache方法分离出来,并设置为抽象方法。这样具体cache的创建工作就留给具体的子类去完成。

     

    插件机制

    Dubbo本身的功能基本都够用了,但是Dubbo没有固步自封,而是平等的对待第三方,用户可以定制自己的插件,对Dubbo功能进行扩展。 Dubbo通过SPI机制,实现插件机制。 机制如下:

    • DUBBO框架预留了接口,具体的实现,由插件实现

    • SPI注解,通过SPI注解,以及约定的配置文件,完成实现接口的映射关系

    • 插件配置放在目录”META-INF/dubbo/internal“下面

    • 配置文件的格式是”KEY=VALUE“格式

    • KEY是SPI注解上面的值,VALUE是对应的插件实现类

    loadExtensionClasses会从约定好的目录下载加载类。

       private Map<String, Class<?>> loadExtensionClasses() {
    
           SPI defaultAnnotation = (SPI)this.type.getAnnotation(SPI.class);
    
           String value = defaultAnnotation.value();
    
           Map<String, Class<?>> extensionClasses = new HashMap();
    
           this.loadFile(extensionClasses, "META-INF/dubbo/internal/");
    
           return extensionClasses;
    
       }

    一般情况下,使用默认的即可,如果需要使用自定义的插件,可以通过URL传递。 例如,负载均均衡测试,通过URL指定,如果未指定,则使用默认的随机负载均衡策略。

    loadbalance = (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(invocation.getMethodName(), "loadbalance", "random"));

    上述加载类的过程非常复杂,这次简单过一下

    抽象工厂模式                                                                                                             

    ProxyFactory及其子类是Dubbo中使用抽象工厂模式的典型例子。

    ProxyFactory提供两个方法,分别用来生产ProxyInvoker

    (这两个方法签名看起来有些矛盾,因为getProxy方法需要传入一个Invoker对象,而getInvoker方法需要传入一个Proxy对象,看起来会形成循环依赖,但其实两个方式使用的场景不一样)。

    AbstractProxyFactory实现了ProxyFactory接口,作为具体实现类的抽象父类。

    然后定义了JdkProxyFactoryJavassistProxyFactory两个具体类,分别用来生产基于jdk代理机制和基于javassist代理机制的ProxyInvoker

    适配器模式                                                                                                                  

    为了让用户根据自己的需求选择日志组件,Dubbo自定义了自己的Logger接口,并为常见的日志组件(包括jcl, jdk, log4j, slf4j)提供相应的适配器。

    并且利用简单工厂模式提供一个LoggerFactory,客户可以创建抽象的Dubbo自定义Logger,而无需关心实际使用的日志组件类型。

    在LoggerFactory初始化时,客户通过设置系统变量的方式选择自己所用的日志组件,这样提供了很大的灵活性。

    至少有3个经典案例

    • Transport 完成Server 和Client接口功能

    • CoderAdapter完成encode和decode功能 NettyCodecAdapter

    • @Adaptive 

    @SPI("netty")
    
    public interface Transporter {
    
       @Adaptive({"server", "transporter"})
    
       Server bind(URL var1, ChannelHandler var2) throws RemotingException;
    
     
    
       @Adaptive({"client", "transporter"})
    
       Client connect(URL var1, ChannelHandler var2) throws RemotingException;
    
    }

    下面就是给类动态的增加功能。

    private static Wrapper makeWrapper(Class<?> c)
    
       {
    
           if( c.isPrimitive() )
    
               throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
    
     
    
           String name = c.getName();
    
           ClassLoader cl = ClassHelper.getClassLoader(c);
    
     
    
           StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
    
           StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
    
           StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");
    
     
    
           c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
    
           c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
    
           c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
    
     
    
           Map<String, Class<?>> pts = new HashMap<String, Class<?>>(); // <property name, property types>
    
           Map<String, Method> ms = new LinkedHashMap<String, Method>(); // <method desc, Method instance>
    
           List<String> mns = new ArrayList<String>(); // method names.
    
           List<String> dmns = new ArrayList<String>(); // declaring method names.
    
     
    
           // get all public field.
    
           for( Field f : c.getFields() )
    
           {
    
               String fn = f.getName();
    
               Class<?> ft = f.getType();
    
               if( Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers()) )
    
                   continue;
    
     
    
               c1.append(" if( $2.equals("").append(fn).append("") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
    
               c2.append(" if( $2.equals("").append(fn).append("") ){ return ($w)w.").append(fn).append("; }");
    
               pts.put(fn, ft);
    
           }
    
     
    
           Method[] methods = c.getMethods();
    
           // get all public method.
    
           boolean hasMethod = hasMethods(methods);
    
           if( hasMethod ){
    
               c3.append(" try{");
    
           }
    
           for( Method m : methods )
    
           {
    
               if( m.getDeclaringClass() == Object.class ) //ignore Object's method.
    
                   continue;
    
     
    
               String mn = m.getName();
    
               c3.append(" if( "").append(mn).append("".equals( $2 ) ");
    
               int len = m.getParameterTypes().length;
    
               c3.append(" && ").append(" $3.length == ").append(len);
    
     
    
               boolean override = false;
    
               for( Method m2 : methods ) {
    
                   if (m != m2 && m.getName().equals(m2.getName())) {
    
                       override = true;
    
                       break;
    
                   }
    
               }
    
               if (override) {
    
                   if (len > 0) {
    
                       for (int l = 0; l < len; l ++) {
    
                           c3.append(" && ").append(" $3[").append(l).append("].getName().equals("")
    
                               .append(m.getParameterTypes()[l].getName()).append("")");
    
                       }
    
                   }
    
               }
    
     
    
               c3.append(" ) { ");
    
     
    
               if( m.getReturnType() == Void.TYPE )
    
                   c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
    
               else
    
                   c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");
    
     
    
               c3.append(" }");
    
     
    
               mns.add(mn);
    
               if( m.getDeclaringClass() == c )
    
                   dmns.add(mn);
    
               ms.put(ReflectUtils.getDesc(m), m);
    
           }
    
           if( hasMethod ){
    
               c3.append(" } catch(Throwable e) { " );
    
               c3.append("     throw new java.lang.reflect.InvocationTargetException(e); " );
    
               c3.append(" }");
    
           }
    
     
    
           c3.append(" throw new " + NoSuchMethodException.class.getName() + "("Not found method \""+$2+"\" in class " + c.getName() + "."); }");
    
     
    
           // deal with get/set method.
    
           Matcher matcher;
    
           for( Map.Entry<String,Method> entry : ms.entrySet() )
    
           {
    
               String md = entry.getKey();
    
               Method method = (Method)entry.getValue();
    
               if( ( matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md) ).matches() )
    
               {
    
                   String pn = propertyName(matcher.group(1));
    
                   c2.append(" if( $2.equals("").append(pn).append("") ){ return ($w)w.").append(method.getName()).append("(); }");
    
                   pts.put(pn, method.getReturnType());
    
               }
    
               else if( ( matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md) ).matches() )
    
               {
    
                   String pn = propertyName(matcher.group(1));
    
                   c2.append(" if( $2.equals("").append(pn).append("") ){ return ($w)w.").append(method.getName()).append("(); }");
    
                   pts.put(pn, method.getReturnType());
    
               }
    
               else if( ( matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md) ).matches() )
    
               {
    
                   Class<?> pt = method.getParameterTypes()[0];
    
                   String pn = propertyName(matcher.group(1));
    
                   c1.append(" if( $2.equals("").append(pn).append("") ){ w.").append(method.getName()).append("(").append(arg(pt,"$3")).append("); return; }");
    
                   pts.put(pn, pt);
    
               }
    
           }
    
           c1.append(" throw new " + NoSuchPropertyException.class.getName() + "("Not found property \""+$2+"\" filed or setter method in class " + c.getName() + "."); }");
    
           c2.append(" throw new " + NoSuchPropertyException.class.getName() + "("Not found property \""+$2+"\" filed or setter method in class " + c.getName() + "."); }");
    
     
    
           // make class
    
           long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
    
           ClassGenerator cc = ClassGenerator.newInstance(cl);
    
           cc.setClassName( ( Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw" ) + id );
    
           cc.setSuperClass(Wrapper.class);
    
     
    
           cc.addDefaultConstructor();
    
           cc.addField("public static String[] pns;"); // property name array.
    
           cc.addField("public static " + Map.class.getName() + " pts;"); // property type map.
    
           cc.addField("public static String[] mns;"); // all method name array.
    
           cc.addField("public static String[] dmns;"); // declared method name array.
    
           for(int i=0,len=ms.size();i<len;i++)
    
               cc.addField("public static Class[] mts" + i + ";");
    
     
    
           cc.addMethod("public String[] getPropertyNames(){ return pns; }");
    
           cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
    
           cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
    
           cc.addMethod("public String[] getMethodNames(){ return mns; }");
    
           cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
    
           cc.addMethod(c1.toString());
    
           cc.addMethod(c2.toString());
    
           cc.addMethod(c3.toString());
    
     
    
           try
    
           {
    
               Class<?> wc = cc.toClass();
    
               // setup static field.
    
               wc.getField("pts").set(null, pts);
    
               wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
    
               wc.getField("mns").set(null, mns.toArray(new String[0]));
    
               wc.getField("dmns").set(null, dmns.toArray(new String[0]));
    
               int ix = 0;
    
               for( Method m : ms.values() )
    
                   wc.getField("mts" + ix++).set(null, m.getParameterTypes());
    
               return (Wrapper)wc.newInstance();
    
           }
    
           catch(RuntimeException e)
    
           {
    
               throw e;
    
           }
    
           catch(Throwable e)
    
           {
    
               throw new RuntimeException(e.getMessage(), e);
    
           }
    
           finally
    
           {
    
               cc.release();
    
               ms.clear();
    
               mns.clear();
    
               dmns.clear();
    
           }
    
       }
    

      

    代理模式                                                                                                                      

    Dubbo consumer使用Proxy类创建远程服务的本地代理,本地代理实现和远程服务一样的接口,并且屏蔽了网络通信的细节,使得用户在使用本地代理的时候,感觉和使用本地服务一样。

    public class JavassistProxyFactory extends AbstractProxyFactory {
    
     
    
       @SuppressWarnings("unchecked")
    
       public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    
           return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    
       }
    
     
    
       public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    
           // TODO Wrapper类不能正确处理带$的类名
    
           final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    
           return new AbstractProxyInvoker<T>(proxy, type, url) {
    
               @Override
    
               protected Object doInvoke(T proxy, String methodName,
    
                                         Class<?>[] parameterTypes,
    
                                         Object[] arguments) throws Throwable {
    
                   return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
    
               }
    
           };
    
       }
    
     
    
    }
  • 相关阅读:
    CentOS 6.5下快速搭建ftp服务器
    Ubuntu增加swap交换空间的步骤
    mysql官方下载安装教程(centos)
    阿里云上遇到: virtual memory exhausted: Cannot allocate memory
    解决nginx: [error] open() "/usr/local/nginx/logs/nginx.pid" failed错误
    centos安装nodejs和配置npm
    JavaScript(二)-精简
    JavaScript(一)
    ease,seae-in,ease-in-out,ease-out区别
    安装 sass 文档
  • 原文地址:https://www.cnblogs.com/JiangWJ/p/10946819.html
Copyright © 2011-2022 走看看