zoukankan      html  css  js  c++  java
  • 轻量级分布式RPC框架

    一个轻量级分布式RPC框架--NettyRpc

     

    1、背景

    最近在搜索Netty和Zookeeper方面的文章时,看到了这篇文章《轻量级分布式 RPC 框架》,作者用Zookeeper、Netty和Spring写了一个轻量级的分布式RPC框架。花了一些时间看了下他的代码,写的干净简单,写的RPC框架可以算是一个简易版的dubbo。这个RPC框架虽小,但是麻雀虽小,五脏俱全,有兴趣的可以学习一下。

    项目地址:https://github.com/luxiaoxun/NettyRpc

    自己花了点时间整理了下代码,并修改一些问题,以下是自己学习的一点小结。

    2、简介

    RPC,即 Remote Procedure Call(远程过程调用),调用远程计算机上的服务,就像调用本地服务一样。RPC可以很好的解耦系统,如WebService就是一种基于Http协议的RPC。

    这个RPC整体框架如下:

    这个RPC框架使用的一些技术所解决的问题:

    服务发布与订阅:服务端使用Zookeeper注册服务地址,客户端从Zookeeper获取可用的服务地址。

    通信:使用Netty作为通信框架。

    Spring:使用Spring配置服务,加载Bean,扫描注解。

    动态代理:客户端使用代理模式透明化服务调用。

    消息编解码:使用Protostuff序列化和反序列化消息。

    3、服务端发布服务

    使用注解标注要发布的服务

    服务注解

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Component
    public @interface RpcService {
        Class<?> value();
    }

    一个服务接口:

    public interface HelloService {
    
        String hello(String name);
    
        String hello(Person person);
    }

    一个服务实现:使用注解标注

    复制代码
    @RpcService(HelloService.class)
    public class HelloServiceImpl implements HelloService {
    
        @Override
        public String hello(String name) {
            return "Hello! " + name;
        }
    
        @Override
        public String hello(Person person) {
            return "Hello! " + person.getFirstName() + " " + person.getLastName();
        }
    }
    复制代码

    服务在启动的时候扫描得到所有的服务接口及其实现:

    复制代码
    @Override
        public void setApplicationContext(ApplicationContext ctx) throws BeansException {
            Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
            if (MapUtils.isNotEmpty(serviceBeanMap)) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
                    handlerMap.put(interfaceName, serviceBean);
                }
            }
        }
    复制代码

    在Zookeeper集群上注册服务地址:

     ServiceRegistry

    这里在原文的基础上加了AddRootNode()判断服务父节点是否存在,如果不存在则添加一个PERSISTENT的服务父节点,这样虽然启动服务时多了点判断,但是不需要手动命令添加服务父节点了。

    关于Zookeeper的使用原理,可以看这里《ZooKeeper基本原理》。

    4、客户端调用服务

    使用代理模式调用服务:

    复制代码
    public class RpcProxy {
    
        private String serverAddress;
        private ServiceDiscovery serviceDiscovery;
    
        public RpcProxy(String serverAddress) {
            this.serverAddress = serverAddress;
        }
    
        public RpcProxy(ServiceDiscovery serviceDiscovery) {
            this.serviceDiscovery = serviceDiscovery;
        }
    
        @SuppressWarnings("unchecked")
        public <T> T create(Class<?> interfaceClass) {
            return (T) Proxy.newProxyInstance(
                    interfaceClass.getClassLoader(),
                    new Class<?>[]{interfaceClass},
                    new InvocationHandler() {
                        @Override
                        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                            RpcRequest request = new RpcRequest();
                            request.setRequestId(UUID.randomUUID().toString());
                            request.setClassName(method.getDeclaringClass().getName());
                            request.setMethodName(method.getName());
                            request.setParameterTypes(method.getParameterTypes());
                            request.setParameters(args);
    
                            if (serviceDiscovery != null) {
                                serverAddress = serviceDiscovery.discover();
                            }
                            if(serverAddress != null){
                                String[] array = serverAddress.split(":");
                                String host = array[0];
                                int port = Integer.parseInt(array[1]);
    
                                RpcClient client = new RpcClient(host, port);
                                RpcResponse response = client.send(request);
    
                                if (response.isError()) {
                                    throw new RuntimeException("Response error.",new Throwable(response.getError()));
                                } else {
                                    return response.getResult();
                                }
                            }
                            else{
                                throw new RuntimeException("No server address found!");
                            }
                        }
                    }
            );
        }
    }
    复制代码

    这里每次使用代理远程调用服务,从Zookeeper上获取可用的服务地址,通过RpcClient send一个Request,等待该Request的Response返回。这里原文有个比较严重的bug,在原文给出的简单的Test中是很难测出来的,原文使用了obj的wait和notifyAll来等待Response返回,会出现“假死等待”的情况:一个Request发送出去后,在obj.wait()调用之前可能Response就返回了,这时候在channelRead0里已经拿到了Response并且obj.notifyAll()已经在obj.wait()之前调用了,这时候send后再obj.wait()就出现了假死等待,客户端就一直等待在这里。使用CountDownLatch可以解决这个问题。

    注意:这里每次调用的send时候才去和服务端建立连接,使用的是短连接,这种短连接在高并发时会有连接数问题,也会影响性能。

    从Zookeeper上获取服务地址:

     ServiceDiscovery

    每次服务地址节点发生变化,都需要再次watchNode,获取新的服务地址列表。

    5、消息编码

    请求消息:

     RpcRequest

    响应消息:

     RpcResponse

    消息序列化和反序列化工具:(基于 Protostuff 实现)

     SerializationUtil

    由于处理的是TCP消息,本人加了TCP的粘包处理Handler

    channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,0))

    消息编解码时开始4个字节表示消息的长度,也就是消息编码的时候,先写消息的长度,再写消息。

    6、性能改进

    Netty本身就是一个高性能的网络框架,从网络IO方面来说并没有太大的问题。

    从这个RPC框架本身来说,在原文的基础上把Server端处理请求的过程改成了多线程异步:

    复制代码
     public void channelRead0(final ChannelHandlerContext ctx,final RpcRequest request) throws Exception {
            RpcServer.submit(new Runnable() {
                @Override
                public void run() {
                    LOGGER.debug("Receive request " + request.getRequestId());
                    RpcResponse response = new RpcResponse();
                    response.setRequestId(request.getRequestId());
                    try {
                        Object result = handle(request);
                        response.setResult(result);
                    } catch (Throwable t) {
                        response.setError(t.toString());
                        LOGGER.error("RPC Server handle request error",t);
                    }
                    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture channelFuture) throws Exception {
                            LOGGER.debug("Send response for request " + request.getRequestId());
                        }
                    });
                }
            });
        }
    复制代码

    Netty 4中的Handler处理在IO线程中,如果Handler处理中有耗时的操作(如数据库相关),会让IO线程等待,影响性能。

    个人觉得该RPC的待改进项:

    1)客户端保持和服务进行长连接,不需要每次调用服务的时候进行连接,长连接的管理(通过Zookeeper获取有效的地址)。

    2)客户端请求异步处理的支持,不需要同步等待:发送一个异步请求,返回Feature,通过Feature的callback机制获取结果。

    3)编码序列化的多协议支持。

    有时间再改改吧。。

    项目地址:https://github.com/luxiaoxun/NettyRpc

    参考:

    轻量级分布式 RPC 框架:http://my.oschina.net/huangyong/blog/361751

    你应该知道的RPC原理:http://www.cnblogs.com/LBSer/p/4853234.html

     

  • 相关阅读:
    【纯水题】POJ 1852 Ants
    【树形DP】BZOJ 1131 Sta
    【不知道怎么分类】HDU
    【树形DP】CF 1293E Xenon's Attack on the Gangs
    【贪心算法】CF Emergency Evacuation
    【思维】UVA 11300 Spreading the Wealth
    【树形DP】NOI2003 逃学的小孩
    【树形DP】BZOJ 3829 Farmcraft
    【树形DP】JSOI BZOJ4472 salesman
    【迷宫问题】CodeForces 1292A A NEKO's Maze Game
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/5278845.html
Copyright © 2011-2022 走看看