zoukankan      html  css  js  c++  java
  • 一个轻量级分布式RPC框架--NettyRpc

    1、背景

    最近在搜索Netty和Zookeeper方面的文章时,看到了这篇文章《轻量级分布式 RPC 框架》,作者用Zookeeper、Netty和Spring写了一个轻量级的分布式RPC框架。花了一些时间看了下他的代码,写的干净简单,写的RPC框架可以算是一个简易版的dubbo。这个RPC框架虽小,但是麻雀虽小,五脏俱全,有兴趣的可以学习一下。

    本人在这个简易版的RPC上添加了如下特性:

    * 服务异步调用的支持,回调函数callback的支持

    * 客户端使用长连接(在多次调用共享连接)

    * 服务端异步多线程处理RPC请求

    项目地址:https://github.com/luxiaoxun/NettyRpc

    2、简介

    RPC,即 Remote Procedure Call(远程过程调用),调用远程计算机上的服务,就像调用本地服务一样。RPC可以很好的解耦系统,如WebService就是一种基于Http协议的RPC。

    这个RPC整体框架如下:

    这个RPC框架使用的一些技术所解决的问题:

    服务发布与订阅:服务端使用Zookeeper注册服务地址,客户端从Zookeeper获取可用的服务地址。

    通信:使用Netty作为通信框架。

    Spring:使用Spring配置服务,加载Bean,扫描注解。

    动态代理:客户端使用代理模式透明化服务调用。

    消息编解码:使用Protostuff序列化和反序列化消息。

    3、服务端发布服务

    使用注解标注要发布的服务

    服务注解

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Component
    public @interface RpcService {
        Class<?> value();
    }

    一个服务接口:

    public interface HelloService {
    
        String hello(String name);
    
        String hello(Person person);
    }

    一个服务实现:使用注解标注

    @RpcService(HelloService.class)
    public class HelloServiceImpl implements HelloService {
    
        @Override
        public String hello(String name) {
            return "Hello! " + name;
        }
    
        @Override
        public String hello(Person person) {
            return "Hello! " + person.getFirstName() + " " + person.getLastName();
        }
    }

    服务在启动的时候扫描得到所有的服务接口及其实现:

    @Override
        public void setApplicationContext(ApplicationContext ctx) throws BeansException {
            Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
            if (MapUtils.isNotEmpty(serviceBeanMap)) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
                    handlerMap.put(interfaceName, serviceBean);
                }
            }
        }

    在Zookeeper集群上注册服务地址:

    public class ServiceRegistry {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);
    
        private CountDownLatch latch = new CountDownLatch(1);
    
        private String registryAddress;
    
        public ServiceRegistry(String registryAddress) {
            this.registryAddress = registryAddress;
        }
    
        public void register(String data) {
            if (data != null) {
                ZooKeeper zk = connectServer();
                if (zk != null) {
                    AddRootNode(zk); // Add root node if not exist
                    createNode(zk, data);
                }
            }
        }
    
        private ZooKeeper connectServer() {
            ZooKeeper zk = null;
            try {
                zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            latch.countDown();
                        }
                    }
                });
                latch.await();
            } catch (IOException e) {
                LOGGER.error("", e);
            }
            catch (InterruptedException ex){
                LOGGER.error("", ex);
            }
            return zk;
        }
    
        private void AddRootNode(ZooKeeper zk){
            try {
                Stat s = zk.exists(Constant.ZK_REGISTRY_PATH, false);
                if (s == null) {
                    zk.create(Constant.ZK_REGISTRY_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                LOGGER.error(e.toString());
            } catch (InterruptedException e) {
                LOGGER.error(e.toString());
            }
        }
    
        private void createNode(ZooKeeper zk, String data) {
            try {
                byte[] bytes = data.getBytes();
                String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                LOGGER.debug("create zookeeper node ({} => {})", path, data);
            } catch (KeeperException e) {
                LOGGER.error("", e);
            }
            catch (InterruptedException ex){
                LOGGER.error("", ex);
            }
        }
    }
    ServiceRegistry

    这里在原文的基础上加了AddRootNode()判断服务父节点是否存在,如果不存在则添加一个PERSISTENT的服务父节点,这样虽然启动服务时多了点判断,但是不需要手动命令添加服务父节点了。

    关于Zookeeper的使用原理,可以看这里《ZooKeeper基本原理》。

    4、客户端调用服务

    使用代理模式调用服务:

    public class RpcProxy {
    
        private String serverAddress;
        private ServiceDiscovery serviceDiscovery;
    
        public RpcProxy(String serverAddress) {
            this.serverAddress = serverAddress;
        }
    
        public RpcProxy(ServiceDiscovery serviceDiscovery) {
            this.serviceDiscovery = serviceDiscovery;
        }
    
        @SuppressWarnings("unchecked")
        public <T> T create(Class<?> interfaceClass) {
            return (T) Proxy.newProxyInstance(
                    interfaceClass.getClassLoader(),
                    new Class<?>[]{interfaceClass},
                    new InvocationHandler() {
                        @Override
                        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                            RpcRequest request = new RpcRequest();
                            request.setRequestId(UUID.randomUUID().toString());
                            request.setClassName(method.getDeclaringClass().getName());
                            request.setMethodName(method.getName());
                            request.setParameterTypes(method.getParameterTypes());
                            request.setParameters(args);
    
                            if (serviceDiscovery != null) {
                                serverAddress = serviceDiscovery.discover();
                            }
                            if(serverAddress != null){
                                String[] array = serverAddress.split(":");
                                String host = array[0];
                                int port = Integer.parseInt(array[1]);
    
                                RpcClient client = new RpcClient(host, port);
                                RpcResponse response = client.send(request);
    
                                if (response.isError()) {
                                    throw new RuntimeException("Response error.",new Throwable(response.getError()));
                                } else {
                                    return response.getResult();
                                }
                            }
                            else{
                                throw new RuntimeException("No server address found!");
                            }
                        }
                    }
            );
        }
    }

    这里每次使用代理远程调用服务,从Zookeeper上获取可用的服务地址,通过RpcClient send一个Request,等待该Request的Response返回。这里原文有个比较严重的bug,在原文给出的简单的Test中是很难测出来的,原文使用了obj的wait和notifyAll来等待Response返回,会出现“假死等待”的情况:一个Request发送出去后,在obj.wait()调用之前可能Response就返回了,这时候在channelRead0里已经拿到了Response并且obj.notifyAll()已经在obj.wait()之前调用了,这时候send后再obj.wait()就出现了假死等待,客户端就一直等待在这里。使用CountDownLatch可以解决这个问题。

    注意:这里每次调用的send时候才去和服务端建立连接,使用的是短连接,这种短连接在高并发时会有连接数问题,也会影响性能。

    从Zookeeper上获取服务地址:

    public class ServiceDiscovery {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);
    
        private CountDownLatch latch = new CountDownLatch(1);
    
        private volatile List<String> dataList = new ArrayList<>();
    
        private String registryAddress;
    
        public ServiceDiscovery(String registryAddress) {
            this.registryAddress = registryAddress;
            ZooKeeper zk = connectServer();
            if (zk != null) {
                watchNode(zk);
            }
        }
    
        public String discover() {
            String data = null;
            int size = dataList.size();
            if (size > 0) {
                if (size == 1) {
                    data = dataList.get(0);
                    LOGGER.debug("using only data: {}", data);
                } else {
                    data = dataList.get(ThreadLocalRandom.current().nextInt(size));
                    LOGGER.debug("using random data: {}", data);
                }
            }
            return data;
        }
    
        private ZooKeeper connectServer() {
            ZooKeeper zk = null;
            try {
                zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            latch.countDown();
                        }
                    }
                });
                latch.await();
            } catch (IOException | InterruptedException e) {
                LOGGER.error("", e);
            }
            return zk;
        }
    
        private void watchNode(final ZooKeeper zk) {
            try {
                List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getType() == Event.EventType.NodeChildrenChanged) {
                            watchNode(zk);
                        }
                    }
                });
                List<String> dataList = new ArrayList<>();
                for (String node : nodeList) {
                    byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
                    dataList.add(new String(bytes));
                }
                LOGGER.debug("node data: {}", dataList);
                this.dataList = dataList;
            } catch (KeeperException | InterruptedException e) {
                LOGGER.error("", e);
            }
        }
    }
    ServiceDiscovery

    每次服务地址节点发生变化,都需要再次watchNode,获取新的服务地址列表。

    5、消息编码

    请求消息:

    public class RpcRequest {
    
        private String requestId;
        private String className;
        private String methodName;
        private Class<?>[] parameterTypes;
        private Object[] parameters;
    
        public String getRequestId() {
            return requestId;
        }
    
        public void setRequestId(String requestId) {
            this.requestId = requestId;
        }
    
        public String getClassName() {
            return className;
        }
    
        public void setClassName(String className) {
            this.className = className;
        }
    
        public String getMethodName() {
            return methodName;
        }
    
        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }
    
        public Class<?>[] getParameterTypes() {
            return parameterTypes;
        }
    
        public void setParameterTypes(Class<?>[] parameterTypes) {
            this.parameterTypes = parameterTypes;
        }
    
        public Object[] getParameters() {
            return parameters;
        }
    
        public void setParameters(Object[] parameters) {
            this.parameters = parameters;
        }
    }
    RpcRequest

    响应消息:

    public class RpcResponse {
    
        private String requestId;
        private String error;
        private Object result;
    
        public boolean isError() {
            return error != null;
        }
    
        public String getRequestId() {
            return requestId;
        }
    
        public void setRequestId(String requestId) {
            this.requestId = requestId;
        }
    
        public String getError() {
            return error;
        }
    
        public void setError(String error) {
            this.error = error;
        }
    
        public Object getResult() {
            return result;
        }
    
        public void setResult(Object result) {
            this.result = result;
        }
    }
    RpcResponse

    消息序列化和反序列化工具:(基于 Protostuff 实现)

    public class SerializationUtil {
    
        private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();
    
        private static Objenesis objenesis = new ObjenesisStd(true);
    
        private SerializationUtil() {
        }
    
        @SuppressWarnings("unchecked")
        private static <T> Schema<T> getSchema(Class<T> cls) {
            Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
            if (schema == null) {
                schema = RuntimeSchema.createFrom(cls);
                if (schema != null) {
                    cachedSchema.put(cls, schema);
                }
            }
            return schema;
        }
    
        /**
         * 序列化(对象 -> 字节数组)
         */
        @SuppressWarnings("unchecked")
        public static <T> byte[] serialize(T obj) {
            Class<T> cls = (Class<T>) obj.getClass();
            LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
            try {
                Schema<T> schema = getSchema(cls);
                return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            } finally {
                buffer.clear();
            }
        }
    
        /**
         * 反序列化(字节数组 -> 对象)
         */
        public static <T> T deserialize(byte[] data, Class<T> cls) {
            try {
                T message = (T) objenesis.newInstance(cls);
                Schema<T> schema = getSchema(cls);
                ProtostuffIOUtil.mergeFrom(data, message, schema);
                return message;
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
    }
    SerializationUtil

    由于处理的是TCP消息,本人加了TCP的粘包处理Handler

    channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,0))

    消息编解码时开始4个字节表示消息的长度,也就是消息编码的时候,先写消息的长度,再写消息。

    6、性能改进

    1)服务端请求异步处理

    Netty本身就是一个高性能的网络框架,从网络IO方面来说并没有太大的问题。

    从这个RPC框架本身来说,在原文的基础上把Server端处理请求的过程改成了多线程异步:

     public void channelRead0(final ChannelHandlerContext ctx,final RpcRequest request) throws Exception {
            RpcServer.submit(new Runnable() {
                @Override
                public void run() {
                    LOGGER.debug("Receive request " + request.getRequestId());
                    RpcResponse response = new RpcResponse();
                    response.setRequestId(request.getRequestId());
                    try {
                        Object result = handle(request);
                        response.setResult(result);
                    } catch (Throwable t) {
                        response.setError(t.toString());
                        LOGGER.error("RPC Server handle request error",t);
                    }
                    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture channelFuture) throws Exception {
                            LOGGER.debug("Send response for request " + request.getRequestId());
                        }
                    });
                }
            });
        }

    Netty 4中的Handler处理在IO线程中,如果Handler处理中有耗时的操作(如数据库相关),会让IO线程等待,影响性能。

    2)服务端长连接的管理

     客户端保持和服务进行长连接,不需要每次调用服务的时候进行连接,长连接的管理(通过Zookeeper获取有效的地址)。

    通过监听Zookeeper服务节点值的变化,动态更新客户端和服务端保持的长连接。这个事情现在放在客户端在做,客户端保持了和所有可用服务的长连接,给客户端和服务端都造成了压力,需要解耦这个实现。

    3)客户端请求异步处理

    客户端请求异步处理的支持,不需要同步等待:发送一个异步请求,返回Feature,通过Feature的callback机制获取结果。

    IAsyncObjectProxy client = rpcClient.createAsync(HelloService.class);
    RPCFuture helloFuture = client.call("hello", Integer.toString(i));
    String result = (String) helloFuture.get(3000, TimeUnit.MILLISECONDS);

    个人觉得该RPC的待改进项:

    * 编码序列化的多协议支持。

    项目持续更新中。

    项目地址:https://github.com/luxiaoxun/NettyRpc

    参考:

    轻量级分布式 RPC 框架:http://my.oschina.net/huangyong/blog/361751

    你应该知道的RPC原理:http://www.cnblogs.com/LBSer/p/4853234.html

     

  • 相关阅读:
    修理牛棚 贪心 USACO
    零件加工 贪心 题解
    花店橱窗 动态规划 题解
    动态规划 摆花 题解
    NOIP2004普及组第3题 FBI树
    实况世界杯4小游戏链接
    poj2761(treap入门)
    最大连续子序列和(分治法)
    任意区间的最长连续递增子序列,最大连续子序列和
    lca转RMQ
  • 原文地址:https://www.cnblogs.com/luxiaoxun/p/5272384.html
Copyright © 2011-2022 走看看