zoukankan      html  css  js  c++  java
  • SpringBoot2+Netty打造通俗简版RPC通信框架

    2019-07-19:完成基本RPC通信!

    2019-07-22:优化此框架,实现单一长连接!

    2019-07-24:继续优化此框架:1、增加服务提供注解(带版本号),然后利用Spring框架的在启动时立刻保存提供服务的实现类。2、优化NettyConfig(区分消费者和提供者配置),因为一个项目可同时作为服务提供者和服务消费者,所以增加两个配置来区分是提供服务还是消费服务,而且,因为如果都是本地启动着两个项目,那么IP必定是一样的,所以需要区分服务端口和消费端口。不然会有下面事故:先启动client,再启动server,但是他们同样依赖于netty包,所以client也启动了netty服务,只配置一个相同的端口会导致client的RPC通信也是通道自己启动的Netty服务。。。

    2019-07-27:优化此框架:增加注册中心,使用Zookeeper作为注册中心。

    接下来:我会优化Netty方面的,例如增加心跳检测、业务处理统一使用自定义业务线程池、客户端或服务端异常断开处理等,然后会优化一下项目的结构和rpc通信返回结果等,最后可能会考虑增加Redis作为注册中心。等完成所有的这些,就会对整个项目重新写一篇文章来介绍一下自己的整体思路,当然了,如果有同学需要的,可以在下方留言,我可以提前写文章,对完成注册中心及之前的代码进行详细介绍,之后再补充其他新增的功能实现过程!~

    2019-07-30:已完成全部功能。放上连接:完整版RPC通信框架

    下面的是2019-07-19写的文章,所以代码是没经过优化的,不过是核心代码,还是需要阅读一下的,需要看完整代码的请到最下面的github地址,大家可根据标签拉到对应的代码,麻烦啦然后还有,测试方法是HelloController的sayHello方法呢,也可以自己再捣鼓一些测试一下

    ​​​​

    前段时间,我花了两个星期的时间去重新学习Netty,因为之前总是看过一会就没看了,所以今次下定决心一定要全部看完,然后也思考做了一些的思考题,并且将简单的控制台版IM系统做出来了。虽然叫IM系统,但是是很简陋的,哈哈,只有登录、单聊、建群、加群、退群、群聊等简单的功能。大家可以到我github上看看:Netty-IM

    写完这个IM系统后,我是打算自己写一个网页版的,可是考虑到自己前端的技能好像都退化得差不多了,而且时间上可能没那么充裕,就不了了之了。然后有一天,突然想起来之前使用的RPC框架->Dubbo,他的通信底层就是使用Netty,那么我就想着要不自己先搞个简单版试试呗,因为最主要的是学习技能得实践一番,不然学了好像没学一样。。。

    在开始动手前,自己屡了一下思路,也参考了两篇文章,决定先做一个简版的RPC框架,不带注册中心的那种。那么来了老弟,首先我们看一下整个流程图是咋样的:

    接下来重头戏来了,下面将会较详细得说一下流程:

    先简单介绍一下项目结构:

    simple-rpc-client:服务消费

    simple-rpc-server:服务提供

    simple-rpc-encapsulation:消费者和提供者公共接口

    simple-rpc-netty:是关于Netty的东西,包括:自定义协议,序列化,通信实体Packet,各种Handler等等。

    客户端:
            1、首先是两个注解,一个注解是:标识那些接口的调用会进行RPC通信,即@NettyRPC注解。
            另外一个注解是:告诉程序哪些包下的类会使用RPC通信,像@ComponentScan一样,即@EnableNettyRPC注解。

    /**
     * @author Howinfun
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface NettyRPC {
    }
    /**
     * @author Howinfun
     * @desc
     * @date 2019/7/15
     */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface EnableNettyRPC {
        //扫描的包名,如果为空,则根据启动类所在的包名扫描
        String[] basePackages() default {};
    }
    

    2、因为我们使用@NettyRPC的将是一些接口,如果项目里头没有实现类,那是调用失败的,那么我们可以通过实现ImportBeanDefinitionRegistrar和自定义FactoryBean和InvocationHandler,利用动态代理使接口有实现,并且能动态注入Bean。ImportBeanDefinitionRegistrar接口可以详细说一下,因为这里是动态注入Bean,怎么注入规则是可以自定的,主要是靠ClassPathScanningCandidateComponentProvider这个类,它主要功能是扫描ClassPath下的所有类,并且根据isCandidateComponent方法来判断哪些类可以作为候选人,当然了,isCandidateComponent方法你可以重写,然后加上你自己的规则,我这里是必须是独立的并且是接口,才能成为候选人。然后ClassPathScanningCandidateComponentProvider还能添加过滤器,我这里主要添加的过滤器是注解过滤器,只要带有@NettyRPC注解的,其他的都不要。
        不过需要注意一点的是:记得在有@Configuration注解的配置类上使用@Import导入实现ImportBeanDefinitionRegistrar的类,不然实现动态注入Bean的作用,这里我们在客户端的启动类Import即可。

    package com.hyf.rpc.netty.client.config;
    
    import com.hyf.rpc.netty.anno.EnableNettyRPC;
    import com.hyf.rpc.netty.anno.NettyRPC;
    import org.springframework.beans.factory.BeanClassLoaderAware;
    import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
    import org.springframework.beans.factory.config.BeanDefinition;
    import org.springframework.beans.factory.config.BeanDefinitionHolder;
    import org.springframework.beans.factory.support.AbstractBeanDefinition;
    import org.springframework.beans.factory.support.BeanDefinitionBuilder;
    import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
    import org.springframework.beans.factory.support.BeanDefinitionRegistry;
    import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
    import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
    import org.springframework.core.type.AnnotationMetadata;
    import org.springframework.core.type.filter.AnnotationTypeFilter;
    import org.springframework.util.Assert;
    import org.springframework.util.ClassUtils;
    import org.springframework.util.StringUtils;
    
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    
    
    /**
     * 自定义注册带@NettyRPC注解的接口,利用动态代理使接口有实现
     * 然后在有@Configuration注解的配置类上使用@Import导入,不然不能注入这些实现@NettyRPC接口的BeanDefinition
     * @author Howinfun
     * @date 2019-07-18
     */
    public class NettyRpcClientRegistrar implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware {
    
        private ClassLoader classLoader;
    
        @Override
        public void setBeanClassLoader(ClassLoader classLoader) {
            this.classLoader = classLoader;
        }
    
        @Override
        public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    
            ClassPathScanningCandidateComponentProvider scan = getScanner();
    
            //指定注解,类似于Feign注解,只扫描带@NettyRPC注解的接口
            scan.addIncludeFilter(new AnnotationTypeFilter(NettyRPC.class));
    
            Set<BeanDefinition> candidateComponents = new HashSet<>();
            for (String basePackage : getBasePackages(importingClassMetadata)) {
                candidateComponents.addAll(scan.findCandidateComponents(basePackage));
            }
            candidateComponents.stream().forEach(beanDefinition -> {
                if (!registry.containsBeanDefinition(beanDefinition.getBeanClassName())) {
                    if (beanDefinition instanceof AnnotatedBeanDefinition) {
                        AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition;
                        AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata();
                        Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(NettyRPC.class.getCanonicalName());
    
                        this.registerNettyRpcClient(registry, annotationMetadata,attributes);
                    }
                }
            });
        }
    
        private void registerNettyRpcClient(BeanDefinitionRegistry registry,
                                            AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
            String className = annotationMetadata.getClassName();
            // 指定工厂,使用@NettyRPC注解的接口,当代码中注入时,是从指定工厂获取,而这里的工厂返回的是代理
            BeanDefinitionBuilder definition = BeanDefinitionBuilder
                    .genericBeanDefinition(NettyClientFactoryBean.class);
            // @Autowrie:根据类型注入
            definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
            // 注定type属性
            definition.addPropertyValue("type", className);
            String name = attributes.get("name") == null ? "" :(String)(attributes.get("name"));
            // 别名
            String alias = name + "NettyRpcClient";
            AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
            beanDefinition.setPrimary(true);
            BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
                    new String[] { alias });
            // 注册BeanDefinition
            BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
        }
    
    
    
        protected ClassPathScanningCandidateComponentProvider getScanner() {
            return new ClassPathScanningCandidateComponentProvider(false) {
                @Override
                protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
                    // 判断候选人的条件:必须是独立的,然后是接口
                    if (beanDefinition.getMetadata().isIndependent() && beanDefinition.getMetadata().isInterface()){
                        return true;
                    }
                    return false;
                }
            };
        }
    
        /**
         * 获取指定扫描@NettyRPC注解的包路径
         * @param importingClassMetadata
         * @return
         */
        protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
            Map<String, Object> attributes = importingClassMetadata
                    .getAnnotationAttributes(EnableNettyRPC.class.getCanonicalName());
    
            Set<String> basePackages = new HashSet<>();
            // 如果指定的包路径为空,则获取启动类当前路径
            if (basePackages.isEmpty()) {
                basePackages.add(
                        ClassUtils.getPackageName(importingClassMetadata.getClassName()));
            }else{
                for (String pkg : (String[]) attributes.get("basePackages")) {
                    if (StringUtils.hasText(pkg)) {
                        basePackages.add(pkg);
                    }
                }
            }
            return basePackages;
        }
    }
    
    package com.hyf.rpc.netty.client.config;
    
    import lombok.Data;
    import lombok.EqualsAndHashCode;
    import org.springframework.beans.factory.FactoryBean;
    import org.springframework.stereotype.Component;
    
    import java.lang.reflect.Proxy;
    
    /**
     * @author Howinfun
     * @desc
     * @date 2019/7/15
     */
    @Data
    @EqualsAndHashCode(callSuper = false)
    @Component
    public class NettyClientFactoryBean implements FactoryBean<Object> {
    
        private Class<?> type;
    
        @Override
        public Object getObject() throws Exception {
            // 这里的interfaces注意是就是type,因为我们现在是给接口做代理,千万别写type.getInterfaces(),不然启动会报错
            return Proxy.newProxyInstance(type.getClassLoader(),new Class[]{type},new NettyRPCInvocationHandler(this.type));
        }
    
        @Override
        public Class<?> getObjectType() {
            return this.type;
        }
    }
    

    3、在动态代理的invoke方法里头,我们将启动Netty的一个客户端,带上接口调用的信息,然后等待Netty服务端返回结果结果再返回到前端即可。

    package com.hyf.rpc.netty.client.config;
    
    import com.hyf.rpc.netty.client.NettyClient;
    import com.hyf.rpc.netty.packet.RPCRequestPacket;
    import lombok.NoArgsConstructor;
    import org.springframework.stereotype.Component;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    
    /**
     * @author Howinfun
     * @desc
     * @date 2019/7/15
     */
    @NoArgsConstructor
    @Component
    public class NettyRPCInvocationHandler implements InvocationHandler {
    
        private Class<?> type;
    
        public NettyRPCInvocationHandler(Class<?> type){
            this.type = type;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            RPCRequestPacket requestPacket = new RPCRequestPacket();
            requestPacket.setClazz(type);
            requestPacket.setMethodName(method.getName());
            requestPacket.setParamTypes(method.getParameterTypes());
            requestPacket.setParams(args);
            Object result = NettyClient.callRPC(requestPacket);
            return result;
        }
    }
    

    有一个坑是:当客户端接收到服务端的返回结果后,记得关闭通道[ctx.channel().close()],因为在客户端中RPC调用后是同步等待Channel关闭的,不然不能响应给前端。

    服务端:服务端的流程稍微会简单很多
            1、启动Netty服务端服务,然后接收客户端的链接请求,解析请求
            2、然后根据接口调用信息,利用反射获取到实现类和对应的方法,最后调用方法得到结果,然后封装一下结果就可以相应给客户端了。

    package com.hyf.rpc.netty.server.handler;
    
    import com.hyf.rpc.netty.packet.RPCRequestPacket;
    import com.hyf.rpc.netty.packet.RPCResponsePacket;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import org.reflections.Reflections;
    
    import java.lang.reflect.Method;
    import java.util.Set;
    
    /**
     * @author Howinfun
     * @desc
     * @date 2019/7/16
     */
    @ChannelHandler.Sharable
    public class RPCRequestPacketHandler extends SimpleChannelInboundHandler<RPCRequestPacket> {
    
        public static final RPCRequestPacketHandler INSTANCE = new RPCRequestPacketHandler();
        private RPCRequestPacketHandler(){}
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RPCRequestPacket msg) throws Exception {
            RPCResponsePacket responsePacket = new RPCResponsePacket();
            // 获取rpc调用信息,利用反射执行方法,返回结果
            Class clazz = msg.getClazz();
            String methodName = msg.getMethodName();
            Object[] params = msg.getParams();
            Class[] paramTypes = msg.getParamTypes();
            // 扫面路径下所有元数据
            Reflections reflections = new Reflections("com.hyf.rpc.serviceImpl");
            Set<Class> subTypes = reflections.getSubTypesOf(clazz);
            if (subTypes.isEmpty()){
                responsePacket.setSuccess(false);
                responsePacket.setMsg("没有实现类");
            }else if (subTypes.size() > 1){
                responsePacket.setSuccess(false);
                responsePacket.setMsg("多个实现类,无法判断执行哪一个");
            }else{
                Class subClass = subTypes.toArray(new Class[1])[0];
                Method method = subClass.getMethod(methodName,paramTypes);
                Object result = method.invoke(subClass.newInstance(),params);
                responsePacket.setSuccess(true);
                responsePacket.setResult(result);
            }
            ctx.channel().writeAndFlush(responsePacket);
        }
    }
    

    3、这里的反射我推荐一个很好用的框架->Reflections。简单介绍一下我使用了哪些API,首先是根据路径扫描反射元数据,
        然后根据接口获取它的所有实现类,然后就可以获取实现类的反射信息,得到方法执行结果了。

    如果同学们对此比较简陋的代码还略感兴趣,可以到我的码云上看看:Netty-RPC

  • 相关阅读:
    CSS选择器规范
    利用form的“acceptcharset”在不同编码的页面间提交表单
    学习Ruby的基础知识
    Watir和watir webdriver的区别
    PHP in_array() 函数
    Ruby数组的基础知识
    PHP smarty if的条件修饰词
    很好的自动化学习资料 Ruby watir selenium
    $(document).ready() 和window.onload
    收藏:简单的PHP+SMARTY分页类
  • 原文地址:https://www.cnblogs.com/Howinfun/p/11612494.html
Copyright © 2011-2022 走看看