zoukankan      html  css  js  c++  java
  • alpakka-kafka(2)-consumer

       alpakka-kafka-consumer的功能描述很简单:向kafka订阅某些topic然后把读到的消息传给akka-streams做业务处理。在kafka-consumer的实现细节上,为了达到高可用、高吞吐的目的,topic又可用划分出多个分区partition。分区是分布在kafka集群节点broker上的。由于一个topic可能有多个partition,对应topic就会有多个consumer,形成一个consumer组,共用统一的groupid。一个partition只能对应一个consumer、而一个consumer负责从多个partition甚至多个topic读取消息。kafka会根据实际情况将某个partition分配给某个consumer,即partition-assignment。所以一般来说我们会把topic订阅与consumer-group挂钩。这个可以在典型的ConsumerSettings证实:

      val system = ActorSystem("kafka-sys")
      val config = system.settings.config.getConfig("akka.kafka.consumer")
      val bootstrapServers = "localhost:9092"
      val consumerSettings =
        ConsumerSettings(config, new StringDeserializer, new ByteArrayDeserializer)
          .withBootstrapServers(bootstrapServers)
          .withGroupId("group1")
          .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    

    我们先用一个简单的consumer plainSource试试把前一篇示范中producer写入kafka的消息读出来: 

    import akka.actor.ActorSystem
    import akka.kafka._
    import akka.kafka.scaladsl._
    import akka.stream.{RestartSettings, SystemMaterializer}
    import akka.stream.scaladsl.{Keep, RestartSource, Sink}
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
    
    import scala.concurrent._
    import scala.concurrent.duration._
    object plain_source extends App {
      val system = ActorSystem("kafka-sys")
      val config = system.settings.config.getConfig("akka.kafka.consumer")
      implicit val mat = SystemMaterializer(system).materializer
      implicit val ec: ExecutionContext = mat.executionContext
      val bootstrapServers = "localhost:9092"
      val consumerSettings =
        ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
          .withBootstrapServers(bootstrapServers)
          .withGroupId("group1")
          .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    
      val subscription = Subscriptions.topics("greatings")
      Consumer.plainSource(consumerSettings, subscription)
        .runWith(Sink.foreach(msg => println(msg.value())))
    
      scala.io.StdIn.readLine()
      system.terminate()
    
    }
    

    以上我们没有对读出的消息做任何的业务处理,直接显示出来。注意每次都会从头完整读出,因为设置了 .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),也就是kafka-consumer的auto.offset.reset = "earliest" 。那么如果需要用读出的数据进行业务处理的话,每次开始运行应用时都会重复从头执行这些业务。所以需要某种机制来标注已经读取的消息,也就是需要记住当前读取位置offset。

    Consumer.plainSource输入ConsumerRecord类型:

        public ConsumerRecord(String topic,
                              int partition,
                              long offset,
                              K key,
                              V value) {
            this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
                    NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
        }
    

    这个ConsumerRecord类型里包括了offset,用户可以自行commit这个位置参数,也就是说用户可以选择把这个offset存储在kafka或者其它的数据库里。说到commit-offset,offset管理机制在kafka-consumer业务应用中应该属于关键技术。kafka-consumer方面的业务流程可以简述为:从kafka读出业务指令,执行指令并更新业务状态,然后再从kafka里读出下一批指令。为了实现业务状态的准确性,无论错过一些指令或者重复执行一些指令都是不能容忍的。所以,必须准确的标记每次从kafka读取数据后的指针位置,commit-offset。但是,如果读出数据后即刻commit-offset,那么在执行业务指令时如果系统发生异常,那么下次再从标注的位置开始读取数据时就会越过一批业务指令。这种情况称为at-most-once,即可能会执行一次,但绝不会重复。另一方面:如果在成功改变业务状态后再commit-offset,那么,一旦执行业务指令时发生异常而无法进行commit-offset,下次读取的位置将使用前一次的标注位置,就会出现重复改变业务状态的情况,这种情况称为at-least-once,即一定会执行业务指令,但可能出现重复更新情况。如果涉及资金、库存等业务,两者皆不可接受,只能采用exactly-once保证一次这种模式了。不过也有很多业务要求没那么严格,比如某个网站统计点击量,只需个约莫数,无论at-least-once,at-most-once都可以接受。

    kafka-consumer-offset是一个Long类型的值,可以存放在kafka内部或者外部的数据库里。如果选择在kafka内部存储offset, kafka配置里可以设定按时间间隔自动进行位置标注,自动把当前offset存入kafka里。当我们在上面例子的ConsumerSettings里设置自动commit后,多次重新运行就不会出现重复数据的情况了:

    val consumerSettings =
        ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
          .withBootstrapServers(bootstrapServers)
          .withGroupId("group1")
          .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")        //自动commit
          .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")   //自动commit间隔
          .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    

    alpakka-kafka提供了Committer类型,是akka-streams的Sink或Flow组件,负责把offset写入kafka。如果用Committer的Sink或Flow就可以按用户的需要控制commit-offset的发生时间。如下面这段示范代码: 

      val committerSettings = CommitterSettings(system)
    
      val control: DrainingControl[Done] =
        Consumer
          .committableSource(consumerSettings, Subscriptions.topics("greatings"))
          .mapAsync(10) { msg =>
            BusinessLogic.runBusiness(msg.record.key, msg.record.value)
              .map(_ => msg.committableOffset)
          }
          .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
          .run()
    
    control.drainAndShutdown();
    
      scala.io.StdIn.readLine()
      system.terminate()
    
    }
    object BusinessLogic {
      def runBusiness(key: String, value: String): Future[Done] = Future.successful(Done)
    }
    

    上面这个例子里BusinessLogic.runBusiess()模拟一段业务处理代码,也就是说完成了业务处理之后就用Committer.sink进行了commit-offset。这是一种at-least-once模式,因为runBusiness可能会发生异常失败,所以有可能出现重复运算的情况。Consumer.committableSource输出CommittableMessage: 

    def committableSource[K, V](settings: ConsumerSettings[K, V],
                                  subscription: Subscription): Source[CommittableMessage[K, V], Control] =
        Source.fromGraph(new CommittableSource[K, V](settings, subscription))
    
    
    
      final case class CommittableMessage[K, V](
          record: ConsumerRecord[K, V],
          committableOffset: CommittableOffset
      )
    
      @DoNotInherit sealed trait CommittableOffset extends Committable {
        def partitionOffset: PartitionOffset
      }
    

    Committer.sink接受输入Committable类型并将之写入kafka,上游的CommittableOffset 继承了 Committable。另外,这个DrainingControl类型结合了Control类型和akka-streams终结信号可以有效控制整个consumer-streams安全终结。

    alpakka-kafka还有一个atMostOnceSource。这个Source组件每读一条数据就会立即自动commit-offset:

      def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
                                 subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
        committableSource[K, V](settings, subscription).mapAsync(1) { m =>
          m.committableOffset.commitInternal().map(_ => m.record)(ExecutionContexts.sameThreadExecutionContext)
        }
    

    可以看出来,atMostOnceSource在输出ConsumerRecord之前就进行了commit-offset。atMostOnceSource的一个具体使用示范如下:

      import scala.collection.immutable
      val control: DrainingControl[immutable.Seq[Done]] =
        Consumer
          .atMostOnceSource(consumerSettings, Subscriptions.topics("greatings"))
          .mapAsync(1)(record => BussinessLogic.runBusiness(record.key, record.value()))
          .toMat(Sink.seq)(DrainingControl.apply)
          .run()
    
      control.drainAndShutdown();
      scala.io.StdIn.readLine()
      system.terminate()
    

    所以,使用atMostOnceSource后是不需要任何Committer来进行commit-offset的了。值得注意的是atMostOnceSource是对每一条数据进行位置标注的,所以运行效率必然会受到影响,如果要求不是那么严格的话还是启动自动commit比较合适。

    对于任何类型的交易业务系统来说,无论at-least-once或at-most-once都是不可接受的,只有exactly-once才妥当。实现exactly-once的其中一个方法是把offset与业务数据存放在同一个外部数据库中。如果在外部数据库通过事务处理机制(transaction-processing)把业务状态更新与commit-offset放在一个事务单元中同进同退就能实现exactly-once模式了。下面这段是官方文档给出的一个示范:

      val db = new mongoldb
      val control = db.loadOffset().map { fromOffset =>
        Consumer
          .plainSource(
            consumerSettings,
            Subscriptions.assignmentWithOffset(
              new TopicPartition(topic, /* partition = */ 0) -> fromOffset
            )
          )
          .mapAsync(1)(db.businessLogicAndStoreOffset)
          .toMat(Sink.seq)(DrainingControl.apply)
          .run()
      }
    
    class mongoldb {
      def businessLogicAndStoreOffset(record: ConsumerRecord[String, String]): Future[Done] = // ...
      def loadOffset(): Future[Long] = // ...
    }
    

    在上面这段代码里:db.loadOffset()从mongodb里取出上一次读取位置,返回Future[Long],然后用Subscriptions.assignmentWithOffset把这个offset放在一个tuple (TopicPartition,Long)里。TopicPartition定义如下: 

        public TopicPartition(String topic, int partition) {
            this.partition = partition;
            this.topic = topic;
        }
    

    这样Consumer.plainSource就可以从offset开始读取数据了。plainSource输出ConsumerRecord类型:

        public ConsumerRecord(String topic,
                              int partition,
                              long offset,
                              K key,
                              V value) {
            this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
                    NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
        }
    

    这里面除业务指令value外还提供了当前offset。这些已经足够在businessLogicAndStoreOffset()里运算一个单独的business+offset事务了(transaction)。 

     

  • 相关阅读:
    Atitit 经济学常见的流派 古典主义与凯恩斯主义
    Atitit 学习方法 体系化学习方法 Excel 科目,分类,专业 三级分类。。 知识点。。 课程就是每一个知识点的详细化。。 比如经济学 类别 专业 xx概论知识点 3、金
    atiitt it学科体系化 体系树与知识点概念大总结.xlsx
    Atitit 减少财政支出普通人如何蹭政府补贴措施 attilax大总结.docx
    Atitit 信用管理概论 attilax学习心得
    Atitit.月度计划日程表 每月流程表v5
    Atitit 企业6大职能 attilax总结
    Atitit 常见每日流程日程日常工作.docx v8 ver ampm imp 签到 am y 天气情况检查 am y 晨会,每天或者隔天 am 每日计划(项目计划,日计划等。 am
    Atitit 财政赤字解决方案
    Atitit 建设自己的财政体系 attilax总结 1.1. 收入理论 2 1.2. 收入分类 2 1.3. 2 1.4. 非货币收入 2 1.5. 2 1.6. 降低期望 2 1.7.
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/14432566.html
Copyright © 2011-2022 走看看