zoukankan      html  css  js  c++  java
  • Kafka基本知识入门(一)

    1、 基础知识

    有关RabbitMQ,RocketMQ,Kafka的区别这个网上很多,了解一下区别性能,分清什么场景使用。分布式环境下的消息中间件Kafka做的比较不错,在分布式环境下使用频繁,我也不免其俗钻研一下Kafka的使用。

    任何消息队列都遵循AMQP协议,AMQP协议(Advanced Message Queuing Protocol,高级消息队列协议)
    AMQP是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议。AMQP定义了通过网络发送的字节流的数据格式。因此兼容性非常好,任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,可以很容易做到跨语言,跨平台。
    Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独特的设计。

    我们先看一些基本的概念:

    1. 消费者:(Consumer):从消息队列中请求消息的客户端应用程序
    2. 生产者:(Producer) :向broker发布消息的应用程序
    3. AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于Kafka一个broker就是一个应用程序的实例
    4. 主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。
    5. 分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是Kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。

    Kafka将消息以topic为单位进行归纳,每个broker其实就是一个应用服务器,一个broker中会有很多的topic,每个topic其实就是不同的服务需要消息的消息的聚集地。因为每个topic其实会很大,所以就出现了partition个概念,将每个topic的消息分区存储。

    这里写图片描述

    Kafka中的消费者有一个分组的概念,每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费(而不是该group下的所有consumer,一定要注意这点)

    • 如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.
    • 如果所有的consumer都具有不同的group,那这就是”发布-订阅”;消息将会广播给所有的消费者.

    在Kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个”订阅”者,一个Topic中的每个partions,只会被一个”订阅者”中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.

    分布式环境中,Kafka默认使用zookeeper作为注册中心,Kafka集群几乎不维护任何consumer和producer的信息状态,这些信息都由zookeeper保存,所以consumer和producer非常的轻量级,随时注册和离开都不会对Kafka造成震荡。

    producer和consumer通过zookeeper去发现topic,并且通过zookeeper来协调生产和消费的过程。
    producer、consumer和broker均采用TCP连接,通信基于NIO实现。Producer和consumer能自动检测broker的增加和减少。

    上面图中没有说明partition的组成,partition物理上由多个segment组成,每一个segment 数据文件都有一个索引文件对应。每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.

    相比传统的消息系统,Kafka可以很好的保证有序性。

    传统的队列在服务器上保存有序的消息,如果多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,但是消息是被异步的分发到各consumer上,所以当消息到达时可能已经失去了原来的顺序,这意味着并发消费将导致顺序错乱。为了避免故障,这样的消息系统通常使用“专用consumer”的概念,其实就是只允许一个消费者消费消息,当然这就意味着失去了并发性。

    在这方面Kafka做的更好,通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。

    Kafka只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要topic中所有消息的有序性,那就只能让这个topic只有一个分区,当然也就只有一个consumer组消费它。

    1.1、 message 被分配到 partition 的过程

    每一条消息被发送到broker时,会根据paritition规则(有两种基本的策略,一是采用Key Hash算法,一是采用Round Robin算法)选择被存储到哪一个partition。如果partition规则设置的合理,所有消息可以均匀分布到不同的partition里,这样就实现了水平扩展。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)。

    在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪个parition。paritition机制可以通过指定producer的paritition.class这一参数来指定,该class必须实现Kafka.producer.Partitioner接口。

    1.2、 segment文件存储结构

    segment file由2大部分组成,分别为index file和data file,这两个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件。

    segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

    文件类似于下面这种形式:

    0000000000000000001.index
    0000000000000000001.log
    0000000000000036581.index
    0000000000000036581.log
    0000000000000061905.index
    0000000000000061905.log
    

    index和data-file的对应关系如下:

    这里写图片描述

    index file 存储索引文件,文件中的元数据指向对应数据文件中message的物理偏移地址。

    2、 Kafka单机环境搭建

    下载Kafka,解压缩

    配置环境变量:

    export Kafka_HOME=/usr/local/Kafka                                                                                                                                                                        
    export PATH=$PATH:$Kafka_HOME/bin 
    
    重启生效
    source /etc/profile
    

    Kafka用到了zeekeeper,所以需要先启动zookeeper,没有安装的需要先安装zk,安装好了以后我们可以启动,我们先来实现单机版的Kafka,先启动一个单单例的zk服务,可以在命令的结尾加个&符号,这样就可以启动后离开控制台。

    # bin/zookeeper-server-start.sh config/zookeeper.properties &
    

    再启动Kafka:

    # bin/Kafka-server-start.sh config/server.properties
    

    创建topic:

    # bin/Kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    

    创建producer,可以在控制台手动输入消息:

    # bin/Kafka-console-producer.sh --broker-list localhost:9092 --topic test 
    
    this is a message
    

    ctrl+c 可以退出发送。

    创建consumer:

    # bin/Kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    
    this is a message
    会收到刚才的发送的消息
    

    我们的一个简单的单机环境就搭建好了。

  • 相关阅读:
    [AU3]技巧
    [AU3]Windows 10 Update message DETELEDER Win10更新通知删除助手
    vue多选框选择后显示选中的内容
    计算object的长度
    关于苹果手机点击事件无效的解决办法
    vue中将汉字按照首字母排序,也适用于其他地方,但不适用多音字
    关于json数据格式错误
    Java下使用Swing来进行图形界面开发
    数字与静态
    Java中的内存机制及管理
  • 原文地址:https://www.cnblogs.com/rickiyang/p/11074196.html
Copyright © 2011-2022 走看看