  • Stateful Future Transformation

    As an async programming pattern, Future is popular among programmers across a wide range of languages. Loosely speaking, a Future is a wrapper around a value that will become available at some point in the future. Strictly speaking, Future is a monad that supports the following three operations:

    unit :: T -> Future<T>
    map :: (T -> R) -> (Future<T> -> Future<R>)
    flatMap :: (T -> Future<R>) -> (Future<T> -> Future<R>)
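
As a rough illustration, the three operations can be mapped onto Java's standard `CompletableFuture`: `completedFuture` plays the role of unit, `thenApply` of map, and `thenCompose` of flatMap. This is only a sketch; the class and method names (`FutureMonadDemo`, `describe`, `doubleAsync`) are made up for the example and are not part of the article's `Future` API.

```java
import java.util.concurrent.CompletableFuture;

// Sketch only: maps the three monad operations onto java.util.concurrent.CompletableFuture.
class FutureMonadDemo {
  // unit: wraps a plain value in an already-completed future.
  static CompletableFuture<Integer> unit(int value) {
    return CompletableFuture.completedFuture(value);
  }

  // map: applies a plain function T -> R to the eventual value.
  static CompletableFuture<String> describe(CompletableFuture<Integer> future) {
    return future.thenApply(n -> "value=" + n);
  }

  // flatMap: applies a function T -> Future<R> and flattens the nested future.
  static CompletableFuture<Integer> doubleAsync(CompletableFuture<Integer> future) {
    return future.thenCompose(n -> CompletableFuture.supplyAsync(() -> n * 2));
  }

  public static void main(String[] args) {
    CompletableFuture<Integer> start = unit(21);
    System.out.println(describe(doubleAsync(start)).join()); // value=42
  }
}
```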
    

    When holding a future, we know the type of its value, and we can register callbacks to be called when the future completes. But callbacks are not the recommended way to deal with futures: the point of the Future pattern is to avoid explicit callbacks in favor of future transformation. By using future transformation properly, we can make async code look like sequential code, with the callbacks hidden inside the futures.

    Here is an example. Say there are two async RPCs. One takes a user ID and returns a future of a list of the user's new message headers (ID and title); the other takes a message ID and returns a future of the full message.

    // RPC 1: Gets a list of new message (headers) of a user.
    Future<NewMessagesResponse> getNewMessages(UserId userId);
    
    // RPC 2: Gets the full message for a message ID.
    Future<Message> getMessage(MessageId messageId);
    
    // Data structures.
    class Message {
      class Header {
        MessageId id;
        String title;
      }
      class Body {
        ...
      }
    
      Header header;
      Body body;
    }
    
    class NewMessagesResponse {
      List<Message.Header> headers;
    }
    

    Your task: given a user ID and a keyword, get the user's new messages whose titles contain the keyword. With future transformation, the code may look like:

    // Gets the future of a list of messages for a user, whose titles contain a given keyword.
    Future<List<Message>> getNewMessages(UserId userId, String keyword) {
      Future<NewMessagesResponse> newMessagesFuture = getNewMessages(userId);
      Future<List<MessageId>> interestingIdsFuture = filter(newMessagesFuture, keyword);
      Future<List<Message>> messagesFuture = getMessages(interestingIdsFuture);
      return messagesFuture;
    }
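
The two helpers used above, `filter` and `getMessages`, are not shown in the article. One way they might look on top of `CompletableFuture` is sketched here: `filter` is a map step (`thenApply`), while `getMessages` is a flatMap step (`thenCompose`) that fans out one RPC per ID and joins the results with `allOf`. The simplified types (`String` standing in for `MessageId` and for the message itself) and the fake `getMessage` RPC are assumptions made to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Sketch only: hypothetical implementations of the two helpers, with String
// standing in for MessageId and a message reduced to its body text.
class FutureHelpersDemo {
  static class Header {
    final String id;
    final String title;
    Header(String id, String title) { this.id = id; this.title = title; }
  }

  // Stand-in for RPC 2: returns an already-completed future of a fake message body.
  static CompletableFuture<String> getMessage(String messageId) {
    return CompletableFuture.completedFuture("body of " + messageId);
  }

  // map step: keeps the IDs of the headers whose title contains the keyword.
  static CompletableFuture<List<String>> filter(
      CompletableFuture<List<Header>> headersFuture, String keyword) {
    return headersFuture.thenApply(headers -> headers.stream()
        .filter(h -> h.title.contains(keyword))
        .map(h -> h.id)
        .collect(Collectors.toList()));
  }

  // flatMap step: issues one RPC per ID, then collects the results in order.
  static CompletableFuture<List<String>> getMessages(CompletableFuture<List<String>> idsFuture) {
    return idsFuture.thenCompose(ids -> {
      List<CompletableFuture<String>> futures =
          ids.stream().map(FutureHelpersDemo::getMessage).collect(Collectors.toList());
      return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
          .thenApply(done -> futures.stream()
              .map(CompletableFuture::join) // Safe: allOf guarantees completion.
              .collect(Collectors.toList()));
    });
  }

  static List<String> demo() {
    CompletableFuture<List<Header>> headers = CompletableFuture.completedFuture(Arrays.asList(
        new Header("m1", "Build failed"),
        new Header("m2", "Lunch?"),
        new Header("m3", "Build fixed")));
    return getMessages(filter(headers, "Build")).join();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [body of m1, body of m3]
  }
}
```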
    

    The structure of the code is similar to what we do with synchronous code:

    List<Message> getNewMessages(UserId userId, String keyword) {
      NewMessagesResponse newMessages = getNewMessages(userId);
      List<MessageId> interestingIds = filter(newMessages, keyword);
      List<Message> messages = getMessages(interestingIds);
      return messages;
    }
    

    The async and sync functions are isomorphic: there is a correspondence in their code structure. Only their runtime behaviors differ: one runs asynchronously, the other synchronously.

    Now here comes the real challenge. What if we change the RPC a bit: there may be so many new messages that it has to return them page by page, and each response may contain an optional next-page token indicating that there are more pages?

    // RPC 1: Gets one page of the new message (headers) of a user. The page is identified by a pageToken.
    Future<NewMessagesResponse> getNewMessages(UserId userId, String pageToken);
    
    class NewMessagesResponse {
      List<Message.Header> messageHeaders;
      String nextPageToken; // Non-empty nextPageToken indicates there are more pages.
    }
    

    Your task remains the same: write a function that takes a user ID and a keyword, and returns a list of the user's new messages whose titles contain the keyword.

    Future<List<Message>> getNewMessages(UserId userId, String keyword) {
      //TODO
    }
    

    The difficulty is that a regular future transformation has a fixed number of steps, which we can simply chain together sequentially to get one future of the final result. With pagination, the number of steps is not known in advance. How can we chain them together?

    For synchronous code, we may use a loop like:

    List<Message> getNewMessages(UserId userId, String keyword) {
      List<MessageId> interestingIds = new ArrayList<>();
      String pageToken = "";
      do {
        // Fetch one page and keep the IDs whose titles contain the keyword.
        NewMessagesResponse newMessages = getNewMessages(userId, pageToken);
        interestingIds.addAll(filter(newMessages, keyword));
        pageToken = newMessages.nextPageToken;
      } while (!isEmpty(pageToken));
      return getMessages(interestingIds);
    }
    

    Unfortunately, a loop is not applicable to futures, because the loop condition depends on a value that is not available yet. How can we get one future for all the pages? Recursion comes to the rescue. This is what I call *Stateful Future Transformation*.

    class State {
      UserId userId;
      String keyword;
      int pageIndex;
      String pageToken;
      List<MessageId> buffer;
    }
    
    Future<State> getInterestingMessages(Future<State> stateFuture) {
      return Future.transform(
          stateFuture, (State state) -> {
            if (state.pageIndex > 0 && isEmpty(state.pageToken)) {
              // Final state: at least one page was fetched and there is no next page.
              return Future.immediate(state);
            } else {
              // Intermediate state: fetch the next page, update the state, then recurse.
              Future<NewMessagesResponse> newMessagesFuture =
                  getNewMessages(state.userId, state.pageToken);
              Future<State> nextStateFuture =
                  Future.transform(newMessagesFuture, newMessages -> {
                    state.pageIndex++;
                    state.pageToken = newMessages.nextPageToken;
                    state.buffer.addAll(filter(newMessages, state.keyword));
                    return Future.immediate(state);
                  });
              return getInterestingMessages(nextStateFuture);
            }
          });
    }
    
    Future<State> getInterestingMessages(UserId userId, String keyword) {
      State initialState = new State(userId, keyword, 0, "", new ArrayList<>());
      Future<State> initialStateFuture = Future.immediate(initialState);
      return getInterestingMessages(initialStateFuture);
    }
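
To make the recursion concrete, here is a runnable sketch of the same idea on `CompletableFuture`, run against a fake in-memory paged RPC. The names `PagedFetchDemo`, `Page`, `getPage`, and `fetchAll`, the three hard-coded pages, and the use of titles instead of message IDs are all assumptions for illustration; only the shape (check for the final state, otherwise fetch one page, update the state, and recurse) follows the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch only: the recursive paging pattern on CompletableFuture, against a fake RPC.
class PagedFetchDemo {
  static class Page {
    final List<String> titles;
    final String nextPageToken; // Empty when this is the last page.
    Page(List<String> titles, String nextPageToken) {
      this.titles = titles;
      this.nextPageToken = nextPageToken;
    }
  }

  static class State {
    final String keyword;
    int pageIndex = 0;
    String pageToken = "";
    final List<String> buffer = new ArrayList<>();
    State(String keyword) { this.keyword = keyword; }
  }

  // Hypothetical paged RPC: three fixed pages keyed by the tokens "", "p2" and "p3".
  static CompletableFuture<Page> getPage(String pageToken) {
    return CompletableFuture.supplyAsync(() -> {
      switch (pageToken) {
        case "": return new Page(Arrays.asList("Build failed", "Lunch?"), "p2");
        case "p2": return new Page(Arrays.asList("Build fixed"), "p3");
        default: return new Page(Arrays.asList("Build docs", "Party"), "");
      }
    });
  }

  static CompletableFuture<State> fetchAll(CompletableFuture<State> stateFuture) {
    return stateFuture.thenCompose(state -> {
      if (state.pageIndex > 0 && state.pageToken.isEmpty()) {
        return CompletableFuture.completedFuture(state); // Final state.
      }
      // Intermediate state: fetch one page, fold it into the state, then recurse.
      CompletableFuture<State> next = getPage(state.pageToken).thenApply(page -> {
        state.pageIndex++;
        state.pageToken = page.nextPageToken;
        for (String title : page.titles) {
          if (title.contains(state.keyword)) state.buffer.add(title);
        }
        return state;
      });
      return fetchAll(next);
    });
  }

  static List<String> demo() {
    State initial = new State("Build");
    return fetchAll(CompletableFuture.completedFuture(initial)).join().buffer;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [Build failed, Build fixed, Build docs]
  }
}
```

The caller still receives a single future; the number of pages fetched behind it is decided at runtime by the tokens the RPC returns.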
    

    The code above can be refactored into a general stateful future transformation function:

    // Transforms the future of an initial state into the future of its final state.
    <StateT> Future<StateT> transform(
        Future<StateT> stateFuture,
        Function<StateT, Boolean> isFinalState,
        Function<StateT, Future<StateT>> getNextState) {
      return Future.transform(
          stateFuture,
          (StateT state) -> {
            return isFinalState.apply(state)
                ? Future.immediate(state)
                : transform(getNextState.apply(state), isFinalState, getNextState);
          });
    }
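
A possible rendering of this generic function with `CompletableFuture` is sketched below, together with a toy use that asynchronously halves a number until it becomes odd. `StatefulTransformDemo`, the use of `Predicate` instead of `Function<StateT, Boolean>`, and the halving example are assumptions chosen for brevity, not part of the original code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

// Sketch only: the generic stateful transformation expressed with thenCompose.
class StatefulTransformDemo {
  static <S> CompletableFuture<S> transform(
      CompletableFuture<S> stateFuture,
      Predicate<S> isFinalState,
      Function<S, CompletableFuture<S>> getNextState) {
    return stateFuture.thenCompose(state -> {
      if (isFinalState.test(state)) {
        return CompletableFuture.completedFuture(state);
      }
      // Not final yet: compute the next state asynchronously and recurse on it.
      return transform(getNextState.apply(state), isFinalState, getNextState);
    });
  }

  // Toy example: 96 -> 48 -> 24 -> 12 -> 6 -> 3, stopping at the first odd value.
  static int demo() {
    return transform(
        CompletableFuture.completedFuture(96),
        n -> n % 2 != 0,
        n -> CompletableFuture.supplyAsync(() -> n / 2)).join();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // 3
  }
}
```

One caveat with this sketch: if every intermediate future is already complete when `thenCompose` is called, the recursion runs synchronously on the calling thread, so a very long chain of immediate steps could grow the call stack.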
    
  • Original article: https://www.cnblogs.com/weidagang2046/p/stateful-future-transformation.html