zoukankan      html  css  js  c++  java
  • Akka源码分析-Persistence Query

      Akka Persistence Query是对akka持久化的一个补充,它提供了统一的、异步的流查询接口。今天我们就来研究下这个Persistence Query。

      前面我们已经分析过Akka Persistence,它是用来持久化actor状态并在适当时机恢复actor的,简单来说它是用来写入的。那么Persistence Query与Persistence相对应,是用来读取数据的,一般用在读写分离的read side。

      Persistence Query主要的目标是设计一套松散的API,这样各个实现才能充分暴露他们各自的特点或性能,而不被API所约束。每个读取器都必须显示的说明他们支持的查询类型。

      Akka Persistence Query并不提供ReadJournals的具体实现。它只是定义了一些预定义的、满足大部分查询场景的查询类型,且可能被绝大多数journal实现的,当然并不要求一定实现。

      ReadJournal是提交查询必须首要创建的一个实例,所有的读取器(Read journal)都以社区插件的形式实现,官方只提供框架。每个特定的实现都对应特定的存储。

    // obtain read journal by plugin id
    val readJournal =
      PersistenceQuery(system).readJournalFor[MyScaladslReadJournal](
        "akka.persistence.query.my-read-journal")
    
    // issue query to journal
    val source: Source[EventEnvelope, NotUsed] =
      readJournal.eventsByPersistenceId("user-1337", 0, Long.MaxValue)
    
    // materialize stream, consuming events
    implicit val mat = ActorMaterializer()
    source.runForeach { event ⇒ println("Event: " + event) }
    

       官方demo的第一行代码就是通过配置创建了一个readJournal实例。很显然它是通过PersistenceQuery这个扩展的readJournalFor方法来创建的,第二行代码显示,它至少支持eventsByPersistenceId这个预定义的查询。

      我们知道Akka persistence query提供了一些预定义查询接口,和journal实现的框架。那预定义的查询接口是否通用就很重要了。预定义的接口有:

    • PersistenceIdsQuery
    • CurrentPersistenceIdsQuery
    • EventsByPersistenceIdQuery
    • CurrentEventsByPersistenceIdQuery
    • EventsByTag
    • CurrentEventsByTag

       其实demo中的readJournalFor的第一个参数就是一个persistenceId,它是用来订阅系统中所有持久化数据的一个流的。简单来说,我们可以理解为RDBMS中的一个表名。PersistenceIdsQuery就是用来查询当前系统有多少个可用的“表”的。PersistenceIdsQuery和CurrentPersistenceIdsQuery的区别就是,后者只是获取当前persistenceId的一个快照,而前者是实时的。

      EventsByPersistenceIdQuery可以理解为通过表名查询数据,查询结果也是一个实时的流。CurrentEventsByPersistenceIdQuery是全量查询一个快照。

      EventsByTag可以理解为按照数据的标签查询,它忽略了数据关联的persistenceId。其实这一点不太好实现,它跨越了“表”查询数据,很多存储系统都是以表做物理数据的隔离的,如果跨表有可能意味着跨物理存储,而且对性能也会有影响。另外一个很重要的问题就是,通过tag查询数据,往往不能保证数据的顺序。CurrentEventsByTag也是查询当前数据的快照,其实就是全量查询。

      查询的具体化值。我们还可以给journal传递额外的信息,比如过滤条件、是否排序等等。这是通过byTagsWithMeta方法实现的。下面我们去源码看下。

    class PersistenceQuery(system: ExtendedActorSystem)
      extends PersistencePlugin[scaladsl.ReadJournal, javadsl.ReadJournal, ReadJournalProvider](system)(ClassTag(classOf[ReadJournalProvider]), PersistenceQuery.pluginProvider)
      with Extension 
    

       上面是PersistenceQuery这个扩展的定义,继承了PersistencePlugin,并提供了readJournalFor的方法。

      /**
       * Scala API: Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given
       * read journal configuration entry.
       *
       * The provided readJournalPluginConfig will be used to configure the journal plugin instead of the actor system
       * config.
       */
      final def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T =
        pluginFor(readJournalPluginId, readJournalPluginConfig).scaladslPlugin.asInstanceOf[T]
    

       其实就是通过readJournalPluginId创建了一个ReadJournal实例,这个函数还是去调用了pluginFor方法。

      @tailrec
      final protected def pluginFor(pluginId: String, readJournalPluginConfig: Config): PluginHolder[ScalaDsl, JavaDsl] = {
        val configPath = pluginId
        val extensionIdMap = plugins.get
        extensionIdMap.get(configPath) match {
          case Some(extensionId) ⇒
            extensionId(system)
          case None ⇒
            val extensionId = new ExtensionId[PluginHolder[ScalaDsl, JavaDsl]] {
              override def createExtension(system: ExtendedActorSystem): PluginHolder[ScalaDsl, JavaDsl] = {
                val provider = createPlugin(configPath, readJournalPluginConfig)
                PluginHolder(
                  ev.scalaDsl(provider),
                  ev.javaDsl(provider)
                )
              }
            }
            plugins.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
            pluginFor(pluginId, readJournalPluginConfig)
        }
      }
    

       pluginFor其实也比较简单,它就是返回了PluginHolder,而PluginHolder继承了Extension只不过有两个类型参数:ScalaDsl、JavaDsl。如果没有找到就创建默认的,默认应该就是leveldb。其实可以看到PersistenceQuery只是提供了创建ReadJournal的接口,以及两个类型参数,并没有做过多的限制。那ReadJournal这个trait又是什么样的呢?

    /**
     * API for reading persistent events and information derived
     * from stored persistent events.
     *
     * The purpose of the API is not to enforce compatibility between different
     * journal implementations, because the technical capabilities may be very different.
     * The interface is very open so that different journals may implement specific queries.
     *
     * There are a few pre-defined queries that a query implementation may implement,
     * such as [[EventsByPersistenceIdQuery]], [[PersistenceIdsQuery]] and [[EventsByTagQuery]]
     * Implementation of these queries are optional and query (journal) plugins may define
     * their own specialized queries by implementing other methods.
     *
     * Usage:
     * {{{
     * val journal = PersistenceQuery(system).readJournalFor[SomeCoolReadJournal](queryPluginConfigPath)
     * val events = journal.query(EventsByTag("mytag", 0L))
     * }}}
     *
     * For Java API see [[akka.persistence.query.javadsl.ReadJournal]].
     */
    trait ReadJournal
    

       其实可以看到ReadJournal也没有任何默认的方法,这样来看,即使继承了这个接口的read journal不提供任何查询方法,或提供不符合预定义接口的方法也都是完全可以的。

    /**
     * A plugin may optionally support this query by implementing this trait.
     */
    trait PersistenceIdsQuery extends ReadJournal {
    
      /**
       * Query all `PersistentActor` identifiers, i.e. as defined by the
       * `persistenceId` of the `PersistentActor`.
       *
       * The stream is not completed when it reaches the end of the currently used `persistenceIds`,
       * but it continues to push new `persistenceIds` when new persistent actors are created.
       * Corresponding query that is completed when it reaches the end of the currently
       * currently used `persistenceIds` is provided by [[CurrentPersistenceIdsQuery#currentPersistenceIds]].
       */
      def persistenceIds(): Source[String, NotUsed]
    
    }
    

       我们来看看预定义的查询接口PersistenceIdsQuery,其实它就只是定义了一个方法,这个方法返回一个Source。那如果继承ReadJournal的自定义读取器不继承这个接口可以吗?完全可以!

    /**
     * Scala API [[akka.persistence.query.scaladsl.ReadJournal]] implementation for LevelDB.
     *
     * It is retrieved with:
     * {{{
     * val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
     * }}}
     *
     * Corresponding Java API is in [[akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal]].
     *
     * Configuration settings can be defined in the configuration section with the
     * absolute path corresponding to the identifier, which is `"akka.persistence.query.journal.leveldb"`
     * for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`.
     */
    class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends ReadJournal
      with PersistenceIdsQuery with CurrentPersistenceIdsQuery
      with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery
      with EventsByTagQuery with CurrentEventsByTagQuery
    

       这是默认提供的LeveldbReadJournal的实现,其实从这个接口来看它具有PersistenceIdsQuery、CurrentPersistenceIdsQuery、EventsByPersistenceIdQuery、CurrentEventsByPersistenceIdQuery、EventsByTagQuery、CurrentEventsByTagQuery的功能。

      其实吧,从这个具体实现来看,Akka Persistence Query只是提供了一个扩展,规定了ReadJournal实现的机制,对其具体的功能并没有任何强制的要求,这样就意味着,只能从文档上规范大家,要实现ReadJournal特质的一系列预定义接口。个人觉得吧,只要不是技术上的强制措施,在实施的时候都会有困难,既然你都不做严格限定了,那还不是百花齐放百家争鸣、千奇百怪的了。还不如直接定死了要实现的几个接口,如果不支持对应的操作直接抛异常不就完了。可能akka想让大家灵活点吧,鬼知道呢。

      其实吧,Akka Persistence Query就算讲完了。akka好像啥也没做,只是文档定义了几个概念和接口,具体有没有实现并没有做强制要求。既然这样,大家还不如直接自己玩。实现自己的查询接口与实现,供大家参考使用呢。

    Persistence Query

  • 相关阅读:
    Python 常用集合类型的增删改查分合 list tuple dict set
    C# Protobuf如何做到0分配内存的序列化/反序列化(2)
    用java springboot 下载无水印抖音快手视频
    下载腾讯视频为mp4简单方法
    java springboot笔记
    如何下载西瓜视频中的 blob:https 的视频下载
    一行Python代码实现for循环和if else判断
    Redis报错:MISCONF Redis is configured to save RDB snapshots, but it is currently not able to...
    pandas pd.concat() 提示没有 join_axes 参数
    Flask+Celery 执行时报错:def _connparams(self, async=False, _r210_options=( ^ SyntaxError: invalid syntax
  • 原文地址:https://www.cnblogs.com/gabry/p/9584175.html
Copyright © 2011-2022 走看看