  • Adding native Thrift and Avro support to dubbo/dubbox

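    (Facebook) Thrift, (Hadoop) Avro, and (Google) protobuf (gRPC) are among the most prominent high-performance serialization/RPC frameworks of recent years. Dubbo does ship with Thrift support, but it depends on an old version (only 0.8.0 is supported) and extends the wire protocol, so it is not native Thrift.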

    Others on GitHub have extended dubbo to support native Thrift, but those implementations involve far more code than necessary. A single class is enough:

    Thrift2Protocol.java:

    package com.alibaba.dubbo.rpc.protocol.thrift2;
    
    import com.alibaba.dubbo.common.URL;
    import com.alibaba.dubbo.common.logger.Logger;
    import com.alibaba.dubbo.common.logger.LoggerFactory;
    import com.alibaba.dubbo.rpc.RpcException;
    import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TCompactProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.server.TNonblockingServer;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    
    import java.lang.reflect.Constructor;
    
    /**
     * Adds "native Thrift" support to dubbo-rpc
     * by 杨俊明(http://yjmyzz.cnblogs.com/)
     */
    public class Thrift2Protocol extends AbstractProxyProtocol {
        public static final int DEFAULT_PORT = 33208;
        private static final Logger logger = LoggerFactory.getLogger(Thrift2Protocol.class);
    
        public int getDefaultPort() {
            return DEFAULT_PORT;
        }
    
        @Override
        protected <T> Runnable doExport(T impl, Class<T> type, URL url)
                throws RpcException {
    
            logger.info("impl => " + impl.getClass());
            logger.info("type => " + type.getName());
            logger.info("url => " + url);
    
            TProcessor tprocessor;
            TNonblockingServer.Args tArgs = null;
            String iFace = "$Iface";
            String processor = "$Processor";
            String typeName = type.getName();
            TNonblockingServerSocket transport;
            if (typeName.endsWith(iFace)) {
                String processorClsName = typeName.substring(0, typeName.indexOf(iFace)) + processor;
                try {
                    Class<?> clazz = Class.forName(processorClsName);
                    Constructor constructor = clazz.getConstructor(type);
                    try {
                        tprocessor = (TProcessor) constructor.newInstance(impl);
                        transport = new TNonblockingServerSocket(url.getPort());
                        tArgs = new TNonblockingServer.Args(transport);
                        tArgs.processor(tprocessor);
                        tArgs.transportFactory(new TFramedTransport.Factory());
                        tArgs.protocolFactory(new TCompactProtocol.Factory());
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                        throw new RpcException("Fail to create thrift server(" + url + ") : " + e.getMessage(), e);
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    throw new RpcException("Fail to create thrift server(" + url + ") : " + e.getMessage(), e);
                }
            }
    
            if (tArgs == null) {
                logger.error("Fail to create thrift server(" + url + ") due to null args");
                throw new RpcException("Fail to create thrift server(" + url + ") due to null args");
            }
            final TServer thriftServer = new TNonblockingServer(tArgs);
    
            new Thread(new Runnable() {
                public void run() {
                    logger.info("Start Thrift Server");
                    // serve() blocks until stop() is called, so the next line runs at shutdown.
                    thriftServer.serve();
                    logger.info("Thrift server stopped.");
                }
            }).start();
    
            return new Runnable() {
                public void run() {
                    try {
                        logger.info("Close Thrift Server");
                        thriftServer.stop();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            };
        }
    
        @Override
        protected <T> T doRefer(Class<T> type, URL url) throws RpcException {
    
            logger.info("type => " + type.getName());
            logger.info("url => " + url);
    
            try {
                TSocket tSocket;
                TTransport transport;
                TProtocol protocol;
                T thriftClient = null;
                String iFace = "$Iface";
                String client = "$Client";
    
                String typeName = type.getName();
                if (typeName.endsWith(iFace)) {
                    String clientClsName = typeName.substring(0, typeName.indexOf(iFace)) + client;
                    Class<?> clazz = Class.forName(clientClsName);
                    Constructor constructor = clazz.getConstructor(TProtocol.class);
                    try {
                        tSocket = new TSocket(url.getHost(), url.getPort());
                        transport = new TFramedTransport(tSocket);
                        protocol = new TCompactProtocol(transport);
                        thriftClient = (T) constructor.newInstance(protocol);
                        transport.open();
                        logger.info("thrift client opened for service(" + url + ")");
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                        throw new RpcException("Fail to create remoting client:" + e.getMessage(), e);
                    }
                }
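                if (thriftClient == null) {
                    // Not a Thrift-generated $Iface interface: fail fast instead of returning null.
                    throw new RpcException("Fail to create remoting client for service(" + url + "): " + typeName + " is not a thrift $Iface type");
                }
                return thriftClient;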
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
            }
        }
    
    }
    

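    Overriding the two abstract methods of the parent class AbstractProxyProtocol, doExport and doRefer, is all it takes. doExport exposes the RPC service: the dubbo service provider calls it at startup, and this is where the Thrift server is started. doRefer is called once the dubbo service consumer has discovered the service, and returns the corresponding RPC client.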
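
    The reflective lookup that both methods rely on can be illustrated in isolation. The sketch below is not part of the protocol class. It assumes a hypothetical HelloService stand-in that mimics the nested Iface/Processor layout of Thrift-generated code, and shows how a sibling class name is derived from the $Iface interface name and then instantiated. The names SiblingLookup, siblingName and newSibling are made up for illustration.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for a Thrift-generated service class; it only mimics
// the nested-type layout (Iface, Processor) and contains no Thrift code.
class HelloService {
    interface Iface {
        String ping();
    }

    static class Processor {
        final Iface handler;

        public Processor(Iface handler) {
            this.handler = handler;
        }
    }
}

class SiblingLookup {
    private static final String IFACE_SUFFIX = "$Iface";

    /** Replaces the trailing "$Iface" of a nested interface name with "$" + sibling. */
    static String siblingName(Class<?> iface, String sibling) {
        String name = iface.getName();
        if (!name.endsWith(IFACE_SUFFIX)) {
            throw new IllegalArgumentException(name + " is not a $Iface interface");
        }
        return name.substring(0, name.length() - IFACE_SUFFIX.length()) + "$" + sibling;
    }

    /** Loads the sibling class and calls its public one-argument constructor. */
    static Object newSibling(Class<?> iface, String sibling, Class<?> argType, Object arg) {
        try {
            Class<?> clazz = Class.forName(siblingName(iface, sibling));
            Constructor<?> constructor = clazz.getConstructor(argType);
            return constructor.newInstance(arg);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create " + sibling + " for " + iface.getName(), e);
        }
    }

    public static void main(String[] args) {
        HelloService.Iface impl = () -> "pong";
        Object processor = newSibling(HelloService.Iface.class, "Processor",
                HelloService.Iface.class, impl);
        System.out.println(processor.getClass().getName());
    }
}
```

    Running main prints HelloService$Processor. Thrift2Protocol performs the same kind of lookup for $Processor on the provider side and for $Client on the consumer side.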

    Following the same approach, Avro is just as easy to integrate:

    AvroProtocol.java:

    package com.alibaba.dubbo.rpc.protocol.avro;
    
    import com.alibaba.dubbo.common.URL;
    import com.alibaba.dubbo.common.logger.Logger;
    import com.alibaba.dubbo.common.logger.LoggerFactory;
    import com.alibaba.dubbo.rpc.RpcException;
    import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
    import org.apache.avro.ipc.NettyServer;
    import org.apache.avro.ipc.NettyTransceiver;
    import org.apache.avro.ipc.Server;
    import org.apache.avro.ipc.reflect.ReflectRequestor;
    import org.apache.avro.ipc.reflect.ReflectResponder;
    
    import java.net.InetSocketAddress;
    
    /**
     * Adds Avro support to dubbo-rpc
     * by 杨俊明(http://yjmyzz.cnblogs.com/)
     */
    public class AvroProtocol extends AbstractProxyProtocol {
        public static final int DEFAULT_PORT = 40881;
        private static final Logger logger = LoggerFactory.getLogger(AvroProtocol.class);
    
        public int getDefaultPort() {
            return DEFAULT_PORT;
        }
    
        @Override
        protected <T> Runnable doExport(T impl, Class<T> type, URL url)
                throws RpcException {
    
            logger.info("impl => " + impl.getClass());
            logger.info("type => " + type.getName());
            logger.info("url => " + url);
    
            final Server server = new NettyServer(new ReflectResponder(type, impl),
                    new InetSocketAddress(url.getHost(), url.getPort()));
            server.start();
    
            return new Runnable() {
                public void run() {
                    try {
                        logger.info("Close Avro Server");
                        server.close();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            };
        }
    
        @Override
        protected <T> T doRefer(Class<T> type, URL url) throws RpcException {
    
            logger.info("type => " + type.getName());
            logger.info("url => " + url);
    
            try {
                NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(url.getHost(), url.getPort()));
                T ref = ReflectRequestor.getClient(type, client);
                logger.info("Create Avro Client");
                return ref;
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
            }
        }
    
    }
    

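    Don't forget to add a file named com.alibaba.dubbo.rpc.Protocol under META-INF/dubbo/internal (the thrift2 module needs an equivalent file that points at Thrift2Protocol). For Avro, its content is: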

    avro=com.alibaba.dubbo.rpc.protocol.avro.AvroProtocol
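
    Each line of this file maps a protocol name to its implementation class, which is how the name avro ends up resolving to AvroProtocol. As a rough model only (Dubbo's real extension loader does considerably more than this), the mapping can be sketched with java.util.Properties. The class ProtocolRegistrySketch and its methods are hypothetical:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class ProtocolRegistrySketch {
    /** Parses "name=implementationClass" lines, the layout used by the file above. */
    static Properties parse(String fileContent) {
        Properties mapping = new Properties();
        try {
            mapping.load(new StringReader(fileContent));
        } catch (IOException e) {
            // Cannot realistically happen for an in-memory reader.
            throw new IllegalStateException(e);
        }
        return mapping;
    }

    /** Returns the class name registered for a protocol name, or fails loudly. */
    static String implementationFor(String fileContent, String protocolName) {
        String className = parse(fileContent).getProperty(protocolName);
        if (className == null) {
            throw new IllegalArgumentException("No protocol registered as " + protocolName);
        }
        return className;
    }

    public static void main(String[] args) {
        String file = "avro=com.alibaba.dubbo.rpc.protocol.avro.AvroProtocol\n";
        System.out.println(implementationFor(file, "avro"));
    }
}
```

    A missing or misspelled entry shows up here as an IllegalArgumentException, which is a reminder to check the file name and the key first when a new protocol cannot be found.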
    

    Next, how to package these into the dubbo jar:

    In dubbo-rpc/pom.xml, add the two new modules:

        <modules>
            ...
            <module>dubbo-rpc-avro</module>
            ...
            <module>dubbo-rpc-thrift2</module>
            ...
                
        </modules>

    Then, in dubbo/pom.xml:

        <artifactSet>
            <includes>
                ...
                <include>com.alibaba:dubbo-rpc-api</include>
                <include>com.alibaba:dubbo-rpc-avro</include>
                ...
                <include>com.alibaba:dubbo-rpc-thrift2</include>
                ...
            </includes>
        </artifactSet>

    The dependencies section needs additions too:

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>dubbo-rpc-thrift2</artifactId>
            <version>${project.parent.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.thrift</groupId>
                    <artifactId>libthrift</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>dubbo-rpc-avro</artifactId>
            <version>${project.parent.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro-ipc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    

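    The dubbo-xxx.jar built this way includes the new Protocol implementations. As for Google's protobuf, it is currently at the 3.x beta stage; I will look at integrating it once a final release is out.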

    The code above has been pushed to GitHub: https://github.com/yjmyzz/dubbox (version: 2.8.4a)

    Usage examples for the thrift/avro protocols: https://github.com/yjmyzz/dubbox-sample

    Finally, I ran a simple comparison of the four protocols dubbo/thrift/avro/rest. The test case is trivial:

        public String ping() {
            return "pong";
        }
    

    The client calls the ping method and the server returns the string "pong". 50,000 calls on a MacBook Pro gave the following results:

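    dubbo RPC testing => 
     50000 RPC calls (dubbo protocol), total time 14778 ms, average 3383.407715 calls/s
    avro RPC testing => 
     50000 RPC calls (avro protocol), total time 10707 ms, average 4669.842285 calls/s
    thrift RPC testing => 
     50000 RPC calls (thrift protocol), total time 4667 ms, average 10713.520508 calls/s
    REST testing => 
     50000 REST calls, total time 112699 ms, average 443.659668 calls/s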
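
    The averages follow directly from the call count and the elapsed time, for example 50000 × 1000 / 14778 ≈ 3383.41 calls/s for the dubbo run. The original test harness is not shown here, so the following is only a minimal sketch of such a measurement loop. It uses a hypothetical in-process PingService in place of a real remote proxy, so the numbers it prints say nothing about any of the protocols:

```java
class PingBenchmarkSketch {
    // Hypothetical service interface matching the ping() test case.
    interface PingService {
        String ping();
    }

    /** Converts a call count and elapsed milliseconds into calls per second. */
    static double callsPerSecond(int calls, long elapsedMillis) {
        return calls * 1000.0 / elapsedMillis;
    }

    /** Invokes ping() the given number of times and returns the elapsed milliseconds (at least 1). */
    static long timeCalls(PingService service, int calls) {
        long start = System.nanoTime();
        for (int i = 0; i < calls; i++) {
            if (!"pong".equals(service.ping())) {
                throw new IllegalStateException("unexpected reply");
            }
        }
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        return Math.max(1, elapsedMillis);
    }

    public static void main(String[] args) {
        PingService local = () -> "pong"; // stand-in for a remote proxy
        int calls = 50000;
        long millis = timeCalls(local, calls);
        System.out.printf("%d calls took %d ms, average %f calls/s%n",
                calls, millis, callsPerSecond(calls, millis));
    }
}
```

    To measure a real protocol, the local lambda would be replaced by the proxy obtained from the dubbo consumer, ideally after a warm-up run so that connection setup and JIT compilation do not distort the first calls.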
    

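    This matches expectations. REST goes over HTTP, so it is naturally the slowest. Avro and dubbo both rely on netty for their underlying network communication and land in the same order of magnitude, with Avro slightly faster thanks to its more efficient binary serialization. Thrift is implemented end to end by Facebook itself and performs best, clearly beating the other protocols.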

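    My recommendation: for a given service interface, expose both a Thrift and a REST implementation. Internal subsystems call each other over Thrift (Thrift is cross-language, so calls coming in from outside can in fact use thrift-rpc as well), while scenarios where a thrift-client is inconvenient to use directly keep going through traditional REST.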

  • Original article: https://www.cnblogs.com/yjmyzz/p/dubbo-pritimive-thrift-avro-support.html