zoukankan      html  css  js  c++  java
  • State Management

    Samza的task可以把数据进行本地存储,并且对这些数据进行丰富的查询。
     
    比较SQL中的select ... where...并不需要保存状态。但是aggregation和join就需要存储row之间的状态。
    Samza提供了一些基本功能,能够使得streaming join和aggregation以及其它的有状态的处理更容易实现。
     
    何处需要stateful processing?
    • windowed aggregation 比如:每个用户每小时的点击量
      • 这种windowed processing通常用于ranking和relevance , 发现"trending topics ", 以及简单的实时reporting和monitoring。
      • 困难在于:当一个window处理的消息很多时,如果这个window 失败了,当重启时应该如何避免需要把全部消息重新处理一遍。
    • table-table join
    • stream-table join
    • stream-stream join
    如何管理task state? 如何支持这样的stateful processing?
     
    有以下几种常用方案
    In-memory state with checkpointing
    周期性的把task在内存中的数据做checkpoint. S4的状态管理就是这样做的。
    缺点是当作为state的数据量很大时,每次都完全dump所有数据不切实际,如果用diff又太复杂。
     
    Using an external store
    另一种常见的方案是把状态写进一个外部的数据库或者key-value store中。
    samza支持这种方式,但是提供了本地持久化作为更好的选项。
     
    Local state in Samza
    Samza allows tasks to maintain persistent, mutable, queryable state that is physically co-located with each task.
    Samza支持task在跟task同一台机器上维持持久化的、可变的、可查询的状态。
    每个task在写OutputStream的同时,还会写Changlog Stream。
     
    Key-value storage
    Kafka自带一个key-value store的实现,使用LevelDB。这个k-v store使用一个高可靠的"changelog" stream做为支撑,这个stream通过做为一个"redo log"来为task的状态提供了fault-tolerance功能。
     
    Fault-tolerance
    一个task的local storage实际上是一个缓存,那么当一个机器fail之后,怎么才能在另一个机器上重建这个缓存呢?
    这里有两种选择
    1. 在一个task重启时,重新读取曾经的所有输入以重建它的state。但是通常这个state会比input stream小得多,或者input stream是不可重放的。所以重新处理原有输入是一种浪费
    2. 使用一个changelog流。task把它的每次状态的改变记在这个流里。changelog就是一个普通的流,可以被其它人订阅。当然changelog是不断增长的,为了避免它占用太多空间,可以使用Kafka 0.8.1提供的log compaction功能,来去掉重复的条目。
    Using the key-value store
    首先在config里加上以下配置
     
    # Use the key-value store implementation for a store called "my-store"
    stores.my-store.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory

    # Log changes to the store to an output stream for restore
    # If no changelog is specified the store will not be logged (but you can still rebuild off your input streams)
    stores.my-store.changelog=kafka.my-stream-name

    # The serialization format to use
    stores.my-store.key.serde=string
    stores.my-store.msg.serde=string
     
    然后在StreamTask里这么写
     
    public class MyStatefulTask implements StreamTask, InitableTask {
      private KeyValueStore<String, String> store;

      public void init(Config config, TaskContext context) {
        this.store = (KeyValueStore<String, String>) context.getStore("store");
      }

      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
        System.out.println("Adding " + envelope.getKey() + " => " + envelope.getMessage() + " to the store.");
        store.put((String) envelope.getKey(), (String) envelope.getMessage());
      }
    }
     
     
     
  • 相关阅读:
    康复计划
    Leetcode 08.02 迷路的机器人 缓存加回溯
    Leetcode 38 外观数列
    Leetcode 801 使序列递增的最小交换次数
    Leetcode 1143 最长公共子序列
    Leetcode 11 盛水最多的容器 贪心算法
    Leetcode 1186 删除一次得到子数组最大和
    Leetcode 300 最长上升子序列
    Leetcode95 不同的二叉搜索树II 精致的分治
    Leetcode 1367 二叉树中的列表 DFS
  • 原文地址:https://www.cnblogs.com/devos/p/3691850.html
Copyright © 2011-2022 走看看