zoukankan      html  css  js  c++  java
  • kafka基础

    一 概述

    1  消息系统类型

          消息系统主要分为peer-to-peer和发布/订阅两种。

         (1)peer-to-peer : 对等计算,点对点(端到端),每个参与者既是服务端,也是客户端。一般基于pull。

            发送到队列中的消息被一个而且仅仅一个接收者所接收, 即使有多个接收者在同一个队列中侦听同一消息。                                   

        (2)发布/订阅: 发布到一个主题的消息,可被多个订阅者所接收。

           即可基于push广播数据,也可pull消费数据。

    2  为什么使用消息系统

           (1)解耦:两边业务方遵守同样一套接口规则,无需互相了解彼此存在,两边可以独立扩展和修改。

           (2)冗余:许多消息队列所采用的”插入-获取-删除”,需要你的处理系统明确的指出该消息已经被处理完毕才能删除,有消息持久化能力可规避消息处理前丢失的风险。

           (3)扩展性:参照统一数据接口,各业务可以独立扩展。

           (4)峰值处理能力:消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求。

           (5)可恢复性:系统中部分组件失效并不会影响整个系统,它恢复后仍然可从消息系统中获取并处理数据。

           (6)异步通信:在不需要立即处理请求的场景下,可以将请求放入消息系统,合适的时候再处理

           (7)顺序保证和缓冲:保证数据会按照特定的顺序来处理,Kafka保证一个Partition内的消息的有序性。消息队列通过一个缓冲层来帮助任务最高效率的执行,写入队列的处理会尽可能的快速,有利于控制和优化数据流。

    3 常用消息系统

           (1)RabbitMQ  开源消息队列,支持负载均衡,持久化,支持多协议,非常重量级。

           (2)Redis  基于Key-Value对的NoSQL数据库,支持MQ功能,可做轻量级队列服务使用。就入队操作 Redis对短消息(小于10KB)的性能比RabbitMQ好,长消息的性能比RabbitMQ差。

             (3)   ZeroMQ  轻量级,不需要单独的消息服务器或中间件,应用程序本身扮演该角色,Peer-to-Peer。它实质上是 一个库,需要开发人员自己组合多种技术,使用复杂度高,门槛搞。号称速度最快,但不支持持久化。

          (4)ActiveMQ apache的子项目,JMS实现,Peer-to-Peer,支持持久化、XA事务,类似ZeroMQ。

          (5)Kafka/Jafka 高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持在线和离线处理。相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

    4 kafka的设计目标

          (1)高吞吐率 在廉价的商用机器上单机可支持每秒100万条消息的读写。

          (2)消息持久化 所有消息均被持久化到磁盘,无消息丢失,支持消息重放。

          (3)完全分布式 Producer,Broker,Consumer均支持水平扩展。

          (4)同时适应在线流处理和离线批处理。

    二 kafka架构设计

       1  整体架构

            包含producer生产者,broker(kafka集群),consumer消费者,zookeeper,如图。

          *producer把数据push给broker,consumer通过pull向kafka。一般消息机制的broker使用push向consumer发送消息,kafka相反,consumer通过pull向broker拉数据。这样做两个好处 : 一是broker不用记住consumer相关信息和状态,职责单一;二是由consumer自己决定何时向broker要数据,根据consumer的消费能力来拉数据,如果broker主动发送,则受制于consumer的处理速度,导致consumer压力增大。

         * broker和consumer都依赖于zookeeper

         * producer会周期性刷新broker的元信息,发送失败的时候,也会获得元信息。

         * 集群元信息存在zookeeper里,consumer通过查询zookeeper得到集群元信息。 

       2 topic 和partition

            Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。

          topic:

           *同一个Topic的消息可分布在一个或多个节点(Broker)上

       *一个Topic包含一个或者多个Partition

           *每条消息都属于且仅属于一个Topic

       *Producer发布数据时,必须指定将该消息发布到哪一个Topic

           *Consumer订阅消息时,必须指定订阅哪个Topic的消息 

      

              partition

             *为了性能考虑,如果topic内的消息只存于一个broker,那这个broker会成为瓶颈,无法做到水平扩展。Partition的引入就是解决水平扩展问题的一个方案。每个partition可以被认为是一个无限长度的数组,新数据顺序追加进这个数组。物理上,每个partition对应于一个文件夹。一个broker上可以存放多个partition。这样,producer可以将数据发送给多个broker上的多个partition,consumer也可以并行从多个broker上的不同paritition上读数据,实现了水平扩展。

        *物理概念,一个Partition只分布于一个Broker上(不考虑备份)

            * 一个Partition物理上对应一个文件夹

            * 一个Partition包含多个Segment(Segment对用户透明)

            *一个Segment对应一个文件 

            *Segment由一个个不可变记录组成,记录只会被append到Segment中,不会被单独删除或者修改

            *清除过期日志时,直接删除一个或多个Segment

            

               *因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。

               *Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。

               *因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高Kafka性能无关。选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。

        3 partition位置和producer种类

          * 我们可以手动指定消息落到哪个partition里。实现Partitioner接口的partition()方法,传入一个自定的key,返回一个int,就是partition的位置。常见的定位patition的有两种。

          (1)hash法,根据传入的key的hashcode,对partition数量取模,得到位置。

          (2)使用一个线程安全的全剧计数器(例如automiclong),每次加1并对patition数量取模,或者取随机数,均匀分配到patition上去。优点是不需要对key进行处理,让整个patition比较均匀。

         * producer分两种,同步和异步。在声明Producer时,设置key-value属性 “producer.type”为"sync"或“async”即可。还可以对producer其他各种属性进行配置,如序列化方式,压缩方式,失败重试机制,后台线程数,异步发送批量数量等等。

          同步producer低延迟,低吞吐,不会丢失数据。

          异步producer高延迟(批量发送可能会有延迟),高吞吐,可能丢失数据(队列满压力大丢弃等)。

        

  • 相关阅读:
    UIScrollView(滚动视图)
    NSJSONSerialization(json序列化)
    手势(UIGestureRecognizer)
    mac常用命令
    ios设备相关
    UITextField
    cocos2d学习笔记
    NSTimer(定时器)
    git命令
    Java 线程的终止-interrupt
  • 原文地址:https://www.cnblogs.com/lkdirk/p/8145681.html
Copyright © 2011-2022 走看看