zoukankan      html  css  js  c++  java
  • 这样基于Netty重构RPC框架你不可能知道

    原创申明:本文由公众号【猿灯塔】原创,转载请说明出处标注

    今天是猿灯塔“365天原创计划”第5天。

    今天呢!灯塔君跟大家讲:     

           基于Netty重构RPC框架

    一.CyclicBarrier方法说明

    1.单一应用架构

    当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本。。(最开始58 同城的站点架构用一个词概括就是“ALL IN ONE”。就像一个单机系统,所有的东西都部署在一台机器 上,包括站点、数据库、文件等等。而工程师每天的核心工作就是CURD,前端传过来一些数据,然后 业务逻辑层拼装成一些CURD访问数据库,数据库返回数据,数据拼装成页面,最终返回到浏览器。此 时,用于简化增删改查工作量的数据访问框架(ORM)是关键)

    2、垂直应用架构

    当访问量逐渐增大,单一应用增加机器带来的加速度越来越小,将应用拆成互不相干的几个应用,以提 升效率。应用拆分为不相干的几个应用,前后端分离,此时用于加速前端页面开发的Web MVC框架是 关键

    3、分布式服务服务架构

    当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定 的服务中心,使前端应用能更快速的响应多变的市场需求。同时将公共能力API抽取出来,作为独立的 公共服务供其他调用者消费,以实现服务的共享和重用,降低开发和运维成本。应用拆分之后会按照模 块独立部署,接口调用由本地API演进成跨进程的远程方法调用,此时RPC框架应运而生。此时,用于 提高业务复用及整合的分布式服务框架(RPC)是关键。

    4、流动计算架构

    当服务越来越多,容量的评估,小服务资源的浪费等问题逐渐显现,此时需增加一个调度中心基于访问 压力实时管理集群容量,提高集群利用率。此时,用于提高机器利用率的资源调度和治理中心(SOA)是 关键。面向服务的架构(SOA)是一个组件模型,它将应用程序的不同功能单元(称为服务)进行拆 分,并通过这些服务之间定义良好的接口和协议联系起来。

    没有RPC框架之前,我们的服务调用是这样的:

    看出接口的调用完全没有规律可循,想怎么调,就怎么调。这导致业务发展到一定阶段之后,对接口的 维护变得非常困难。于是有人提出了服务治理的概念。所有服务间不允许直接调用,而是先到注册中心 进行登记,再由注册中心统一协调和管理所有服务的状态并对外发布,调用者只需要记住服务名称,去 找注册中心获取服务即可。这样,极大地规范了服务的管理,可以提高了所有服务端可控性。整个设计思想其实在我们生活中也能 找到活生生的案例。例如:我们平时工作交流,大多都是用IM 工具,而不是面对面吼。大家只需要相 互记住运营商(也就是注册中心)提供的号码(如:腾讯QQ)即可。再比如:我们打电话,所有电话 号码有运营商分配。我们需要和某一个人通话时,只需要拨通对方的号码,运营商(注册中心,如中国 移动、中国联通、中国电信)就会帮我们将信号转接过去。

    二.RPC介绍

    1、RPC简介:

    • Remote Procedure Call远程过程调用RPC就是从一台机器(客户端)上通过参数传递的        方式调用另一台机器(服务器)上的一个函数或 方法(可以统称为服务)并得到返回的结果。
    • RPC 会隐藏底层的通讯细节(不需要直接处理Socket通讯或Http通讯) RPC 是一个请求响应模 型。
    • 客户端发起请求,服务器返回响应(类似于Http的工作方式) RPC 在使用形式上像调用本地函数 (或方法)一样去调用远程的函数(或方法)。

     2、RPC通信原理

    RPC的主要作用有三方面:

    1、进程间通讯

    2、提供和本地方法调用一样的机制

    3、屏蔽用户对远程调用的细节实现

    RPC框架的好处首先就是长链接,不必每次通信都要像http一样去3次握手,减少了网络开销;其次就 是RPC框架一般都有注册中心,有丰富的监控管理;发布、下线接口、动态扩展等,对调用方来说是无 感知,统一化的操作。

    3、RPC通过过程

    1)服务消费方(client)调用以本地调用方式调用服务; 

    2)client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体; 

    3)client stub找到服务地址,并将消息发送到服务端; 

    4)server stub收到消息后进行解码; 

    5)server stub根据解码结果调用本地的服务;6)本地服务执行并将结果返回给server stub;7)server stub将返回结果打包成消息并发送至消费方; 

    8)client stub接收到消息,并进行解码; 

    9)服务消费方得到最终结果。

    4、常用的分布式RPC框架

    dubbo:国内最早开源的RPC框架,由阿里巴巴公司开发并于2011年末对外开源,仅支持java语言

    4、常用的分布式RPC框架

    • dubbo:国内最早开源的RPC框架,由阿里巴巴公司开发并于2011年末对外开源,仅支持java语言
    • motan:微博内部使用的RPC框架,于2016年对外开源,仅支持java语言
    • Thrift:轻量级的跨语言RPC通信方案,支持多大25种变成语言
    • gRPC:Google于2015年对外开源的跨语言PRC框架,支持常用的C++、java、Python、Go、 Ruby、PHP等多种语言。

    目前流行的RPC 服务治理框架主要有Dubbo 和Spring Cloud,

    下面我们以比较经典的Dubbo 为例。

    Dubbo 核心模块主要有:

    Registry:注册中心(主要负责保存所有可用的服务名称和服务地址。) 

    Provider:服务中心(实现对外提供的所有服务的具体功能) 

    Consumer:消费端(调用远程服务的服务消费方) 

    Monitor:监控中心(统计服务的调用次数和调用时间的监控中心) 

    Container:服务运行容器

    api:主要用来定义对外开放的功能与服务接口。protocol:主要定义自定义传输协议的内容

    蓝色方框代表业务有交互,绿色方框代表只对Dubbo内部交互。蓝色虚线为初始化时调用,红色虚线为 运行时异步调用,红色实线为运行时同步调用

    0、服务在容器中启动,加载,运行Provider

    1、Provider在启动时,向Registry注册自己提供的服务 

    2、Consumer在启动时,向Registry订阅自己所需的服务 

    3、Registry给Consumer返回Provider的地址列表,如果Provider地址有变更(上线/下线机器), Registry将基于长连接推动变更数据给Consumer 

    4、Consumer从Provider地址列表中,基于软负载均衡算法,选一台进行调用,如果失败,重试另一 台调用 

    5、Consumer和Provider,在内存中累计调用次数和时间,定时每分钟一次将统计数据发送到Monitor

    4.具体实现

    1、api

    package com.rpc.api; 

    /**

    * API模块,provider和Consumer都遵循API模块的

    规范 * 用来定义对外开放的功能与服务接口 

    */ 

    public interface IRpcHelloService { 

    String hello(String name); 

    }

    package com.rpc.api; 

    public interface IRpcService { 

    /*增加用户*/ 

    public String addUser(); 

    /*删除用户*/ 

    public String deleteUser(Integer id); 

    /*修改用户*/ 

    public String updateUser(Integer id); 

    /*查询用户*/ 

    public String queryUser(Integer id); 

    }

    2、Provider:服务中心

    package com.rpc.provider; 

    import com.rpc.api.IRpcService; 

    public class RpcServiceImpl implements 

    IRpcService { 

    @Override 

    public String addUser() { 

    return "增加用户"; 

    }

    @Override 

    public String deleteUser(Integer id) { 

    return "删除了编号为"+id+"的用户"; 

    }

    @Override 

    public String updateUser(Integer id) { 

    return "修改了编号为"+id+"的用户"; 

    }

    @Override 

    public String queryUser(Integer id) { 

    return "查询到了编号为"+id+"这个用户的信息"; 

    }

    package com.rpc.provider; 

    import com.rpc.api.IRpcHelloService; 

    /**

    * 服务中心,实现对外提供的所有服务的具体功能 

    */ 

    public class RpcHelloServiceImpl implements IRpcHelloService { 

    public String hello(String name) { 

    return "Hello " + name + "!"; 

    }

    3、protocol传输协议内容

    ackage com.rpc.protocol; 

    import java.io.Serializable; 

    /**

    * 自定义传输协议内容 

    */ 

    public class InvokerProtocol implements 

    Serializable { 

    private String className;//类名 

    private String methodName;//函数名称 

    private Class<?>[] parames;//形参列表

    private Object[] values;//实参列表 

    public String getClassName() { 

    return className; 

    }

    public void setClassName(String className)

     { 

    this.className = className; 

    }

    public String getMethodName() { 

    return methodName; 

    }

    public void setMethodName

    (String methodName)

     { 

    this.methodName = methodName; 

    }

    public Class<?>[] getParames() { 

    return parames; 

    }

    public void setParames(Class<?>[] parames) 

    this.parames = parames; 

    }

    public Object[] getValues() { 

    return values; 

    }

    public void setValues(Object[] values) { 

    this.values = values; 

    }

    4、Registry注册中心

    package com.rpc.registry; 

    import io.netty.bootstrap.ServerBootstrap; 

    import io.netty.channel.*; 

    import io.netty.channel.nio.NioEventLoopGroup; 

    import io.netty.channel.socket.SocketChannel; 

    importio.netty.channel.socket.nio.

    NioServerSocketChannel; 

    import io.netty.handler.codec.

    LengthFieldBasedFrameDecoder; 

    import io.netty.handler.codec.LengthFieldPrepender; 

    import io.netty.handler.codec.serialization.

    ClassResolvers; 

    import io.netty.handler.codec.serialization.

    ObjectDecoder; 

    import io.netty.handler.codec.serialization.

    ObjectEncoder; 

    /**

    * 注册中心主要功能就是负责将所有Provider的

    服务名称和服务引用地址注册到一个容器中, 

    * 并对外发布。启动一个对外的服务,

    并对外提供一个可访问的端口 

    * 主要负责保存所有可用的服务名称和服务地址 

    */ 

    public class RpcRegistry { 

    private int port;

    public RpcRegistry(int port){ 

    this.port = port; 

    }

    public void start(){ 

    EventLoopGroup bossGroup = new NioEventLoopGroup(); 

    EventLoopGroup workerGroup = new NioEventLoopGroup(); 

    try {

    ServerBootstrap b = new ServerBootstrap(); 

    b.group(bossGroup, workerGroup) 

    .channel(NioServerSocketChannel.class) 

    .childHandler(new ChannelInitializer<SocketChannel>() { 

    @Override 

    protected void initChannel(SocketChannel ch) throws 

    Exception { 

    ChannelPipeline pipeline = ch.pipeline(); 

    //自定义协议解码器 

    /** 入参有5个,分别解释如下 

    (1) maxFrameLength - 发送的数据包最大长度; 

    (2) lengthFieldOffset - 长度域偏移量,指的是长度域位于 

    整个数据包字节数组中的下标; 

    (3) lengthFieldLength - 长度域的自己的字节数长度。 

    (4) lengthAdjustment – 长度域的偏移量矫正。如果长度域 

    的值,除了包含有效数据域的长度外,

    还包含了其他域(如长度域自身)长度,那么,

    就需要进行矫正。矫 正的值为:包长 - 长度域的值 – 

    长度域偏移 – 长度域长。 

    (5) initialBytesToStrip – 丢弃的起始字节数。

    丢弃处于有 效数据前面的字节数量。

    比如前面有4个节点的长度域,则它的值为4。 

    */ 

    pipeline.addLast(new 

    LengthFieldBasedFrameDecoder(Integer.

    MAX_VALUE, 0, 4, 0, 4)); 

    //自定义协议编码器 

    pipeline.addLast(new LengthFieldPrepender(4)); 

    //对象参数类型编码器 

    pipeline.addLast("encoder",new ObjectEncoder()); 

    //对象参数类型解码器 

    pipeline.addLast("decoder",new 

    ObjectDecoder(Integer.MAX_VALUE, 

    ClassResolvers.cacheDisabled(null))); 

    pipeline.addLast(new RegistryHandler()); 

    })

    .option(ChannelOption.SO_BACKLOG, 128) 

    .childOption(ChannelOption.SO_KEEPALIVE, true); 

    ChannelFuture future = b.bind(port).sync(); 

    System.out.println("jiangym RPC Registry 

    start listen at " + port ); 

    future.channel().closeFuture().sync(); 

    } catch (Exception e) { 

    bossGroup.shutdownGracefully(); 

    workerGroup.shutdownGracefully(); 

    }

    //主启动类 

    public static void main(String[] args) throws Exception { 

    new RpcRegistry(8888).start(); 

    }

    package com.rpc.registry; 

    import com.rpc.protocol.InvokerProtocol; 

    import io.netty.channel.ChannelHandlerContext; 

    import io.netty.channel.ChannelInboundHandlerAdapter; 

    import java.io.File; 

    import java.lang.reflect.Method; 

    import java.net.URL; 

    import java.util.ArrayList; 

    import java.util.List; 

    import java.util.concurrent.ConcurrentHashMap; 

    public class RegistryHandler extends ChannelInboundHandlerAdapter { 

    //保存所有可用的服务(ConcurrentHashMap

    是线程安全且高效的HashMap实现) 

    public static ConcurrentHashMap

    <String, Object> 

    registryMap = new ConcurrentHashMap<String, Object>(); 

    //保存所有相关的服务类 

    private List<String> classNames = new ArrayList<String>(); 

    public RegistryHandler() { 

    //完成递归扫描 

    scannerClass("com.rpc.provider"); 

    doRegister(); 

    }

    @Override 

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 

    Exception { 

    Object result = new Object(); 

    InvokerProtocol request = (InvokerProtocol) msg; 

    //当客户端建立连接时,需要从自定义协议中获取信息,

    拿到具体的服务和实参 

    //使用反射调用 

    if (registryMap.containsKey(request.getClassName())) { 

    Object clazz = registryMap.get(request.getClassName()); 

    Method method = clazz.getClass().getMethod(request.

    getMethodName(), 

    request.getParames()); 

    result = method.invoke(clazz, 

    request.getValues()); 

    }

    ctx.write(result); 

    ctx.flush(); 

    ctx.close(); 

    }

    @Override 

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 

    throws Exception { 

    cause.printStackTrace(); 

    ctx.close(); 

    }

    /*

    * 递归扫描 

    */ 

    private void scannerClass(String packageName)

     { 

    URL url = 

    this.getClass().getClassLoader().getResource

    (packageName.replaceAll("\.", 

    "/")); 

    File dir = new File(url.getFile());

    for (File file : dir.listFiles()) { 

    //如果是一个文件夹,继续递归 

    if (file.isDirectory()) { 

    scannerClass(packageName + "." +

     file.getName()); 

    } else { 

    classNames.add(packageName + "." + 

    file.getName().replace(".class", "").trim()); 

    }

    /**

    * 完成注册 

    */ 

    private void doRegister() { 

    if (classNames.size() == 0) { 

    return; 

    }

    for (String className : classNames) { 

    try {

    Class<?> clazz = Class.forName(className); 

    Class<?> i = clazz.getInterfaces()[0]; 

    registryMap.put(i.getName(),

    clazz.newInstance()); 

    } catch (Exception e) { 

    e.printStackTrace(); 

    }

    4、消费端

    package com.rpc.consumer; 

    import com.rpc.api.IRpcHelloService; 

    import com.rpc.api.IRpcService; 

    public class RpcConsumer { 

    public static void main(String [] args){ 

    IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class); 

    System.out.println(rpcHello.hello("Netty")); 

    IRpcService service = RpcProxy.create(IRpcService.class); 

    System.out.println(service.deleteUser(4)); 

    System.out.println(service.updateUser(3)); 

    System.out.println(service.queryUser(2)); 

    System.out.println(service.addUser()); 

    }

    package com.rpc.consumer; 

    import com.rpc.protocol.InvokerProtocol; 

    import io.netty.bootstrap.Bootstrap; 

    import io.netty.channel.*; 

    import io.netty.channel.nio.NioEventLoopGroup; 

    import io.netty.channel.socket.SocketChannel;

    import io.netty.channel.socket.nio.NioSocketChannel; 

    import io.netty.handler.codec.

    LengthFieldBasedFrameDecoder; 

    import io.netty.handler.codec.LengthFieldPrepender; 

    import io.netty.handler.codec.serialization.ClassResolvers; 

    import io.netty.handler.codec.serialization.ObjectDecoder; 

    import io.netty.handler.codec.serialization.ObjectEncoder; 

    import java.lang.reflect.InvocationHandler; 

    import java.lang.reflect.Method; 

    import java.lang.reflect.Proxy; 

    public class RpcProxy { 

    public static <T> T create(Class<?> clazz) { 

    //clazz传进来本身就是interface 

    MethodProxy proxy = new MethodProxy(clazz); 

    Class<?>[] interfaces = clazz.isInterface() ? 

    new Class[]{clazz} : 

    clazz.getInterfaces(); 

    T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(), 

    interfaces, proxy); 

    return result; 

    }

    private static class MethodProxy implements InvocationHandler { 

    private Class<?> clazz; 

    public MethodProxy(Class<?> clazz) { 

    this.clazz = clazz; 

    }

    public Object invoke(Object proxy, Method 

    method, Object[] args) throws 

    Throwable { 

    //如果传进来是一个已实现的具体类 

    if (Object.class.equals(method.

    getDeclaringClass())) { 

    try {

    return method.invoke(this, args); 

    } catch (Throwable t) { 

    t.printStackTrace(); 

    }

    //如果传进来的是一个接口(核心) 

    } else { 

    return rpcInvoke(proxy, method, args); 

    }

    return null; 

    }

    /**

    * 实现接口的核心方法 

    *

    * @param method 

    * @param args 

    * @return 

    */ 

    public Object rpcInvoke(Object proxy, Method method, Object[] args) { 

    //传输协议封装 

    InvokerProtocol msg = new InvokerProtocol(); 

    msg.setClassName(this.clazz.getName()); 

    msg.setMethodName(method.getName()); 

    msg.setValues(args); 

    msg.setParames(method.getParameterTypes());

    final RpcProxyHandler consumerHandler = new RpcProxyHandler(); 

    EventLoopGroup group = new NioEventLoopGroup(); 

    try {

    Bootstrap b = new Bootstrap(); 

    b.group(group) 

    .channel(NioSocketChannel.class) 

    .option(ChannelOption.TCP_NODELAY, true) 

    .handler(new ChannelInitializer<

    SocketChannel>() { 

    @Override 

    public void initChannel(SocketChannel ch)

     throws 

    Exception { 

    ChannelPipeline pipeline = ch.pipeline(); 

    //自定义协议解码器 

    /** 入参有5个,分别解释如下 

    maxFrameLength:框架的最大长度。

    如果帧的长度大于此 值,则将抛出TooLongFrameException。

    lengthFieldOffset:长度字段的偏移量:

    即对应的长度字 段在整个消息数据中得位置 

    lengthFieldLength:长度字段的长度:

    如:长度字段是int 型表示,那么这个值就是

    4(long型就是8)

    lengthAdjustment:要添加到长度字段值的补偿值 

    initialBytesToStrip:从解码帧中去除的第一个字节数 

    */ 

    pipeline.addLast("frameDecoder", new 

    LengthFieldBasedFrameDecoder

    (Integer.MAX_VALUE, 0, 4, 0, 4)); 

    //自定义协议编码器 

    pipeline.addLast("frameEncoder", new 

    LengthFieldPrepender(4)); 

    //对象参数类型编码器 

    pipeline.addLast("encoder", new 

    ObjectEncoder()); 

    //对象参数类型解码器 

    pipeline.addLast("decoder", new 

    ObjectDecoder( 

    Integer.MAX_VALUE, 

    ClassResolvers.cacheDisabled(null))); 

    pipeline.addLast("handler", consumerHandler); 

    }); 

    ChannelFuture future = b.connect("localhost", 8888).sync(); 

    future.channel().writeAndFlush(msg).sync(); 

    future.channel().closeFuture().sync(); 

    } catch (Exception e) { 

    e.printStackTrace(); 

    } finally { 

    group.shutdownGracefully(); 

    }

    return consumerHandler.getResponse(); 

    }

    package com.rpc.consumer; 

    import io.netty.channel.ChannelHandlerContext; 

    import io.netty.channe

    l.

    ChannelInboundHandlerAdapter;public class RpcProxyHandler extends ChannelInboundHandlerAdapter { 

    private Object response; 

    public Object getResponse() { 

    return response; 

    }

    @Override 

    public void channelRead

    (ChannelHandlerContext ctx, Object msg) throws 

    Exception { 

    response = msg; 

    }

    @Override 

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 

    throws Exception { 

    System.out.println("client exception is general"); 

    }

    文章365天持续更新,可以微信搜索「猿灯塔」第一时间阅读,回复【资料】【面试】【简历】有我准备的一线大厂面试资料和简历模板.

  • 相关阅读:
    [DB] 数据库的连接
    JS leetcode 翻转字符串里的单词 题解分析
    JS leetcode 拥有最多糖果的孩子 题解分析,六一快乐。
    JS leetcode 搜索插入位置 题解分析
    JS leetcode 杨辉三角Ⅱ 题解分析
    JS leetcode 寻找数组的中心索引 题解分析
    JS leetcode 移除元素 题解分析
    JS leetcode 最大连续1的个数 题解分析
    JS leetcode 两数之和 II
    JS leetcode 反转字符串 题解分析
  • 原文地址:https://www.cnblogs.com/yuandengta/p/12604837.html
Copyright © 2011-2022 走看看