zoukankan      html  css  js  c++  java
  • Copycat

    看下用户注册StateMachine的过程,

    CopycatServer.Builder builder = CopycatServer.builder(address);
    builder.withStateMachine(MapStateMachine::new);

    MapStateMachine::new这会构造一个supplier

    /**
         * Sets the Raft state machine factory.
         *
         * @param factory The Raft state machine factory.
         * @return The server builder.
         * @throws NullPointerException if the {@code factory} is {@code null}
         */
        public Builder withStateMachine(Supplier<StateMachine> factory) {
          this.stateMachineFactory = Assert.notNull(factory, "factory");
          return this;
        }

    在build中,传入初始化ServerContext

    ServerContext context = new ServerContext(name, type, serverAddress, clientAddress, storage, serializer, stateMachineFactory, connections, threadContext);

    ServerContext中,

    this.stateMachineFactory = Assert.notNull(stateMachineFactory, "stateMachineFactory");
    threadContext.execute(this::reset).join();

    reset逻辑中,

        // Create a new user state machine.
        StateMachine stateMachine = stateMachineFactory.get();
    // Create a new internal server state machine.
        this.stateMachine = new ServerStateMachine(stateMachine, this, stateContext);

    这里看到stateContext的定义,

    this.stateContext = new SingleThreadContext(String.format("copycat-server-%s-%s-state", serverAddress, name), threadContext.serializer().clone());

    也是一个单线程,所以这里有两个threadContext

    这个stateContex是专门用于更新state

    ServerStateMachine,用于管理StateMachine

    用户定义的StateMachine中定了对于各种command的处理function,而在copycat中需要一个组件来管理用户的StateMachine,就是ServerStateMachine

    ServerStateMachine(StateMachine stateMachine, ServerContext state, ThreadContext executor) {
        this.stateMachine = Assert.notNull(stateMachine, "stateMachine");
        this.state = Assert.notNull(state, "state");
        this.log = state.getLog();
        this.executor = new ServerStateMachineExecutor(new ServerStateMachineContext(state.getConnections(), new ServerSessionManager(state)), executor);
        this.commits = new ServerCommitPool(log, this.executor.context().sessions());
        init();
      }

    ServerStateMachineExecutor

    作为StateMachine的执行环境

    class ServerStateMachineExecutor implements StateMachineExecutor {
      private static final Logger LOGGER = LoggerFactory.getLogger(ServerStateMachineExecutor.class);
      private final ThreadContext executor;
      private final ServerStateMachineContext context;
      private final Queue<ServerTask> tasks = new ArrayDeque<>();
      private final List<ServerScheduledTask> scheduledTasks = new ArrayList<>();
      private final List<ServerScheduledTask> complete = new ArrayList<>();
      private final Map<Class, Function> operations = new HashMap<>();

    init

    /**
       * Initializes the state machine.
       */
      private void init() {
        stateMachine.init(executor);
      }

    注意这里stateMachine类是用户定义的,

    public void init(StateMachineExecutor executor) {
        this.executor = Assert.notNull(executor, "executor");
        this.context = executor.context();
        this.clock = context.clock();
        this.sessions = context.sessions();
        if (this instanceof SessionListener) {
          executor.context().sessions().addListener((SessionListener) this);
        }
        configure(executor);
      }

    configure

    protected void configure(StateMachineExecutor executor) {
        registerOperations();
      }
    /**
       * Registers operations for the class.
       */
      private void registerOperations() {
        Class<?> type = getClass();
        for (Method method : type.getMethods()) {
          if (isOperationMethod(method)) {
            registerMethod(method);
          }
        }
      }
    
      /**
       * Returns a boolean value indicating whether the given method is an operation method.
       */
      private boolean isOperationMethod(Method method) {
        Class<?>[] paramTypes = method.getParameterTypes();
        return paramTypes.length == 1 && paramTypes[0] == Commit.class;
      }

    我们看下,用户是如何定义operations的?

    public class MapStateMachine extends StateMachine {
      private Map<Object, Object> map = new HashMap<>();
    
      public Object put(Commit<PutCommand> commit) {
        try {
          map.put(commit.operation().key(), commit.operation().value());
        } finally {
          commit.close();
        }
      }
    
      public Object get(Commit<GetQuery> commit) {
        try {
          return map.get(commit.operation().key());
        } finally {
          commit.close();
        }
      }
    }

    你就理解这里通过reflection来找到Operation,

    逻辑就是有一个参数,参数的类型是Commit

    如果是Operation,调用registerMethod

    private void registerMethod(Method method) {
        Type genericType = method.getGenericParameterTypes()[0];
        Class<?> argumentType = resolveArgument(genericType);
        if (argumentType != null && Operation.class.isAssignableFrom(argumentType)) {
          registerMethod(argumentType, method);
        }
      }

    取得泛型的类型,例子里面的Put

    private void registerMethod(Class<?> type, Method method) {
        Class<?> returnType = method.getReturnType();
        if (returnType == void.class || returnType == Void.class) {
          registerVoidMethod(type, method);
        } else {
          registerValueMethod(type, method);
        }
      }
    private void registerValueMethod(Class type, Method method) {
        executor.register(type, wrapValueMethod(method));
      }
    
      /**
       * Wraps a value method.
       */
      private Function wrapValueMethod(Method method) {
        return c -> {
          try {
            return method.invoke(this, c);
          } catch (InvocationTargetException e) {
            throw new CommandException(e);
          } catch (IllegalAccessException e) {
            throw new AssertionError(e);
          }
        };
      }
    ServerStateMachineExecutor.register
    @Override
      public <T extends Operation<U>, U> StateMachineExecutor register(Class<T> type, Function<Commit<T>, U> callback) {
        operations.put(type, callback);
        return this;
      }

    这里,会把operations注册到ServerStateMachineExecutor里面,便于后面调用

    继续ServerStateMachine,

    ServerStateMachine最主要的逻辑,就是apply,即把command apply到state machine上,

    可以apply到某index为止的所有commit

    /**
       * Applies all commits up to the given index.
       * <p>
       * Calls to this method are assumed not to expect a result. This allows some optimizations to be
       * made internally since linearizable events don't have to be waited to complete the command.
       *
       * @param index The index up to which to apply commits.
       */
      public void applyAll(long index) {// If the effective commit index is greater than the last index applied to the state machine then apply remaining entries.
        long lastIndex = Math.min(index, log.lastIndex());
        if (lastIndex > lastApplied) {
          for (long i = lastApplied + 1; i <= lastIndex; i++) { // 接着上次最后apply的index,继续
            Entry entry = log.get(i);
            if (entry != null) {
              apply(entry).whenComplete((result, error) -> entry.release());
            }
            setLastApplied(i); 
          }
        }
      }


    也可以单独apply一条index对应的entry

    public <T> CompletableFuture<T> apply(long index) {
        // If entries remain to be applied prior to this entry then synchronously apply them.
        if (index > lastApplied + 1) {
          applyAll(index - 1);  //按顺序apply,所以之前的先要apply掉
        }
    
        // Read the entry from the log. If the entry is non-null them apply the entry, otherwise
        // simply update the last applied index and return a null result.
        try (Entry entry = log.get(index)) {
          if (entry != null) {
            return apply(entry);
          } else {
            return CompletableFuture.completedFuture(null);
          }
        } finally {
          setLastApplied(index);
        }
      }

    apply(entry)

    /**
       * Applies an entry to the state machine.
       * <p>
       * Calls to this method are assumed to expect a result. This means linearizable session events
       * triggered by the application of the given entry will be awaited before completing the returned future.
       *
       * @param entry The entry to apply.
       * @return A completable future to be completed with the result.
       */
      @SuppressWarnings("unchecked")
      public <T> CompletableFuture<T> apply(Entry entry) {
    if (entry instanceof QueryEntry) {
          return (CompletableFuture<T>) apply((QueryEntry) entry);
        } else if (entry instanceof CommandEntry) {
          return (CompletableFuture<T>) apply((CommandEntry) entry);
        } else if (entry instanceof RegisterEntry) {
          return (CompletableFuture<T>) apply((RegisterEntry) entry);
        } else if (entry instanceof KeepAliveEntry) {
          return (CompletableFuture<T>) apply((KeepAliveEntry) entry);
        } else if (entry instanceof UnregisterEntry) {
          return (CompletableFuture<T>) apply((UnregisterEntry) entry);
        } else if (entry instanceof InitializeEntry) {
          return (CompletableFuture<T>) apply((InitializeEntry) entry);
        } else if (entry instanceof ConfigurationEntry) {
          return (CompletableFuture<T>) apply((ConfigurationEntry) entry);
        }
        return Futures.exceptionalFuture(new InternalException("unknown state machine operation"));
      }

    看到不同的entry类型有不同的apply逻辑,

    apply((CommandEntry) entry)

    private CompletableFuture<Result> apply(CommandEntry entry) {
        final CompletableFuture<Result> future = new CompletableFuture<>();
        final ThreadContext context = ThreadContext.currentContextOrThrow(); //这里保留当前thread的引用
    
        // First check to ensure that the session exists.
        ServerSessionContext session = executor.context().sessions().getSession(entry.getSession());
    
        // If the session is null, return an UnknownSessionException. Commands applied to the state machine must
        // have a session. We ensure that session register/unregister entries are not compacted from the log
        // until all associated commands have been cleaned.
        if (session == null) { //session不存在
          log.release(entry.getIndex());
          return Futures.exceptionalFuture(new UnknownSessionException("unknown session: " + entry.getSession()));
        }
        // If the session is not in an active state, return an UnknownSessionException. Sessions are retained in the
        // session registry until all prior commands have been released by the state machine, but new commands can
        // only be applied for sessions in an active state.
        else if (!session.state().active()) { //session的状态非active
          log.release(entry.getIndex());
          return Futures.exceptionalFuture(new UnknownSessionException("inactive session: " + entry.getSession()));
        }
        // If the command's sequence number is less than the next session sequence number then that indicates that
        // we've received a command that was previously applied to the state machine. Ensure linearizability by
        // returning the cached response instead of applying it to the user defined state machine.
        else if (entry.getSequence() > 0 && entry.getSequence() < session.nextCommandSequence()) { //已经apply过的entry
          // Ensure the response check is executed in the state machine thread in order to ensure the
          // command was applied, otherwise there will be a race condition and concurrent modification issues.
          long sequence = entry.getSequence();
    
          // Switch to the state machine thread and get the existing response.
          executor.executor().execute(() -> sequenceCommand(sequence, session, future, context)); //直接返回之前apply的结果
          return future;
        }
        // If we've made it this far, the command must have been applied in the proper order as sequenced by the
        // session. This should be the case for most commands applied to the state machine.
        else {
          // Allow the executor to execute any scheduled events.
          long index = entry.getIndex();
          long sequence = entry.getSequence();
    
          // Calculate the updated timestamp for the command.
          long timestamp = executor.timestamp(entry.getTimestamp());
    
          // Execute the command in the state machine thread. Once complete, the CompletableFuture callback will be completed
          // in the state machine thread. Register the result in that thread and then complete the future in the caller's thread.
          ServerCommit commit = commits.acquire(entry, session, timestamp); //这里有个ServerCommitPool的实现,为了避免反复生成ServerCommit对象,直接从pool里面拿一个,用完放回去
          executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));
    
          // Update the last applied index prior to the command sequence number. This is necessary to ensure queries sequenced
          // at this index receive the index of the command.
          setLastApplied(index);
    
          // Update the session timestamp and command sequence number. This is done in the caller's thread since all
          // timestamp/index/sequence checks are done in this thread prior to executing operations on the state machine thread.
          session.setTimestamp(timestamp).setCommandSequence(sequence);
          return future;
        }
      }
    executeCommand
    ServerCommit commit = commits.acquire(entry, session, timestamp);
    executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));

    注意这里有两个线程,

    一个是context,是

    ThreadContext threadContext

    用来响应server请求的

    还有一个是executor里面的stateContext,用来改变stateMachine的状态的

    所以这里是用executor来执行executeCommand,但把ThreadContext传入

    /**
       * Executes a state machine command.
       */
      private void executeCommand(long index, long sequence, long timestamp, ServerCommit commit, ServerSessionContext session, CompletableFuture<Result> future, ThreadContext context) {
    
        // Trigger scheduled callbacks in the state machine.
        executor.tick(index, timestamp);
    
        // Update the state machine context with the commit index and local server context. The synchronous flag
        // indicates whether the server expects linearizable completion of published events. Events will be published
        // based on the configured consistency level for the context.
        executor.init(commit.index(), commit.time(), ServerStateMachineContext.Type.COMMAND);
    
        // Store the event index to return in the command response.
        long eventIndex = session.getEventIndex();
    
        try {
          // Execute the state machine operation and get the result.
          Object output = executor.executeOperation(commit);
    
          // Once the operation has been applied to the state machine, commit events published by the command.
          // The state machine context will build a composite future for events published to all sessions.
          executor.commit();
    
          // Store the result for linearizability and complete the command.
          Result result = new Result(index, eventIndex, output);
          session.registerResult(sequence, result); // 缓存执行结果
          context.executor().execute(() -> future.complete(result)); // complete future,表示future执行结束
        } catch (Exception e) {
          // If an exception occurs during execution of the command, store the exception.
          Result result = new Result(index, eventIndex, e);
          session.registerResult(sequence, result);
          context.executor().execute(() -> future.complete(result));
        }
      }
    ServerStateMachineExecutor.tick
    根据时间,去触发scheduledTasks中已经到时间的task
     
    ServerStateMachineExecutor.init
    更新state machine的context
    void init(long index, Instant instant, ServerStateMachineContext.Type type) {
        context.update(index, instant, type);
      }
      
      //ServerStateMachineContext
      void update(long index, Instant instant, Type type) {
        this.index = index;
        this.type = type;
        clock.set(instant);
      }
     
    ServerStateMachineExecutor.executeOperation
    <T extends Operation<U>, U> U executeOperation(Commit commit) {
    
        // Get the function registered for the operation. If no function is registered, attempt to
        // use a global function if available.
        Function function = operations.get(commit.type()); //从operations找到type对应的function
    
        if (function == null) {
          // If no operation function was found for the class, try to find an operation function
          // registered with a parent class.
          for (Map.Entry<Class, Function> entry : operations.entrySet()) {
            if (entry.getKey().isAssignableFrom(commit.type())) { //如果注册的type是commit.type的父类
              function = entry.getValue();
              break;
            }
          }
    
          // If a parent operation function was found, store the function for future reference.
          if (function != null) {
            operations.put(commit.type(), function);
          }
        }
    
        if (function == null) {
          throw new IllegalStateException("unknown state machine operation: " + commit.type());
        } else {
          // Execute the operation. If the operation return value is a Future, await the result,
          // otherwise immediately complete the execution future.
          try {
            return (U) function.apply(commit); //真正执行function
          } catch (Exception e) {
            throw new ApplicationException(e, "An application error occurred");
          }
        }
      }
     
     
     
     
     
  • 相关阅读:
    Go interface{}、类型断言
    相关资料
    php实践
    安装zookeeper
    对象池化,对象池
    java getResourcesAsStream()如何获取WEB-INF下的文件流
    android--SDK Manager下载Connection to http://dl-ssl.google.com refused
    Intellij idea 切换SVN路径
    Intellij Idea @Autowired取消提示
    恢复文件默认打开方式
  • 原文地址:https://www.cnblogs.com/fxjwind/p/6560271.html
Copyright © 2011-2022 走看看