
    [Source Code Reading] Alibaba SOFA Service Registry MetaServer (3)

    0x00 Abstract

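    SOFARegistry is a production-grade, real-time, highly available service registry open-sourced by Ant Financial. This series analyzes how its MetaServer is implemented. This third article describes how MetaServer achieves data consistency on top of Raft.

    Because of space constraints, this article does not cover the principles or implementation of Raft and JRaft. It only explains how MetaServer is built on top of JRaft.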

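    0x01 Concepts

    1.1 Distributed Consensus

    Distributed consensus is the most fundamental problem in distributed systems. Solving it is what underpins a distributed system's reliability and disaster tolerance.

    Put simply, the problem is how multiple machines can agree on a value so that, once they agree, the value stays unchanged no matter what failures later occur among them. More abstractly, all processes in a distributed system must decide on a value v. A system solves the distributed consensus problem if it satisfies the following properties:

    • Termination: every non-faulty process eventually decides on a concrete value of v. No process loops forever.

    • Validity: if a non-faulty process decides a value v', then v' must have been proposed by some process. A random number generator, for example, does not satisfy this property.

    • Agreement: all non-faulty processes decide the same value.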
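
    The checker below makes the three properties concrete by testing them against one finished run. It is a sketch: ConsensusChecker and its inputs are hypothetical names, not SOFARegistry or JRaft code. Termination is a liveness property and cannot truly be verified from a finite trace, so the sketch only checks that every process assumed to be non-faulty has recorded a decision.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical checker for the consensus properties over one finished run.
 * proposals: every value some process proposed.
 * decisions: non-faulty process id -> the value it decided.
 */
final class ConsensusChecker {
    private ConsensusChecker() {}

    /** Validity: every decided value was proposed by some process. */
    static boolean validity(Set<String> proposals, Map<String, String> decisions) {
        return proposals.containsAll(decisions.values());
    }

    /** Agreement: all non-faulty processes decided the same value. */
    static boolean agreement(Map<String, String> decisions) {
        return decisions.values().stream().distinct().count() <= 1;
    }

    /** Finite-trace stand-in for Termination: every non-faulty process has decided. */
    static boolean allDecided(List<String> nonFaulty, Map<String, String> decisions) {
        return decisions.keySet().containsAll(nonFaulty);
    }
}
```

    A random number generator fails validity in this model because its "decision" (for example "42") appears in no process's proposal.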

    1.2 SOFAJRaft

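    SOFAJRaft is a production-grade, high-performance Java implementation of the Raft consensus algorithm. It supports MULTI-RAFT-GROUP and is suited to high-load, low-latency scenarios.

    The SOFARegistry cluster's node-list data is small, so MetaServer does not need to shard it. The cluster node lists are stored in Repositories. Bolt requests for node registration, renewal and list queries are served through the strongly consistent Raft protocol, which guarantees that the data the cluster obtains is strongly consistent.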
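
    A strongly consistent log keeps every MetaServer's node list identical because each replica applies the same committed commands, in the same order, to a deterministic state machine. The sketch below illustrates this with a hypothetical NodeCommand type and NodeListStateMachine class. The operation names (register, renew, cancel) are assumptions for illustration and do not mirror SOFARegistry's actual request types.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical node-list command as it might be appended to the Raft log. */
final class NodeCommand {
    final String op;          // "register", "renew" or "cancel" (illustrative ops)
    final String nodeAddress;
    final long timestamp;

    NodeCommand(String op, String nodeAddress, long timestamp) {
        this.op = op;
        this.nodeAddress = nodeAddress;
        this.timestamp = timestamp;
    }
}

/** Deterministic state machine: node address -> time of last register/renew. */
final class NodeListStateMachine {
    private final Map<String, Long> nodes = new TreeMap<>();

    void apply(NodeCommand cmd) {
        switch (cmd.op) {
            case "register":
            case "renew":
                nodes.put(cmd.nodeAddress, cmd.timestamp);
                break;
            case "cancel":
                nodes.remove(cmd.nodeAddress);
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + cmd.op);
        }
    }

    Map<String, Long> snapshot() {
        return new TreeMap<>(nodes);
    }

    /** Builds a fresh replica by applying a committed log from the beginning. */
    static NodeListStateMachine replay(List<NodeCommand> committedLog) {
        NodeListStateMachine replica = new NodeListStateMachine();
        for (NodeCommand cmd : committedLog) {
            replica.apply(cmd);
        }
        return replica;
    }
}
```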

    0x02 Infrastructure

    "Infrastructure" here means the framework that SOFARegistry builds on top of JRaft. It includes StateMachine, Handler, RaftServer, RaftClient and related components.

    2.1 RaftExchanger

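    Exchange is the abstraction of a Client/Server connection and is responsible for the connections between nodes. RaftExchanger is the connection abstraction for the Raft protocol. As the code shows, it holds the configuration, the Registry and the Raft components.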

    public class RaftExchanger {
        @Autowired
        private MetaServerConfig    metaServerConfig;
        @Autowired
        private NodeConfig          nodeConfig;
        @Autowired
        private Registry            metaServerRegistry;
        private RaftServer          raftServer;
        private RaftClient          raftClient;
        private CliService          cliService;
    }
    

    These Raft components are started when the system starts:

    private void initRaft() {
        raftExchanger.startRaftServer(executorManager);
        raftExchanger.startRaftClient();
        raftExchanger.startCliService();
    }
    

    2.2 RaftServer

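    RaftServer is the server side of the Raft protocol. Its main members and responsibilities are:

    • It starts a Raft node that provides the distributed service.
    • Internally it uses the RaftGroupService framework provided by JRaft.
    • fsm is the business state machine, implemented by ServiceStateMachine. Its handlers for the leader and follower roles are leaderProcessListener and followerProcessListener, respectively.
    • boltServer is the Bolt server. Because JRaft communicates over Bolt, RaftServerHandler and RaftServerConnectionHandler are registered on it.

    The class is implemented as follows: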

    public class RaftServer {
        private RaftGroupService        raftGroupService; // JRaft server-side service framework
        private Node                    node; // Raft node
        private ServiceStateMachine     fsm; // business state machine
        private PeerId                  serverId;
        private Configuration           initConf;
        private String                  groupId;
        private String                  dataPath;
        private List<ChannelHandler>    serverHandlers = new ArrayList<>();
        private LeaderProcessListener   leaderProcessListener;
        private FollowerProcessListener followerProcessListener;
        private BoltServer              boltServer;
    
        public void start(RaftServerConfig raftServerConfig) throws IOException {
    
            FileUtils.forceMkdir(new File(dataPath));
    
            // Build the Bolt server and register the handlers
            serverHandlers.add(new RaftServerHandler(this));
            serverHandlers.add(new RaftServerConnectionHandler());
            boltServer = new BoltServer(new URL(NetUtil.getLocalAddress().getHostAddress(),
                serverId.getPort()), serverHandlers);
    
            // Start the server
            boltServer.initServer();
    
            RpcServer rpcServer = boltServer.getRpcServer();
    
            // Register JRaft's own request processors on the same RpcServer
            RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
    
            // Wire the leader/follower listeners into the state machine
            this.fsm = ServiceStateMachine.getInstance();
            this.fsm.setLeaderProcessListener(leaderProcessListener);
            this.fsm.setFollowerProcessListener(followerProcessListener);
    
            NodeOptions nodeOptions = initNodeOptions(raftServerConfig);
    
            this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
            // Start the Raft group service, which creates and starts the node
            this.node = this.raftGroupService.start();
    
            // Start the client side: reuse the node's RPC client and register the leader-change handler on it
            RpcClient raftClient = ((AbstractBoltClientService) (((NodeImpl) node).getRpcService()))
                .getRpcClient();
    
            NotifyLeaderChangeHandler notifyLeaderChangeHandler = new NotifyLeaderChangeHandler(
                groupId, null);
            raftClient.registerUserProcessor(new SyncUserProcessorAdapter(notifyLeaderChangeHandler));
        }
    
    }
    

    2.2.1 RaftServerHandler

    RaftServerHandler is the server-side request handler. It receives a Bolt message, converts it into a ProcessRequest and submits it to the node. Its call stack is:

    received:84, RaftServerHandler (com.alipay.sofa.registry.jraft.handler)
    handleRequest:55, AsyncUserProcessorAdapter (com.alipay.sofa.registry.remoting.bolt)
    dispatchToUserProcessor:224, RpcRequestProcessor (com.alipay.remoting.rpc.protocol)
    doProcess:145, RpcRequestProcessor (com.alipay.remoting.rpc.protocol)
    run:366, RpcRequestProcessor$ProcessTask (com.alipay.remoting.rpc.protocol)
    runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
    run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
    run:748, Thread (java.lang)
    

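    RaftServerHandler behaves differently depending on whether this node is the leader or a follower. A follower does not process the request. Instead, it replies with a redirect to the leader.

    For requests that must go through Raft, createTask handles the Hessian serialization, and then raftServer.getNode().apply(task) is called.

    The overall logic is:

    • If this node is not the leader, reply with a redirect and return;
    • Parse the request from the message;
    • Resolve the corresponding handler method from the request;
    • If it is a simple read, process it directly and return the result;
    • If it must go through a task, create a closure;
      • Create a task that carries the closure;
      • Apply the task on the Raft node.

    The code is: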

    public class RaftServerHandler implements ChannelHandler {
        protected RaftServer        raftServer;
    
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
    
            BoltChannel boltChannel = (BoltChannel) channel;
            AsyncContext asyncContext = boltChannel.getAsyncContext();
    
            // Not the leader: reply with a redirect to the current leader
            if (!raftServer.getFsm().isLeader()) {
                asyncContext.sendResponse(ProcessResponse.redirect(raftServer.redirect()).build());
                return;
            }
    
            // Parse the request from the message
            ProcessRequest processRequest = (ProcessRequest) message;
            long start = System.currentTimeMillis();
            // Resolve the handler method for the request
            Method method = Processor.getInstance().getWorkMethod(processRequest);
    
            if (Processor.getInstance().isLeaderReadMethod(method)) {
                // Simple read: process it directly and return the result
                Object obj = Processor.getInstance().process(method, processRequest);
                long cost = System.currentTimeMillis() - start;
                asyncContext.sendResponse(obj);
            } else {
                // Must go through Raft: create a closure that replies once the entry is applied
                LeaderTaskClosure closure = new LeaderTaskClosure();
                closure.setRequest(processRequest);
                closure.setDone(status -> {
                    long cost = System.currentTimeMillis() - start;
                    if (status.isOk()) {
                        asyncContext.sendResponse(closure.getResponse());
                    } else {
                        asyncContext.sendResponse(ProcessResponse.fail(status.getErrorMsg()).build());
                    }
                });
    
                // Create the task that carries the closure
                Task task = createTask(closure, processRequest);
                // Submit the task to the Raft node
                raftServer.getNode().apply(task);
            }
        }
    }
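
    The dispatch rules above can be modeled without JRaft or Bolt. This sketch is a simplification built on assumptions, not SOFARegistry code. LeaderDispatchSketch and its Replicator interface are hypothetical stand-ins for RaftServerHandler and Node.apply(task). A follower answers with a redirect and a leader serves reads locally. A write is answered only from the callback that runs after the entry has been applied, which is the role LeaderTaskClosure plays.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical, dependency-free model of the RaftServerHandler dispatch rules.
 * Responses are collected as strings instead of being sent over Bolt.
 */
final class LeaderDispatchSketch {
    /** Stand-in for Node.apply(task): replicate a command, then report success or failure. */
    interface Replicator {
        void apply(String command, Consumer<Boolean> done);
    }

    private final boolean leader;
    private final String leaderAddress;
    private final Replicator node;
    private final List<String> responses = new ArrayList<>();

    LeaderDispatchSketch(boolean leader, String leaderAddress, Replicator node) {
        this.leader = leader;
        this.leaderAddress = leaderAddress;
        this.node = node;
    }

    void received(String method, boolean isLeaderRead) {
        if (!leader) {
            // Followers do not serve requests: point the caller at the leader
            responses.add("redirect:" + leaderAddress);
            return;
        }
        if (isLeaderRead) {
            // Simple read on the leader: answer immediately, no log entry
            responses.add("read:" + method);
            return;
        }
        // Write: respond only from the callback, i.e. after the entry is applied
        node.apply(method, ok -> responses.add((ok ? "applied:" : "fail:") + method));
    }

    List<String> responses() {
        return responses;
    }
}
```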
    

    2.2.2 ServiceStateMachine

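    ServiceStateMachine is the server-side state machine. In MetaServer its core is the onApply(iterator) method, which hands each request committed through Raft to the Processor for execution.

    Snapshots will be covered in a later article.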

    public class ServiceStateMachine extends StateMachineAdapter {
    
        private LeaderProcessListener               leaderProcessListener;
        private FollowerProcessListener             followerProcessListener;
        private static volatile ServiceStateMachine instance;
    
        @Override
        public void onApply(Iterator iter) {
            while (iter.hasNext()) {
                Closure done = iter.done();
                ByteBuffer data = iter.getData();
                ProcessRequest request;
                LeaderTaskClosure closure = null;
    
                if (done != null) {
                    // Entry proposed by this node as leader: the closure still holds the request
                    closure = (LeaderTaskClosure) done;
                    request = closure.getRequest();
                } else {
                    // No closure (e.g. on a follower): deserialize the request from the log entry
                    try {
                        Hessian2Input input = new Hessian2Input(new ByteArrayInputStream(data.array()));
                        SerializerFactory serializerFactory = new SerializerFactory();
                        input.setSerializerFactory(serializerFactory);
                        request = (ProcessRequest) input.readObject();
                        input.close();
                    } catch (IOException e) {
                        // Error handling simplified in this excerpt
                        throw new IllegalStateException("Failed to deserialize request", e);
                    }
                }
    
                // Execute the request on the registered service implementation
                ProcessResponse response = Processor.getInstance().process(request);
                if (closure != null) {
                    // Complete the closure; its callback (set in RaftServerHandler) sends the response
                    closure.setResponse(response);
                    closure.run(Status.OK());
                }
                iter.next();
            }
        }
    }
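
    A dependency-free sketch can illustrate the two branches in onApply. It swaps Hessian for plain JDK serialization and JRaft's Iterator for java.util.Iterator. All class names (SketchRequest, SketchEntry, SketchStateMachine) are hypothetical. The node that proposed an entry can reuse the in-memory request, while other replicas rebuild it from the log bytes. Both end up in the same state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical request; the real ProcessRequest is Hessian-serialized. */
final class SketchRequest implements Serializable {
    private static final long serialVersionUID = 1L;
    final String key;
    final String value;

    SketchRequest(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

/** A committed entry: log bytes, plus the original request only on the proposing node. */
final class SketchEntry {
    final ByteBuffer data;
    final SketchRequest localRequest; // plays the role of iter.done(): null on other replicas

    SketchEntry(ByteBuffer data, SketchRequest localRequest) {
        this.data = data;
        this.localRequest = localRequest;
    }
}

final class SketchStateMachine {
    final Map<String, String> store = new HashMap<>();
    int deserializedCount = 0;

    static ByteBuffer encode(SketchRequest request) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(request);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return ByteBuffer.wrap(bytes.toByteArray());
    }

    void onApply(Iterator<SketchEntry> iter) {
        while (iter.hasNext()) {
            SketchEntry entry = iter.next();
            SketchRequest request = entry.localRequest;
            if (request == null) {
                // No in-memory request: rebuild it from the replicated bytes
                request = decode(entry.data);
                deserializedCount++;
            }
            store.put(request.key, request.value);
        }
    }

    private static SketchRequest decode(ByteBuffer data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data.array()))) {
            return (SketchRequest) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```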
    

    2.3 RaftClient

    The client is fairly simple. It mainly uses JRaft's RouteTable to refresh and obtain the latest leader, then sends requests to that leader.

    public class RaftClient {
        private BoltCliClientService cliClientService;
        private RpcClient            rpcClient;
        private CliOptions           cliOptions;
        private String               groupId;
        private Configuration        conf;
    }
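
    The client's job is to find the current leader and send the request there. It can be sketched as a loop that follows redirects. This is a hypothetical simplification. The real RaftClient refreshes the leader through JRaft's RouteTable. Here, a redirect reply updates a cached leader address instead, which relies on the fact that the server side answers non-leader requests with ProcessResponse.redirect. LeaderRoutingClient, Response and the transport function are illustrative names only.

```java
import java.util.function.BiFunction;

/** Hypothetical client that keeps sending to the node it believes is the leader. */
final class LeaderRoutingClient {
    /** Hypothetical reply: either a result or a redirect to the real leader. */
    static final class Response {
        final Object entity;
        final String redirectTo; // null when the request was served

        private Response(Object entity, String redirectTo) {
            this.entity = entity;
            this.redirectTo = redirectTo;
        }

        static Response ok(Object entity) {
            return new Response(entity, null);
        }

        static Response redirect(String leaderAddress) {
            return new Response(null, leaderAddress);
        }
    }

    private final BiFunction<String, Object, Response> transport; // (address, request) -> reply
    private final int maxRedirects;
    private String cachedLeader;

    LeaderRoutingClient(String initialLeader, int maxRedirects,
                        BiFunction<String, Object, Response> transport) {
        this.cachedLeader = initialLeader;
        this.maxRedirects = maxRedirects;
        this.transport = transport;
    }

    Object sendRequest(Object request) {
        for (int attempt = 0; attempt <= maxRedirects; attempt++) {
            Response reply = transport.apply(cachedLeader, request);
            if (reply.redirectTo == null) {
                return reply.entity;
            }
            cachedLeader = reply.redirectTo; // remember the new leader and retry there
        }
        throw new IllegalStateException("no leader found after " + maxRedirects + " redirects");
    }

    String cachedLeader() {
        return cachedLeader;
    }
}
```

    Caching the leader means that, in steady state, each call costs one round trip. Only a leader change costs an extra hop.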
    

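    0x03 Configuration

    The JRaft-related configuration lives mainly in MetaServerRepositoryConfiguration.

    The various node lists are stored in Repositories, and JRaft guarantees the data consistency of those Repositories. The configuration therefore deals mostly with Repositories, starting with the three RepositoryServices:

    • dataRepositoryService
    • metaRepositoryService
    • sessionRepositoryService

    Next come the Session version service and two Confirm services:

    • SessionVersionRepositoryService
    • DataConfirmStatusService
    • SessionConfirmStatusService

    Then there is RaftExchanger, the network-interaction abstraction.

    Finally, there is RaftAnnotationBeanPostProcessor, which processes the Raft annotations on beans while the container initializes them.

    The code is: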

    @Configuration
    public static class MetaServerRepositoryConfiguration {
        @Bean
        public RepositoryService dataRepositoryService() {
            return new DataRepositoryService();
        }
    
        @Bean
        public RepositoryService metaRepositoryService() {
            return new MetaRepositoryService();
        }
    
        @Bean
        public RepositoryService sessionRepositoryService() {
            return new SessionRepositoryService();
        }
      
        @Bean
        public VersionRepositoryService sessionVersionRepositoryService() {
            return new SessionVersionRepositoryService();
        }
      
        @Bean
        public NodeConfirmStatusService dataConfirmStatusService() {
            return new DataConfirmStatusService();
        }
    
        @Bean
        public NodeConfirmStatusService sessionConfirmStatusService() {
            return new SessionConfirmStatusService();
        }
    
        @Bean
        public RaftExchanger raftExchanger() {
            return new RaftExchanger();
        }
    
        @Bean
        public RaftAnnotationBeanPostProcessor raftAnnotationBeanPostProcessor() {
            return new RaftAnnotationBeanPostProcessor();
        }
    }
    

    In addition, MetaDBConfiguration defines one more bean:

    @Configuration
    public static class MetaDBConfiguration {
        @Bean
        public DBService persistenceDataDBService() {
            return new PersistenceDataDBService();
        }
    }
    

    3.1 The RepositoryService Interface

    Raft mainly acts on the RepositoryService interface, so we start with that interface.

    All operations on a Repository go through interfaces such as RepositoryService, which classes like DataRepositoryService implement.

    @RaftService(uniqueId = "dataServer")
    public class DataRepositoryService extends AbstractSnapshotProcess
                                       implements
                                       RepositoryService<String, RenewDecorate<DataNode>> {
    }
    

    For example, DataStoreService calls dataRepositoryService directly for its operations:

    public class DataStoreService implements StoreService<DataNode> {
        @RaftReference(uniqueId = "dataServer")
        private RepositoryService<String, RenewDecorate<DataNode>> dataRepositoryService;
      
        ......
        dataRepositoryService.replaceAll(dataCenter, dataCenterNodesMapTemp, version);
        ......
    }
    

    3.2 RaftReference & RaftService

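    These two annotations can be seen as the interface through which the Raft wrapper is exposed to the Registry. RaftReference corresponds to the client-side proxy, and RaftService corresponds to the server-side implementation.

    Why do this? To keep data consistent, a plain local call must be turned into a network call to the Raft service. Only then can the Raft protocol guarantee consistency.

    3.2.1 Annotation Definitions

    RaftService is defined as follows: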

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface RaftService {
        Class<?> interfaceType() default void.class;
        String uniqueId() default "";
    }
    

    RaftReference is defined as follows:

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    public @interface RaftReference {
        Class<?> interfaceType() default void.class;
        String uniqueId() default "";
    }
    

    3.2.2 Annotation Usage

    Every service that needs to be controlled by Raft carries the RaftService annotation:

    • dataRepositoryService
    • metaRepositoryService
    • sessionRepositoryService
    • SessionVersionRepositoryService
    • DataConfirmStatusService
    • SessionConfirmStatusService
    • PersistenceDataDBService

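    Every field that references one of these RaftService implementations is annotated with @RaftReference. Services are told apart by their ID, so services that share an interface (such as the three RepositoryService implementations) set a uniqueId: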

        @RaftReference
        private DBService           persistenceDataDBService;
    
        @RaftReference(uniqueId = "dataServer")
        private RepositoryService<String, RenewDecorate<DataNode>> dataRepositoryService;
        @RaftReference(uniqueId = "dataServer")
        private NodeConfirmStatusService<DataNode>                 dataConfirmStatusService;
    
        @RaftReference(uniqueId = "metaServer")
        private RepositoryService<String, RenewDecorate<MetaNode>> metaRepositoryService;
    
        @RaftReference(uniqueId = "sessionServer")
        private RepositoryService<String, RenewDecorate<SessionNode>> sessionRepositoryService;
        @RaftReference(uniqueId = "sessionServer")
        private VersionRepositoryService<String>                      sessionVersionRepositoryService;
        @RaftReference(uniqueId = "sessionServer")
        private NodeConfirmStatusService<SessionNode>                 sessionConfirmStatusService;
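
    The article does not show getServiceId, but the debugger output later in this section suggests its format. When no uniqueId is set, the ID is the interface name alone (com.alipay.sofa.registry.store.api.DBService). Otherwise it is interfaceName:uniqueId (com.alipay.sofa.registry.server.meta.repository.RepositoryService:dataServer). The sketch below reconstructs that rule under this assumption and scans a class for annotated fields. SketchRaftReference, ServiceIds, RepoSketch and StoreSketch are hypothetical stand-ins for RaftReference and the real beans.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for @RaftReference, with the same two attributes. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface SketchRaftReference {
    Class<?> interfaceType() default void.class;
    String uniqueId() default "";
}

final class ServiceIds {
    private ServiceIds() {}

    /** Assumed rule: interface name, plus ":" + uniqueId when a uniqueId is set. */
    static String serviceId(Class<?> interfaceType, String uniqueId) {
        return uniqueId.isEmpty() ? interfaceType.getName() : interfaceType.getName() + ":" + uniqueId;
    }

    /** Service IDs referenced by the annotated, non-static fields of a class. */
    static List<String> referencedIds(Class<?> beanClass) {
        List<String> ids = new ArrayList<>();
        for (Field field : beanClass.getDeclaredFields()) {
            SketchRaftReference ref = field.getAnnotation(SketchRaftReference.class);
            if (ref == null || Modifier.isStatic(field.getModifiers())) {
                continue;
            }
            // interfaceType left as void.class: fall back to the field's declared type
            Class<?> type = void.class.equals(ref.interfaceType()) ? field.getType() : ref.interfaceType();
            ids.add(serviceId(type, ref.uniqueId()));
        }
        return ids;
    }
}

/** Hypothetical shared interface, like RepositoryService. */
interface RepoSketch {}

/** Hypothetical bean: two fields share an interface, so uniqueId tells them apart. */
final class StoreSketch {
    @SketchRaftReference(uniqueId = "dataServer")
    private RepoSketch dataRepository;
    @SketchRaftReference(uniqueId = "sessionServer")
    private RepoSketch sessionRepository;
    @SketchRaftReference
    private Runnable plainService;
}
```

    Without uniqueId, dataRepository and sessionRepository would both map to the bare interface name and collide. The same collision would occur for the three real RepositoryService implementations.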
    

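    3.2.3 Annotation Processing

    RaftAnnotationBeanPostProcessor implements BeanPostProcessor, and it is where the RaftReference and RaftService annotations are processed.

    The BeanPostProcessor interface lets us add our own logic before and after the Spring container runs a bean's initialization methods, that is, after the bean has been instantiated and configured. To use it, we define one or more BeanPostProcessor implementations and register them with the Spring IoC container.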

    public class RaftAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered {
        @Autowired
        private RaftExchanger       raftExchanger;
        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName) {
            processRaftReference(bean);
            return bean;
        }
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) {
            processRaftService(bean, beanName);
            return bean;
        }  
    }
    

    The two annotations are handled differently.

    3.2.3.1 Client side: processRaftReference

    processRaftReference replaces every field annotated with @RaftReference with a dynamic proxy, which turns calls on that field into client calls. The proxy is implemented by ProxyHandler. It wraps each method invocation into a ProcessRequest and sends it to the RaftServer via RaftClient.

    private void processRaftReference(Object bean) {
        final Class<?> beanClass = bean.getClass();
    
        ReflectionUtils.doWithFields(beanClass, field -> {
            RaftReference referenceAnnotation = field.getAnnotation(RaftReference.class);
    
            Class<?> interfaceType = referenceAnnotation.interfaceType();
            if (interfaceType.equals(void.class)) {
                // interfaceType not set: fall back to the field's declared type
                interfaceType = field.getType();
            }
            String serviceId = getServiceId(interfaceType, referenceAnnotation.uniqueId());
            Object proxy = getProxy(interfaceType, serviceId);
            ReflectionUtils.makeAccessible(field);
            ReflectionUtils.setField(field, bean, proxy); // replace the field value with the proxy
    
        }, field -> !Modifier.isStatic(field.getModifiers())
                && field.isAnnotationPresent(RaftReference.class));
    }
    
    private Object getProxy(Class<?> interfaceType, String serviceId) {
        RaftClient client = raftExchanger.getRaftClient();
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class<?>[] { interfaceType }, new ProxyHandler(interfaceType, serviceId, client));
    }
    
    Some of the variables at runtime:
    field = {Field@3824} "private com.alipay.sofa.registry.store.api.DBService com.alipay.sofa.registry.server.meta.remoting.handler.FetchProvideDataRequestHandler.persistenceDataDBService"
    referenceAnnotation = {$Proxy42@3825} "@com.alipay.sofa.registry.store.api.annotation.RaftReference(interfaceType=void, uniqueId=)"
    interfaceType = {Class@3150} "interface com.alipay.sofa.registry.store.api.DBService"
    serviceId = "com.alipay.sofa.registry.store.api.DBService"
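
    Plain JDK reflection is enough to reproduce the field replacement. This sketch is a simplification under stated assumptions. ProxyInjector, CapturedCall, NodeRepo and StoreBean are hypothetical names, the transport function stands in for RaftClient.sendRequest, and methods declared on Object are answered locally. That last choice is deliberate. In the debugger output below, toString() on the real proxy throws. This suggests, though the article does not confirm it, that the real ProxyHandler forwards every call, toString() included.

```java
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for ProcessRequest: which service, which method, which args. */
final class CapturedCall {
    final String serviceName;
    final String methodName;
    final Object[] args;

    CapturedCall(String serviceName, String methodName, Object[] args) {
        this.serviceName = serviceName;
        this.methodName = methodName;
        this.args = args;
    }
}

/** Hypothetical service interface and a bean that depends on it. */
interface NodeRepo {
    String get(String key);
}

final class StoreBean {
    private NodeRepo repo; // null until a proxy is injected

    String lookup(String key) {
        return repo.get(key);
    }
}

final class ProxyInjector {
    private ProxyInjector() {}

    /** A JDK dynamic proxy that turns each interface call into a CapturedCall for transport. */
    static <T> T proxy(Class<T> iface, String serviceId, Function<CapturedCall, Object> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // Answer equals/hashCode/toString locally instead of sending them remotely
                switch (method.getName()) {
                    case "equals":
                        return proxy == args[0];
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default:
                        return "Proxy[" + serviceId + "]";
                }
            }
            return transport.apply(new CapturedCall(serviceId, method.getName(), args));
        };
        return iface.cast(Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler));
    }

    /** Overwrites a field with the proxy, as processRaftReference does via ReflectionUtils. */
    static void inject(Object bean, String fieldName, Object value) {
        try {
            Field field = bean.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(bean, value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

    After injection, StoreBean calls repo.get(key) as if it were a local object. The only sign that the call went elsewhere is the CapturedCall handed to the transport.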
    

    Take DataStoreService as an example. Before the annotation is processed:

    bean = {DataStoreService@4053} 
     dataRepositoryService = null 
    

    +-----------------------------+
    |     DataStoreService        |
    |                             |
    |  +-----------------------+  |
    |  | dataRepositoryService +---------> Null
    |  +-----------------------+  |
    +-----------------------------+
    
    

    After the annotation is processed:

    bean = {DataStoreService@4053} 
     dataRepositoryService = {$Proxy46@4057} Method threw 'java.lang.RuntimeException' exception. Cannot evaluate com.sun.proxy.$Proxy46.toString()
       
       
    proxy = {$Proxy46@4057} Method threw 'java.lang.RuntimeException' exception. Cannot evaluate com.sun.proxy.$Proxy46.toString()
     h = {ProxyHandler@4098} 
      interfaceType = {Class@4018} "interface com.alipay.sofa.registry.server.meta.repository.RepositoryService"
      serviceId = "com.alipay.sofa.registry.server.meta.repository.RepositoryService:dataServer"
      client = {RaftClient@4077} 
       cliClientService = {BoltCliClientService@4144} 
       rpcClient = {RpcClient@4145} 
       cliOptions = {CliOptions@4146} "RpcOptions{rpcConnectTimeoutMs=1000, rpcDefaultTimeout=5000, rpcInstallSnapshotTimeout=300000, rpcProcessorThreadPoolSize=80, enableRpcChecksum=false, metricRegistry=null}"
       groupId = "RegistryGroup_DefaultDataCenter"
       conf = {Configuration@4148} "192.168.1.2:9614"
       started = {AtomicBoolean@4149} "true"   
    

    As a diagram:

    +-----------------------------+
    |     DataStoreService        |                  +-------------------+
    |                             |                  |   ProxyHandler    |
    |  +-----------------------+  |       +-----+    | +---------------+ |
    |  | dataRepositoryService +--------->+Proxy+--->+ | interfaceType | |
    |  +-----------------------+  |       +-----+    | |               | |
    +-----------------------------+                  | | serviceId     | |
                                                     | |               | |
                                                     | | RpcClient     | |
                                                     | +---------------+ |
                                                     +-------------------+
    
    

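    Calls are thus routed to ProxyHandler dynamically, so invoking any member method results in an RPC call.

    3.2.3.2 Server side: processRaftService

    processRaftService registers every class annotated with @RaftService with the Processor, keyed by serviceId (interfaceName:uniqueId). The Processor is in turn invoked by SOFAJRaft's state machine, ServiceStateMachine.

    When RaftServer receives a request, it applies the request to the SOFAJRaft state machine (implemented by ServiceStateMachine), which calls the Processor. The Processor uses the serviceId to find the implementation class and performs the corresponding method call.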

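    private void processRaftService(Object bean, String beanName) {
        final Class<?> beanClass = AopProxyUtils.ultimateTargetClass(bean);
        RaftService raftServiceAnnotation = beanClass.getAnnotation(RaftService.class);
        if (raftServiceAnnotation == null) {
            return; // called for every bean: skip beans without @RaftService
        }
        Class<?> interfaceType = raftServiceAnnotation.interfaceType();
        if (interfaceType.equals(void.class)) {
            // interfaceType not set: use the (single) interface implemented by the bean class
            interfaceType = beanClass.getInterfaces()[0];
        }
    
        String serviceUniqueId = getServiceId(interfaceType, raftServiceAnnotation.uniqueId());
        Processor.getInstance().addWorker(serviceUniqueId, interfaceType, bean);
    }
    
    Some of the variables at runtime: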
    bean = {DataRepositoryService@3805} 
    beanName = "dataRepositoryService"
    beanClass = {Class@3796} "class com.alipay.sofa.registry.server.meta.repository.service.DataRepositoryService"
    raftServiceAnnotation = {$Proxy41@3807} "@com.alipay.sofa.registry.store.api.annotation.RaftService(interfaceType=void, uniqueId=dataServer)"
    interfaceType = {Class@3795} "interface com.alipay.sofa.registry.server.meta.repository.RepositoryService"
    serviceUniqueId = "com.alipay.sofa.registry.server.meta.repository.RepositoryService:dataServer"
    

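    While the annotation is processed, addWorker stores the bean and its interface methods in maps. Note that workerMethods is a two-level HashMap. The first level is keyed by serviceId, and its value is another HashMap. In that inner map, the key is the method signature (the method name followed by the parameter type names) and the value is the Method itself.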

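    public void addWorker(String serviceId, Class<?> interfaceClazz, Object target) {
        // Key: method name followed by the fully qualified parameter type names,
        // e.g. "replaceAlljava.lang.Stringjava.util.Mapjava.lang.Long"
        Map<String, Method> publicMethods = new HashMap<>();
        for (Method m : interfaceClazz.getMethods()) {
            StringBuilder mSigs = new StringBuilder();
            mSigs.append(m.getName());
            for (Class<?> paramType : m.getParameterTypes()) {
                mSigs.append(paramType.getName());
            }
            publicMethods.put(mSigs.toString(), m);
        }
    
        workerMethods.put(serviceId, publicMethods); // serviceId -> (signature -> Method)
        workers.put(serviceId, target);              // serviceId -> implementation bean
    }
    
    Some of the variables at runtime: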
    serviceId = "com.alipay.sofa.registry.store.api.DBService"
    interfaceClazz = {Class@3118} "interface com.alipay.sofa.registry.store.api.DBService"
    target = {PersistenceDataDBService@3124} 
    
    this = {Processor@3812} 
     workerMethods = {HashMap@3815}  size = 2
      "com.alipay.sofa.registry.server.meta.repository.RepositoryService:dataServer" -> {HashMap@3813}  size = 13
       key = "com.alipay.sofa.registry.server.meta.repository.RepositoryService:dataServer"
       value = {HashMap@3813}  size = 13
        "getNodeRepositories" -> {Method@3856} "public abstract java.util.Map com.alipay.sofa.registry.server.meta.repository.RepositoryService.getNodeRepositories()"
        "replaceAlljava.lang.Stringjava.util.Mapjava.lang.Long" -> {Method@3858} "public abstract java.util.Map com.alipay.sofa.registry.server.meta.repository.RepositoryService.replaceAll(java.lang.String,java.util.Map,java.lang.Long)"
        "checkVersionjava.lang.Objectjava.lang.Long" -> {Method@3860} "public abstract boolean com.alipay.sofa.registry.server.meta.repository.RepositoryService.checkVersion(java.lang.Object,java.lang.Long)"
        "replacejava.lang.Objectjava.lang.Object" -> {Method@3862} "public default java.lang.Object com.alipay.sofa.registry.server.meta.repository.RepositoryService.replace(java.lang.Object,java.lang.Object)"
        "removejava.lang.Object" -> {Method@3864} "public default java.lang.Object com.alipay.sofa.registry.server.meta.repository.RepositoryService.remove(java.lang.Object)"
        "putjava.lang.Objectjava.lang.Objectjava.lang.Long" -> {Method@3866} "public abstract java.lang.Object com.alipay.sofa.registry.server.meta.repository.RepositoryService.put(java.lang.Object,java.lang.Object,java.lang.Long)"
        "getVersionjava.lang.Object" -> {Method@3868} "public abstract java.lang.Long com.alipay.sofa.registry.server.meta.repository.RepositoryService.getVersion(java.lang.Object)"
        "putjava.lang.Objectjava.lang.Object" -> {Method@3870} "public default java.lang.Object com.alipay.sofa.registry.server.meta.repository.RepositoryService.put(java.lang.Object,java.lang.Object)"
        "getAllData" -> {Method@3872} "public abstract java.util.Map com.alipay.sofa.registry.server.meta.repository.RepositoryService.getAllData()"
        "removejava.lang.Objectjava.lang.Long" -> {Method@3874} "public abstract java.lang.Object com.alipay.sofa.registry.server.meta.repository.RepositoryService.remove(java.lang.Object,java.lang.Long)"
        "getjava.lang.Object" -> {Method@3876} "public abstract java.lang.Object com.alipay.sofa.registry.server.meta.repository.RepositoryService.get(java.lang.Object)"
        "getAllDataMap" -> {Method@3878} "public abstract java.util.Map com.alipay.sofa.registry.server.meta.repository.RepositoryService.getAllDataMap()"
        "replacejava.lang.Objectjava.lang.Objectjava.lang.Long" -> {Method@3880} "public abstract java.lang.Object com.alipay.sofa.registry.server.meta.repository.RepositoryService.replace(java.lang.Object,java.lang.Object,java.lang.Long)"
      "com.alipay.sofa.registry.store.api.DBService" -> {HashMap@3828}  size = 5
       
     workers = {HashMap@3814}  size = 2
      "com.alipay.sofa.registry.server.meta.repository.RepositoryService:dataServer" -> {DataRepositoryService@3805} 
       key = "com.alipay.sofa.registry.server.meta.repository.RepositoryService:dataServer"
       value = {DataRepositoryService@3805} 
      "com.alipay.sofa.registry.store.api.DBService" -> {PersistenceDataDBService@3117} 
       key = "com.alipay.sofa.registry.store.api.DBService"
       value = {PersistenceDataDBService@3117} 
     methodHandleMap = {ConcurrentHashMap@3817}  size = 0
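
    The sketch below combines addWorker with the lookup that happens when a request is applied. MiniProcessor, KvRepo and KvRepoImpl are hypothetical. Only the signature-key format (method name followed by parameter type names, e.g. putjava.lang.Objectjava.lang.Object) is taken from the debugger output above. The real Processor.process returns a ProcessResponse rather than the raw result, which this sketch omits.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified Processor: serviceId -> (signature key -> Method) plus serviceId -> bean. */
final class MiniProcessor {
    private final Map<String, Map<String, Method>> workerMethods = new HashMap<>();
    private final Map<String, Object> workers = new HashMap<>();

    /** Same key format as addWorker: method name followed by parameter type names. */
    static String key(String methodName, String[] argSigs) {
        return methodName + String.join("", argSigs);
    }

    void addWorker(String serviceId, Class<?> interfaceClazz, Object target) {
        Map<String, Method> methods = new HashMap<>();
        for (Method m : interfaceClazz.getMethods()) {
            Class<?>[] types = m.getParameterTypes();
            String[] sigs = new String[types.length];
            for (int i = 0; i < types.length; i++) {
                sigs[i] = types[i].getName();
            }
            methods.put(key(m.getName(), sigs), m);
        }
        workerMethods.put(serviceId, methods);
        workers.put(serviceId, target);
    }

    /** What applying a request boils down to: look up service and method, then invoke. */
    Object process(String serviceId, String methodName, String[] argSigs, Object[] args) {
        Map<String, Method> methods = workerMethods.get(serviceId);
        if (methods == null) {
            throw new IllegalArgumentException("unknown service: " + serviceId);
        }
        Method method = methods.get(key(methodName, argSigs));
        if (method == null) {
            throw new IllegalArgumentException("unknown method: " + key(methodName, argSigs));
        }
        try {
            return method.invoke(workers.get(serviceId), args);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException(e);
        }
    }
}

/** Hypothetical repository interface and implementation. */
interface KvRepo {
    Object put(Object key, Object value);

    Object get(Object key);
}

final class KvRepoImpl implements KvRepo {
    private final Map<Object, Object> data = new HashMap<>();

    @Override
    public Object put(Object key, Object value) {
        return data.put(key, value);
    }

    @Override
    public Object get(Object key) {
        return data.get(key);
    }
}
```

    Including parameter types in the key is what lets overloads such as put(Object, Object) and put(Object, Object, Long) coexist in the same inner map.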
    

    As a diagram:

                                            +-------------------+
                                            |      Processor    |        +> DataRepositoryService              +-> getNodeRepositories +-> getNodeRepositories()
                                            | +---------------+ |        |                                     |
                                            | |  workers+--------------->+> PersistenceDataDBService           +-> replaceAll+--> replaceAll
                                            | |               | |                                              |
    +-----------------------+  addWorker    | |               | |                      {HashMap}               +-> checkVersion +--> ...
    | DataRepositoryService +-------------->+ | {HashMap}     | |       +---> +----------------------------+   |
    +-----------------------+               | | workerMethods +-------->+     |RepositoryService:dataServer+-->-->-replace +----->  ...
                                            | +---------------+ |       |     +----------------------------+   |
                                            +-------------------+       |                                      +-> remove  +---->  ...
                                                                        |                                      |
                                                                        +--->  ......                          +-> put  +-----> ...
                                                                        |                                      |
                                                                        |                                      +-> getAllDataMap +----> ...
                                                                        |                                      |
                                                                        |       {HashMap}       +--->   ...    +-> ......
                                                                        |     +---------+       |
                                                                        +---> |DBService+----------->   ...
                                                                              +---------+       |
                                                                                                |
                                                                                                +--->   ...
    
    


    Its call stack is:

    addWorker:69, Processor (com.alipay.sofa.registry.jraft.processor)
    processRaftService:123, RaftAnnotationBeanPostProcessor (com.alipay.sofa.registry.server.meta.repository.annotation)
    postProcessAfterInitialization:60, RaftAnnotationBeanPostProcessor (com.alipay.sofa.registry.server.meta.repository.annotation)
    applyBeanPostProcessorsAfterInitialization:421, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
    initializeBean:1635, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
    doCreateBean:553, AbstractAutowireCapableBeanFactory 
    

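    The whole process closely resembles an RPC call. A method call made on the referencing side is not actually executed there. Instead, it is wrapped into a request and sent to the Raft service, where the Raft state machine performs the real method call, for example storing node information in a Map. The Raft protocol guarantees that the data is consistent across all nodes. If the local node is the leader, however, some query requests skip the Raft protocol and call the local implementation directly.

    0x04 Network Interaction

    When the Registry makes a business call, it uses Raft implicitly.

    For example, DataStoreService makes the following call: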

    Map<String/*dataCenter*/, NodeRepository> dataNodeRepositoryMap = dataRepositoryService
            .getNodeRepositories();
    

    getNodeRepositories goes through the proxy and ends up in ProxyHandler#invoke.

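    public class ProxyHandler implements InvocationHandler {
        private final Class<?>      interfaceType;
        private final String        serviceId;
        private final RaftClient    client;
    
        public ProxyHandler(Class<?> interfaceType, String serviceId, RaftClient client) {
            this.interfaceType = interfaceType;
            this.serviceId = serviceId;
            this.client = client;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            try {
                // Pack the method call into a ProcessRequest
                ProcessRequest request = new ProcessRequest();
                request.setMethodArgSigs(createParamSignature(method.getParameterTypes()));
                request.setMethodName(method.getName());
                request.setMethodArgs(args);
                request.setServiceName(serviceId);
    
                if (Processor.getInstance().isLeaderReadMethod(method)) {
                    return doInvokeMethod(request); // read allowed on the leader: call the local JVM implementation directly
                }
                return client.sendRequest(request); // otherwise send the request through RaftClient
            } catch (Throwable e) {
                // Error handling simplified in this excerpt
                throw new RuntimeException("Proxy invoke failed: " + method.getName(), e);
            }
        }
    }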
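
    ProcessRequest carries the method's argument signatures so that the server can pick the right overload. The article does not show createParamSignature. The sketch below assumes it returns the fully qualified parameter type names, which is consistent with the keys addWorker builds on the server side. RequestSketch and RepoApi are hypothetical. For a replaceAll(String, Map, Long) method, the resulting key equals the replaceAll entry in the debugger dump above.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;

/** Hypothetical mirror of the fields ProxyHandler sets on a ProcessRequest. */
final class RequestSketch {
    String serviceName;
    String methodName;
    String[] methodArgSigs;
    Object[] methodArgs;

    /** Assumed behaviour of createParamSignature: fully qualified parameter type names. */
    static String[] createParamSignature(Class<?>[] paramTypes) {
        return Arrays.stream(paramTypes).map(Class::getName).toArray(String[]::new);
    }

    /** Builds the request the same way ProxyHandler.invoke does in the excerpt above. */
    static RequestSketch of(String serviceId, Method method, Object[] args) {
        RequestSketch request = new RequestSketch();
        request.methodArgSigs = createParamSignature(method.getParameterTypes());
        request.methodName = method.getName();
        request.methodArgs = args;
        request.serviceName = serviceId;
        return request;
    }

    /** The key the server-side addWorker map would be searched with. */
    String workerKey() {
        return methodName + String.join("", methodArgSigs);
    }
}

/** Hypothetical interface with the same parameter types as RepositoryService.replaceAll. */
interface RepoApi {
    Map<String, Object> replaceAll(String dataCenter, Map<String, Object> nodes, Long version);
}
```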
    

    The call stack is:

    invoke:69, ProxyHandler (com.alipay.sofa.registry.jraft.processor)
    getNodeRepositories:-1, $Proxy46 (com.sun.proxy)
    getNodeChangeResult:238, DataStoreService (com.alipay.sofa.registry.server.meta.store)
    getAllNodes:96, MetaServerRegistry (com.alipay.sofa.registry.server.meta.registry)
    getRegisterNodeByType:81, MetaDigestResource (com.alipay.sofa.registry.server.meta.resource)
    lambda$init$1:70, MetaDigestResource (com.alipay.sofa.registry.server.meta.resource)
    

    RaftClient#sendRequest then sends the request to the Raft leader:

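    public Object sendRequest(ProcessRequest request) {
        try {
            // Look up the current leader of the Raft group
            PeerId peer = getLeader();
            Object response = this.rpcClient.invokeSync(peer.getEndpoint().toString(), request,
                cliOptions.getRpcDefaultTimeout());
            ProcessResponse cmd = (ProcessResponse) response;
            if (cmd.getSuccess()) {
                return cmd.getEntity();
            }
            // Failure branch not shown in this excerpt; simplified here
            throw new IllegalStateException("Raft request failed");
        } catch (Exception e) {
            // Error handling simplified in this excerpt
            throw new RuntimeException("Send request to leader failed", e);
        }
    }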
    

    On the server side, the call stack is:

    process:123, Processor (com.alipay.sofa.registry.jraft.processor)
    onApply:133, ServiceStateMachine (com.alipay.sofa.registry.jraft.bootstrap)
    doApplyTasks:534, FSMCallerImpl (com.alipay.sofa.jraft.core)
    doCommitted:503, FSMCallerImpl (com.alipay.sofa.jraft.core)
    runApplyTask:431, FSMCallerImpl (com.alipay.sofa.jraft.core)
    access$100:72, FSMCallerImpl (com.alipay.sofa.jraft.core)
    onEvent:147, FSMCallerImpl$ApplyTaskHandler (com.alipay.sofa.jraft.core)
    onEvent:141, FSMCallerImpl$ApplyTaskHandler (com.alipay.sofa.jraft.core)
    run:137, BatchEventProcessor (com.lmax.disruptor)
    run:748, Thread (java.lang)
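
    The server-side stack shows that onApply does not run on the Bolt request thread. Committed entries reach it through FSMCallerImpl, driven by an LMAX Disruptor BatchEventProcessor on a dedicated thread. The sketch below captures only that ordering idea, with a single-thread ExecutorService swapped in for the Disruptor. ApplyPipeline is a hypothetical name. Because a single consumer applies entries one at a time, the state machine observes them in exactly the order in which they were committed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical apply pipeline: committed entries are handed to one dedicated thread,
 * so the state machine applies them strictly in commit order. JRaft's FSMCallerImpl
 * uses an LMAX Disruptor for this; a single-thread executor stands in for it here.
 */
final class ApplyPipeline<T> implements AutoCloseable {
    private final ExecutorService applyThread = Executors.newSingleThreadExecutor();
    private final Consumer<T> stateMachine;

    ApplyPipeline(Consumer<T> stateMachine) {
        this.stateMachine = stateMachine;
    }

    /** Called once an entry is committed; returns immediately, applying happens asynchronously. */
    void onCommitted(T entry) {
        applyThread.execute(() -> stateMachine.accept(entry));
    }

    /** Stops accepting entries and waits until everything queued has been applied. */
    @Override
    public void close() {
        applyThread.shutdown();
        try {
            if (!applyThread.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("apply thread did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```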
    

    The overall picture is:

    +---------------------------------------+        +---------------------------------------------+
    | +------------------------------+      |        |       +----------------------------------+  |
    | | +----------------+  registry |Client|        | Server| +----------------------+registry |  |
    | | |DataStoreService|           |      |        |       | | DataRepositoryService|         |  |
    | | +-----+----------+           |      |        |       | +---------+------------+         |  |
    | |       | getNodeRepositories  |      |        |       |           ^  getNodeRepositories |  |
    | |       |                      |      |        |       |           |                      |  |
    | |       v                      |      |        |       |    +------+----+                 |  |
    | | +-----+-----------------+    |      |        |       |    | Processor |                 |  |
    | | |DataRepositoryService  |    |      |        |       |    +------+----+                 |  |
    | | +-----+-----------------+    |      |        |       |           ^  onApply             |  |
    | |       |                      |      |        |       |           |                      |  |
    | |       v                      |      |        |       |   +-------+------+               |  |
    | |     +-+---+                  |      |        |       |   | StateMachine |               |  |
    | |     |Proxy|                  |      |        |       |   +-------+------+               |  |
    | |     +-+---+                  |      |        |       |           ^  process             |  |
    | |       | invoke               |      |        |       |           |                      |  |
    | |       v                      |      |        |       |           |                      |  |
    | |  +----+-------+              |      |        |       |    +------+------+               |  |
    | |  |ProxyHandler|              |      |        |       |    |FSMCallerImpl|               |  |
    | |  +----+-------+              |      |        |       |    +------+------+               |  |
    | |       | sendRequest          |      |        |       |           ^                      |  |
    | |       v                      |      |        |       |           |  received            |  |
    | |   +---+------+               |      |        |       |           |                      |  |
    | |   |RaftClient|               |      |        |       |   +-----------------+            |  |
    | |   +---+------+               |      |        |       |   |RaftServerHandler|            |  |
    | |       | invokeSync           |      |        |       |   +-----------------+            |  |
    | +------------------------------+      |        |       +----------------------------------+  |
    |         |                             |        |                   |                         |
    |         |                             |        |                   |                         |
    | +------------------------------+      |        |         +--------------------------+        |
    | |       |         remoting.rpc |      |        |         |         |    remoting.rpc|        |
    | |  +----v------+               | bolt | Network|         | +-------+-----------+    |        |
    | |  | RpcClient |               | +---------------------> | |RpcRequestProcessor|    |        |
    | |  +-----------+               |      |        |         | +-------------------+    |        |
    | +------------------------------+      |        |         +--------------------------+        |
    +---------------------------------------+        +---------------------------------------------+
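    The client half of the diagram (Proxy, ProxyHandler, RaftClient) is a dynamic-proxy pattern: callers use an ordinary service interface, and every call is packaged into a request for the leader. A minimal sketch with java.lang.reflect.Proxy, assuming a hypothetical SketchRequest type and NodeStore interface, with a Function that stands in for RaftClient#sendRequest (no network or Raft involved); it is not SOFARegistry's actual ProxyHandler.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical request: target service, method name and arguments.
final class SketchRequest {
    final String serviceId;
    final String methodName;
    final Object[] args;

    SketchRequest(String serviceId, String methodName, Object[] args) {
        this.serviceId = serviceId;
        this.methodName = methodName;
        this.args = args == null ? new Object[0] : args;
    }

    @Override
    public String toString() {
        return serviceId + "#" + methodName + Arrays.toString(args);
    }
}

// Hypothetical service interface, standing in for a RepositoryService.
interface NodeStore {
    String put(String dataCenter, String address);
}

final class ProxySketch {
    // Every call on the returned proxy becomes a SketchRequest passed to `sender`,
    // which plays the role of RaftClient#sendRequest. Object methods such as
    // toString/hashCode are routed here too; a real handler would special-case them.
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> iface, String serviceId, Function<SketchRequest, Object> sender) {
        InvocationHandler handler = (proxy, method, args) ->
            sender.apply(new SketchRequest(serviceId, method.getName(), args));
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] { iface }, handler);
    }

    public static void main(String[] args) {
        // The "leader" here just echoes what it received.
        NodeStore store = create(NodeStore.class, "NodeStore:dataServer", req -> "leader got " + req);
        System.out.println(store.put("dc1", "10.0.0.1"));
    }
}
```

    The caller never sees Raft: it calls put on an interface, and the handler decides where the call really runs. This is the reason the same repository interface can be used on the client (as a proxy) and on the server (as the real implementation).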
    
    


    0x05 Snapshot Storage

    First, let's look at why a snapshot mechanism is needed.

    5.1 Storage Modules

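    The SOFAJRaft storage module is divided into:

    • Log storage, which records Raft configuration changes and the task logs submitted by users;
    • Meta storage, i.e. metadata storage, which records the internal state of the Raft implementation;
    • Snapshot storage, which holds the user's state machine snapshots and their metadata (a snapshot is a record of the current value of the data).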

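    5.2 The Problem

    When a Raft Node restarts, the state machine data held in memory is lost, so on startup the node replays every log entry in its LogStorage to rebuild the whole state machine. This causes three problems:

    • If tasks are submitted frequently, as in a message middleware scenario, the rebuild takes a long time and startup is slow;
    • If there are a great many log entries and the node has to keep all of them, the storage footprint becomes unsustainable;
    • When a Node is added, the new node has to fetch all log entries from the Leader and replay them into its state machine, which is a considerable burden on both the Leader and the network bandwidth.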

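    5.3 The Snapshot Mechanism

    The snapshot mechanism is introduced to solve these three problems.

    A snapshot records the current value of the data: it builds an "image" of the state machine's latest state and stores it separately. Once the snapshot has been saved, the log entries before that point can be deleted to reduce log storage. On startup, a node loads the latest snapshot directly and then replays only the log entries written after it; with a reasonable snapshot interval, replaying into the state machine is quick and startup is faster. Finally, a node joining the group first copies the Leader's latest snapshot and installs it into its local state machine, then only needs to copy the subsequent log entries, so it can quickly catch up with the progress of the whole Raft Group.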
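    The save, truncate and replay cycle just described can be shown with a toy in-memory key-value state machine. Everything in this sketch is hypothetical and independent of JRaft: takeSnapshot() stores the current state together with the last log index it covers and deletes the covered entries, and rebuildState() models a restart by starting from the snapshot and replaying only the remaining entries.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SnapshotLogSketch {
    // One log entry: a "put key=value" command at a given log index.
    static final class Entry {
        final long index;
        final String key;
        final String value;

        Entry(long index, String key, String value) {
            this.index = index;
            this.key = key;
            this.value = value;
        }
    }

    final List<Entry> log = new ArrayList<>();           // entries not yet covered by the snapshot
    Map<String, String> snapshotState = new HashMap<>(); // state as of lastIncludedIndex
    long lastIncludedIndex = 0;                          // last log index covered by the snapshot
    long lastIndex = 0;                                  // last log index appended

    void append(String key, String value) {
        log.add(new Entry(++lastIndex, key, value));
    }

    // Restart path: load the snapshot, then replay only the entries after it.
    Map<String, String> rebuildState() {
        Map<String, String> state = new HashMap<>(snapshotState);
        for (Entry e : log) {
            state.put(e.key, e.value);
        }
        return state;
    }

    // Save the current state as the new snapshot and delete the log entries it covers.
    void takeSnapshot() {
        snapshotState = rebuildState();
        lastIncludedIndex = lastIndex;
        log.removeIf(e -> e.index <= lastIncludedIndex);
    }

    public static void main(String[] args) {
        SnapshotLogSketch node = new SnapshotLogSketch();
        node.append("a", "1");
        node.append("b", "2");
        node.takeSnapshot();   // entries 1..2 are now covered and deleted
        node.append("a", "3"); // entry 3 stays in the log
        System.out.println("log size = " + node.log.size() + ", state = " + node.rebuildState());
    }
}
```

    After the snapshot only one entry has to be replayed on restart instead of three; with a real workload the gap between "all entries" and "entries since the last snapshot" is what makes startup fast.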

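    Having the Leader generate snapshots serves several purposes:

    • When a new Node joins the cluster, it does not have to rely only on log replication and replay to reach consistency with the Leader; by installing the Leader's snapshot it skips replaying the large volume of early log entries;
    • The Leader sends a snapshot in place of log replication, reducing the amount of data sent over the network;
    • Replacing early log entries with a snapshot saves storage space.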
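    The first two points come down to one decision the leader makes per follower: if the entries the follower still needs have already been compacted into the snapshot, it must send the snapshot instead of log entries. A hypothetical sketch of that check, where nextIndex is the next log index the leader would send to the follower (as in the Raft paper) and firstLogIndex is the oldest index still kept in the leader's log; the names are illustrative and not JRaft's Replicator API:

```java
final class ReplicationDecisionSketch {
    enum Action { APPEND_ENTRIES, INSTALL_SNAPSHOT }

    // firstLogIndex: oldest index still present in the leader's log after compaction.
    static Action decide(long followerNextIndex, long firstLogIndex) {
        // Anything before firstLogIndex now exists only inside the snapshot.
        return followerNextIndex < firstLogIndex ? Action.INSTALL_SNAPSHOT : Action.APPEND_ENTRIES;
    }

    public static void main(String[] args) {
        // Suppose the leader compacted indexes 1..1000, so its log now starts at 1001.
        System.out.println(decide(1, 1001));    // a brand-new follower
        System.out.println(decide(1001, 1001)); // a follower that only needs the tail
    }
}
```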

    5.4 ServiceStateMachine

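    At the state machine level, using snapshots means taking a checkpoint of the state machine: the state at that moment is saved and all log entries before it are deleted. The core of this is implementing two StateMachine methods:

    • onSnapshotLoad, which loads a snapshot at startup or after a snapshot has been installed;
    • onSnapshotSave, which saves a snapshot periodically.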

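    At the level of the services that use Raft, each service supplies its own business-specific implementation:

    • When the Registry processes the annotations, it calls addWorker to put each Raft-backed service into a map;
    • When the state machine later invokes its snapshot callbacks, it iterates over Processor.getInstance().getWorkers() and calls each service's own handler.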

    The concrete workers map looks like this:

    workers = {HashMap@6176}  size = 7
     "com.alipay.sofa.registry.server.meta.repository.RepositoryService:metaServer" -> {MetaRepositoryService@6201} 
     "com.alipay.sofa.registry.server.meta.repository.RepositoryService:dataServer" -> {DataRepositoryService@6162} 
     "com.alipay.sofa.registry.store.api.DBService" -> {PersistenceDataDBService@6203} 
     "com.alipay.sofa.registry.server.meta.repository.RepositoryService:sessionServer" -> {SessionRepositoryService@6205} 
     "com.alipay.sofa.registry.server.meta.repository.VersionRepositoryService:sessionServer" -> {SessionVersionRepositoryService@6207} 
     "com.alipay.sofa.registry.server.meta.repository.NodeConfirmStatusService:sessionServer" -> {SessionConfirmStatusService@6209} 
     "com.alipay.sofa.registry.server.meta.repository.NodeConfirmStatusService:dataServer" -> {DataConfirmStatusService@6211} 
    
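    The keys above follow a visible pattern: the service interface name, plus ":" and the uniqueId when the service declares one (DBService has none, so its key is the bare interface name). A hypothetical helper that reproduces this naming, which is what lets several implementations of one interface (metaServer, dataServer, sessionServer) live in the same map; the rule is inferred from the dump, not taken from SOFARegistry's registration code:

```java
import java.util.Map;
import java.util.TreeMap;

final class WorkerKeySketch {
    // Hypothetical stand-ins for the real service interfaces.
    interface RepositoryService { }
    interface DBService { }

    // Inferred rule: interface name, plus ":" + uniqueId when a uniqueId is set.
    static String serviceId(Class<?> iface, String uniqueId) {
        String name = iface.getName();
        return (uniqueId == null || uniqueId.isEmpty()) ? name : name + ":" + uniqueId;
    }

    public static void main(String[] args) {
        Map<String, String> workers = new TreeMap<>();
        workers.put(serviceId(RepositoryService.class, "dataServer"), "DataRepositoryService");
        workers.put(serviceId(RepositoryService.class, "sessionServer"), "SessionRepositoryService");
        workers.put(serviceId(DBService.class, null), "PersistenceDataDBService");
        workers.forEach((id, impl) -> System.out.println(id + " -> " + impl));
    }
}
```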

    The state machine code is as follows:

    public class ServiceStateMachine extends StateMachineAdapter {
        @Override
        public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
    
            Map<String, Object> workers = Processor.getInstance().getWorkers();
            Map<String, SnapshotProcess> snapshotProcessors = new HashMap<>();
            if (workers != null) {
                // Iterate over the workers and take a copy of each snapshot-capable one
                workers.forEach((serviceId, worker) -> {
                    if (worker instanceof SnapshotProcess) {
                        SnapshotProcess snapshotProcessor = (SnapshotProcess) worker;
                        snapshotProcessors.put(serviceId, snapshotProcessor.copy());
                    }
                });
            }
            // Write the copies to the snapshot directory in a separate thread
            Utils.runInThread(() -> {
                String errors = null;
                outer:
                // Iterate over the copied services
                for (Map.Entry<String, SnapshotProcess> entry : snapshotProcessors.entrySet()) {
                    String serviceId = entry.getKey();
                    SnapshotProcess snapshotProcessor = entry.getValue();
                    Set<String> fileNames = snapshotProcessor.getSnapshotFileNames();
                    for (String fileName : fileNames) {
                        String savePath = writer.getPath() + File.separator + fileName;
                        boolean ret = snapshotProcessor.save(savePath);   // delegate to the concrete implementation
                        if (ret) {
                            // Register the file as part of this snapshot
                            if (!writer.addFile(fileName)) {
                                errors = String.format("Fail to add file %s to writer", fileName);
                                break outer;
                            }
                        } else {
                            errors = String.format("Fail to save service %s snapshot %s", serviceId, savePath);
                            break outer;
                        }
                    }
                }
                // Report the outcome back to JRaft
                if (errors != null) {
                    done.run(new Status(RaftError.EIO, errors));
                } else {
                    done.run(Status.OK());
                }
            });
        }
    
        @Override
        public boolean onSnapshotLoad(SnapshotReader reader) {
            List<String> failServices = new ArrayList<>();
            Map<String, Object> workers = Processor.getInstance().getWorkers();
            if (workers != null) {
                // Iterate over the registered workers
                outer: for (Map.Entry<String, Object> entry : workers.entrySet()) {
                    String serviceId = entry.getKey();
                    Object worker = entry.getValue();
                    if (worker instanceof SnapshotProcess) {
                        SnapshotProcess snapshotProcess = (SnapshotProcess) worker;
                        Set<String> fileNames = snapshotProcess.getSnapshotFileNames();
    
                        for (String fileName : fileNames) {
                            // The snapshot does not contain this service's file
                            if (reader.getFileMeta(fileName) == null) {
                                failServices.add(serviceId);
                                break outer;
                            }
    
                            String savePath = reader.getPath() + File.separator + fileName;
                            boolean ret = snapshotProcess.load(savePath);  // delegate to the concrete implementation
                            if (!ret) {
                                failServices.add(serviceId);
                                break outer;
                            }
                        }
                    }
                }
            }
            // Returning false tells JRaft that the snapshot could not be loaded
            return failServices.isEmpty();
        }
    }
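    Stripped of the JRaft types, both callbacks follow one pattern: take a copy() of every snapshot-capable worker first (the copies are taken synchronously inside the callback, before the write is handed to Utils.runInThread), write the copies' files, and treat any failed file as a failed snapshot. A self-contained sketch of that pattern, assuming a simplified, hypothetical Participant interface that mirrors the four calls used above; it is not SOFARegistry's SnapshotProcess, writes plain text files with java.nio.file, runs synchronously, and leaves out the background thread and the Closure callback:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for SnapshotProcess.
interface Participant {
    Set<String> getSnapshotFileNames();
    boolean save(String path);
    boolean load(String path);
    Participant copy();
}

// A participant whose whole state is one string kept in one file.
final class TextParticipant implements Participant {
    private final String fileName;
    String state;

    TextParticipant(String fileName, String state) {
        this.fileName = fileName;
        this.state = state;
    }

    public Set<String> getSnapshotFileNames() { return Collections.singleton(fileName); }

    public boolean save(String path) {
        try {
            Files.write(Paths.get(path), state.getBytes(StandardCharsets.UTF_8));
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public boolean load(String path) {
        try {
            state = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Point-in-time copy, so later writes to `state` do not leak into the snapshot.
    public Participant copy() { return new TextParticipant(fileName, state); }
}

final class SnapshotDriverSketch {
    // Like onSnapshotSave: copy everything first, then write; null means success.
    static String saveAll(Map<String, Participant> workers, Path dir) {
        Map<String, Participant> copies = new TreeMap<>();
        workers.forEach((id, worker) -> copies.put(id, worker.copy()));
        for (Map.Entry<String, Participant> entry : copies.entrySet()) {
            for (String file : entry.getValue().getSnapshotFileNames()) {
                Path target = dir.resolve(file);
                if (!entry.getValue().save(target.toString())) {
                    return "Fail to save service " + entry.getKey() + " to " + target;
                }
            }
        }
        return null;
    }

    // Like onSnapshotLoad, except that failures are actually returned to the caller.
    static List<String> loadAll(Map<String, Participant> workers, Path dir) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Participant> entry : workers.entrySet()) {
            for (String file : entry.getValue().getSnapshotFileNames()) {
                if (!entry.getValue().load(dir.resolve(file).toString())) {
                    failed.add(entry.getKey());
                    break; // stop at this service's first missing or unreadable file
                }
            }
        }
        return failed;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("snapshot");
        Map<String, Participant> workers = new TreeMap<>();
        TextParticipant data = new TextParticipant("data.txt", "dc1=10.0.0.1");
        workers.put("dataServer", data);
        System.out.println("save error: " + saveAll(workers, dir));
        data.state = "changed after the snapshot";
        System.out.println("load failures: " + loadAll(workers, dir) + ", state: " + data.state);
    }
}
```

    Copying before writing is the design choice that matters: once the copies exist, the slow file I/O can run elsewhere while the live services keep applying new entries, and the snapshot still reflects a single point in the log.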
    

    5.5 XXXRepositoryService

    For the concrete services, see the XXXRepositoryService classes.

    ServiceStateMachine calls snapshotProcess.load(savePath) to reach each service's own implementation, as the call stack clearly shows:

    load:317, DataRepositoryService (com.alipay.sofa.registry.server.meta.repository.service)
    onSnapshotLoad:212, ServiceStateMachine (com.alipay.sofa.registry.jraft.bootstrap)
    doSnapshotLoad:641, FSMCallerImpl (com.alipay.sofa.jraft.core)
    runApplyTask:389, FSMCallerImpl (com.alipay.sofa.jraft.core)
    access$100:72, FSMCallerImpl (com.alipay.sofa.jraft.core)
    onEvent:147, FSMCallerImpl$ApplyTaskHandler (com.alipay.sofa.jraft.core)
    onEvent:141, FSMCallerImpl$ApplyTaskHandler (com.alipay.sofa.jraft.core)
    run:137, BatchEventProcessor (com.lmax.disruptor)
    run:748, Thread (java.lang)
    

    Taking DataRepositoryService as an example, its base class AbstractSnapshotProcess provides the basic file handling:

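    public abstract class AbstractSnapshotProcess implements SnapshotProcess {
        public boolean save(String path, Object values) {
            try {
                // Encode the values and overwrite the snapshot file (append = false)
                FileUtils.writeByteArrayToFile(new File(path), CommandCodec.encodeCommand(values),
                    false);
                return true;
            } catch (IOException e) {
                return false;
            }
        }
        public <T> T load(String path, Class<T> clazz) throws IOException {
            byte[] bs = FileUtils.readFileToByteArray(new File(path));
            if (bs != null && bs.length > 0) {
                return CommandCodec.decodeCommand(bs, clazz);
            }
            // An empty file cannot be decoded: fail instead of returning nothing
            throw new IOException("Fail to load snapshot from " + path + ": empty content");
        }
    }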
    

    DataRepositoryService then adapts load to its own registry:

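    @RaftService(uniqueId = "dataServer")
    public class DataRepositoryService extends AbstractSnapshotProcess
                                       implements RepositoryService<String, RenewDecorate<DataNode>> {
        // registry (a Map<String, NodeRepository>) is declared elsewhere in this class

        @Override
        public boolean save(String path) {
            // Serialize the whole registry map into the snapshot file
            return save(path, registry);
        }
    
        @Override
        public synchronized boolean load(String path) {
            try {
                // Decode the snapshot file, then replace the in-memory registry with it
                Map<String, NodeRepository> map = load(path, registry.getClass());
                registry.clear();
                registry.putAll(map);
                return true;
            } catch (IOException e) {
                // false makes ServiceStateMachine#onSnapshotLoad add this service to failServices
                return false;
            }
        }
    }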
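    The per-service save and load pair can also be tried outside SOFARegistry. This sketch swaps CommandCodec for standard Java serialization (ObjectOutputStream / ObjectInputStream), an assumption made only to keep it self-contained, and uses a hypothetical MapRepositorySketch with String values. Like DataRepositoryService#load, it replaces the whole map under a lock and reports I/O failures as false instead of throwing:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class MapRepositorySketch {
    // dataCenter -> node address; String values keep the sketch small.
    private final Map<String, String> registry = new ConcurrentHashMap<>();

    void put(String dataCenter, String node) { registry.put(dataCenter, node); }

    String get(String dataCenter) { return registry.get(dataCenter); }

    // Stands in for save(path, registry): serialize a copy of the map to the file.
    boolean save(String path) {
        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(path))) {
            out.writeObject(new HashMap<>(registry));
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Mirrors DataRepositoryService#load: decode, then replace the contents under a lock.
    @SuppressWarnings("unchecked")
    synchronized boolean load(String path) {
        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(path))) {
            Map<String, String> map = (Map<String, String>) in.readObject();
            registry.clear();
            registry.putAll(map);
            return true;
        } catch (IOException | ClassNotFoundException e) {
            return false; // the current contents are left untouched
        }
    }

    public static void main(String[] args) throws IOException {
        MapRepositorySketch repo = new MapRepositorySketch();
        repo.put("dc1", "10.0.0.1");
        String path = File.createTempFile("registry", ".snap").getPath();
        System.out.println("saved: " + repo.save(path));
        repo.put("dc1", "stale value");
        System.out.println("loaded: " + repo.load(path) + ", dc1 = " + repo.get("dc1"));
    }
}
```

    Clearing and refilling the existing map, rather than swapping in a new one, keeps any other reference to registry valid; the trade-off is that a reader that does not take the same lock may briefly observe an empty or partially filled map.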
    


  • Original article: https://www.cnblogs.com/rossiXYZ/p/13795707.html