zoukankan      html  css  js  c++  java
  • Kafka Product

    流程:

    1.product首先使用一个线程(用户主线程,也就是用户启动producer的线程)将待发送的消息封装进一个producerRecord类实例,然后将其系列化之后发给partitioner,
    再由后者确定了目标分区后一同发送到位于product程序中的一块内存缓冲区中。而product的另一个工作线程(I/O发送线程,也成sender线程)则负责实时的从缓冲区中
    提取处准备就绪的消息封装进一个批次,同一发送给对应的broker。
    

    如图:

    发送消息方式(3种)

    1.同步发送
    2.异步发送+回调
    3.fire and forget (发送之后不再理会)
    product send方法提供回调参数实现异步发送以及对发送结果的响应。也可以自定义回调函数 需要实现org.apache.kafka.clients.producer.Callback接口
    

    错误(2类)

    1.可重试异常(继承org.apache.kafka.common.errors.RetriableException)
    - LeaderNotAvailableException:通常处于leader换届选举期间,瞬时异常,重试后恢复。
    - NotControllerException:通常表明controller在经历新一轮选举,重试机制自行恢复。
    - NetworkException:网络异常。
    2.不可重试异常
    - RecoedTooLargerException:发送的消息尺寸过大,超过规定上限。
    - SerializationException:序列化失败异常。
    - KafkaException:其他类型的异常。
    

    发送参数acks

    - 0
      不理睬leader broker端的处理结果,立即发送下一条消息,producer.send的回调无效。但是吞吐量是最高的。
    - all
      leader broker 不仅会将消息写进本地日志,同事还会等待ISR(副本集合)中索引偶其他副本成功写入,保证消息不丢失,但吞吐量是最低的。
    - 1
      折中的方案,默认的参数。producer发送消息后leader broker仅将消息写进日志,然后发送响应结果给producer,无需等待ISP中其他副本写入该消息。
      即可以保证适当的消息持久性。也保证producer端的吞吐量。
    
  • 相关阅读:
    Cookie存储在哪里
    save the transient instance before flushing错误解决办法
    hibernate中简单的增删改查
    hibernate中get和load的区别
    使用Linux命令修改数据库密码
    配置solrcloud
    如何确定Redis集群中各个节点的主从关系
    解决Eclipse Debug 断点调试的source not found问题
    .NET框架
    ORM框架
  • 原文地址:https://www.cnblogs.com/snail-gao/p/12924435.html
Copyright © 2011-2022 走看看