zoukankan      html  css  js  c++  java
  • Kafka简明教程

    概述

    Kafka 是一个分布式消息队列(MQ, Message queue)中间件,支持点对点(Quene)、发布订阅(Topic)模式。Kafka 的定位主要在日志等方面,单击吞吐量特别大, 因为Kafka 设计的初衷就是处理日志的,可以看做是一个日志(消息)系统一个重要组件,针对性很强。

    使用场景:

    • 网站活动跟踪:根据不同的业务数据类型,将消息发布到不同的 Topic。
    • 日志聚合:可以将多台主机或应用的日志数据抽象成一个个日志或事件的消息流,异步发送到 Kafka 集群。
    • 流计算处理:构建应用系统和分析系统的桥梁,并将它们之间的关联解耦。
    • 数据中转枢纽:利用 Kafka 作为数据中转枢纽,同份数据可以被导入到不同专用系统中。

    官网:http://kafka.apache.org/
    中文站:http://kafka.apachecn.org/

    名称: Kafka
    所属社区/公司:Apache
    开发语言: Java
    协议: 自行设计的协议,仿AMQP
    事务:不支持
    集群:支持,依赖ZooKeeper
    

    快速入门

    官方的 quickstart 已经非常详细了,按照文档可以一步一步的达到入门的效果。地址:http://kafka.apache.org/quickstart

    这里我记录一下简单的步骤,仅作为测试使用,真实环境请参考官方文档部署:
    1、下载解压:

    $ cd /opt
    $ wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
    $ tar -xzf kafka_2.12-2.2.0.tgz
    $ cd kafka_2.12-2.2.0
    

    Kafka 依赖 ZooKeeper 。安装包里已经包含了 ZooKeeper。

    2、启动 ZooKeeper

    $ bin/zookeeper-server-start.sh config/zookeeper.properties
    
    # 限于篇幅,省略大部分输出
    ...
    [2019-05-11 13:15:44,643] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
    

    如果需要后台运行,请在命令后面追加&

    3、启动 Kafka Server端

    $ bin/kafka-server-start.sh config/server.properties
    
    # 限于篇幅,省略大部分输出
    ...
    [2019-05-11 13:18:34,578] INFO Kafka version: 2.2.0 (org.apache.kafka.common.utils.AppInfoParser)
    [2019-05-11 13:18:34,578] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser)
    [2019-05-11 13:18:34,579] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
    

    如果需要后台运行,请在命令后面追加&

    4、创建主题(Topic)
    创建一个名为 test 的主题,包含1个分区(partition),1个副本(replication-factor):

    $ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    

    创建完毕后可以查看该主题:

    $  bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    
    test
    

    也可以在配置里设置为在发布不存在的主题时自动创建主题,而不是手动创建主题。这个后面再说明。

    5、发布消息
    我们新启动一个命令行窗口充当生产者,向 Kafka 里发送消息,指定主题为 test

    $ cd /opt/kafka_2.12-2.2.0/
    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
    >
    

    然后命令行等待我们输入消息。我们输入 hello回车:

    >hello
    >
    

    消息就发出去了。接下来我们启动消费者。

    6、消费消息

    我们新启动一个命令行窗口充当消费者来消费消息,指定主题为 test

    $ cd /opt/kafka_2.12-2.2.0/
    $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    
    hello
    
    

    就消费了1条消息。我们可以在生产者命令行窗口继续发生消息,消费者端可以实时消费。

    好了,基本的安装测试就到这。关于设置kakfa集群请参考:http://kafka.apache.org/quickstart#quickstart_multibroker

    如何在项目里使用

    上一节仅演示了在命令行里使用,可以方便调试。对于在项目里使用,需要借助 SDK。这个页面收录了所有的客户端:https://cwiki.apache.org/confluence/display/KAFKA/Clients

    PHP

    常用的SDK:

    这里以 kafka-php 为例。

    kafka-php 使用纯粹的PHP 编写的 kafka 客户端,目前支持 0.8.x 以上版本的 Kafka。最新的kafka-php 版本是 v0.2.8 (截止到2019-05-11),详见:https://github.com/weiboad/kafka-php/releaseskafka-phpv0.2.xv0.1.x 不兼容,如果使用原有的 v0.1.x 的可以参照文档 Kafka PHP v0.1.x Document, 不过建议切换到 v0.2.x 上。

    kafka-php (v0.2.8) 环境要求:

    • PHP 版本大于 5.5
    • Kafka Server 版本大于 0.8.0
    • 消费模块 Kafka Server 版本需要大于 0.9.0

    1、发送消息,同步方式:

    require '../vendor/autoload.php';
    date_default_timezone_set('PRC');
    
    // use MonologLogger;
    // //use MonologHandlerStdoutHandler;
    // Create the logger
    // $logger = new Logger('my_logger');
    // //Now add some handlers
    // $logger->pushHandler(new StdoutHandler());
    
    $config = KafkaProducerConfig::getInstance();
    $config->setMetadataRefreshIntervalMs(10000);
    $config->setMetadataBrokerList('127.0.0.1:9192,127.0.0.1:9193');
    $config->setBrokerVersion('0.10.2.1');
    $config->setRequiredAck(1);
    $config->setIsAsyn(false);
    $config->setProduceInterval(500);
    $config->setTimeout(2000);
    
    $producer = new KafkaProducer();
    // $producer->setLogger($logger);
    
    for($i = 0; $i < 100; $i++) {
        $result = $producer->send(array(
            array(
                'topic' => 'test1',
                'value' => 'test1....message.',
                'key' => '',
            ),
        ));
        var_dump($result);
    }
    

    说明:

    1. 设置 logger 不是必选的。但是如果需要调试,建议加上。如果没有安装Monolog,也可以自己定一个 logger ,只要实现了 psr/log规范即可。
    2. MetadataBrokerList支持集群配置。使用英文逗号隔开即可。
    3. BrokerVersion版本需与安装的 kafka 版本一致。

    2、消费消息

    消费消息一般需要写脚本常驻运行。可以借助 Supervisor 工具。

    require '../vendor/autoload.php';
    date_default_timezone_set('PRC');
    
    // use MonologLogger;
    // use MonologHandlerStdoutHandler;
    // // Create the logger
    // $logger = new Logger('my_logger');
    // // Now add some handlers
    // $logger->pushHandler(new StdoutHandler());
    
    $config = KafkaConsumerConfig::getInstance();
    $config->setMetadataRefreshIntervalMs(10000);
    $config->setMetadataBrokerList('10.13.4.159:9192');
    $config->setGroupId('test'); //消费者组
    $config->setBrokerVersion('0.10.2.1');
    $config->setTopics(['test']); //主题
    //$config->setOffsetReset('earliest');
    $consumer = new KafkaConsumer();
    
    // $consumer->setLogger($logger);
    
    $consumer->start(function($topic, $part, $message) {
    	var_dump($message);
    });
    

    注意:

    1. 消费者组可以有多个,互相之间不影响。每个消费者组都可以消费到完整的一份消息。
    2. setOffsetReset的值有:earliest(从最早的开始消费)、latest(从最新的开始消费)。

    Golang

    Python

    发送消息示例:

    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    for _ in range(100):
        producer.send('test', b'some_message_bytes')
    
    

    Kakfa 原理


    (上图为Kakfa架构图)

    一个典型的消息队列 Kafka 集群包含:

    • Producer:通过 push 模式向消息队列 Kafka Broker 发送消息,可以是网站的页面访问、服务器日志等,也可以是 CPU 和内存相关的系统资源信息;
    • Kafka Broker:消息队列 Kafka 的服务器,用于存储消息;支持水平扩展,一般 Broker 节点数量越多,集群吞吐率越高;
    • Consumer Group:通过 pull 模式从消息队列 Kafka Broker 订阅并消费消息;
    • Zookeeper:管理集群的配置、选举 leader,以及在 Consumer Group 发生变化时进行负载均衡。

    几个重要概念

    • Broker:消息队列 Kafka 集群包含一个或多个消息处理服务器,该服务器被称为 Broker。
    • Topic:主题。每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。
    • Partition:分区。一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。
    • Producer: 消息发布者,也称为消息生产者,负责生产并发送消息到 Kafka Broker。
    • Consumer: 消息订阅者,也称为消息消费者,负责向 Kafka Broker 读取消息并进行消费。
    • Consumer Group:消费者组。这类 Consumer 通常接收并消费同一类消息,且消费逻辑一致。Consumer Group 和 Topic 的关系是 N:N,同一个 Consumer Group 可以订阅多个 Topic,同一个 Topic 也可以被多个 Consumer Group 订阅。
    • Replication:副本。为了保证分布式可靠性,kafka0.8开始对每个分区的数据进行备份(不同的Broker上),防止其中一个Broker宕机造成分区上的数据不可用。

    分区、组、消费者的关系

    消息队列 Kafka 采用 Pub/Sub(发布/订阅)模型,其中:

    • Consumer Group 和 Topic 的关系是 N:N。 同一个 Consumer Group 可以订阅多个 Topic,同一个 Topic 也可以同时被多个 Consumer Group 订阅。
    • 同一 Topic 的一条消息只能被同一个 Consumer Group 内的任意一个 Consumer 消费,但多个 Consumer Group 可同时消费这一消息。

    说明:
    1、同一个分区(partition)内的消息只能被同一个组中的一个消费者(consumer)消费,当消费者数量多于分区的数量时,多余的消费者空闲。
    2、启动多个组,则会使同一个消息被消费多次。

    详细请看:https://www.jianshu.com/p/6233d5341dfe

    组成结构

    生产者消费者关系:

    对于每一个topic, Kafka集群都会维持一个分区日志,如下所示:

    kafka分区是提高kafka性能的关键所在,当发现集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。

    负载均衡

    Kafka 负载消费的内部原理是,把订阅的 Topic 的分区,平均分配给各个消费实例。因此,消费实例的个数不要大于分区的数量,否则会有实例分配不到任何分区而处于空跑状态。这个负载均衡发生的时间,除了第一次启动上线之外,后续消费实例发生重启、增加、减少等变更时,都会触发一次负载均衡。

    配置

    Kafka支持的配置非常多,这里仅仅列出来部分关于 broker 的配置。broker 配置文件是config/server.properties

    每个kafka broker中配置文件默认必须配置的属性如下:

    broker.id=0  
    port=9092
    num.network.threads=2  
    num.io.threads=8  
    socket.send.buffer.bytes=1048576  
    socket.receive.buffer.bytes=1048576  
    socket.request.max.bytes=104857600  
    log.dirs=/tmp/kafka-logs  
    num.partitions=1
    log.retention.hours=168  
    log.segment.bytes=536870912  
    log.retention.check.interval.ms=60000  
    log.cleaner.enable=false  
    zookeeper.connect=localhost:2181  
    zookeeper.connection.timeout.ms=1000000
    

    配置说明:

    参数 默认值 描述
    broker.id -1 用于服务的broker id。如果没设置,将生成一个唯一broker id。为了避免ZooKeeper生成的id和用户配置的broker id相冲突,生成的id将在reserved.broker.max.id的值基础上加1。
    port 9092 broker server服务端口。仅在未设置listeners时使用。
    host.name broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK。仅在未设置listeners时使用。
    log.dirs /tmp/kafka-logs kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能 /data/kafka-logs-1,/data/kafka-logs-2
    message.max.bytes 1000012 表示消息体的最大大小,单位是字节
    num.network.threads 3 broker处理消息的最大线程数,一般情况下数量为cpu核数
    num.io.threads 8 处理IO的线程数
    log.flush.interval.messages Long.MaxValue 在数据被写入到硬盘和消费者可用前最大累积的消息的数量
    log.flush.interval.ms Long.MaxValue 在数据被写入到硬盘前的最大时间
    log.flush.scheduler.interval.ms Long.MaxValue 检查数据是否要写入到硬盘的时间间隔。
    log.retention.hours 168 控制一个log保留多长个小时
    log.retention.bytes -1 控制log文件最大尺寸
    log.cleaner.enable false 是否log cleaning
    log.cleanup.policy delete delete还是compat.
    log.segment.bytes 1073741824 单一的log segment文件大小
    log.roll.hours 168 开始一个新的log文件片段的最大时间
    background.threads 10 后台线程序
    num.partitions 1 默认分区数
    socket.send.buffer.bytes 102400 socket SO_SNDBUFF参数
    socket.receive.buffer.bytes 102400 socket SO_RCVBUFF参数
    zookeeper.connect 指定zookeeper连接字符串, 格式如hostname:port/chroot。chroot是一个namespace
    zookeeper.connection.timeout.ms 6000 指定客户端连接zookeeper的最大超时时间
    zookeeper.session.timeout.ms 6000 连接zk的session超时时间
    zookeeper.sync.time.ms 2000 zk follower落后于zk leader的最长时间
    auto.create.topics.enable true 是否允许在服务器上自动创建topic

    更多配置查看官方文档:http://kafka.apache.org/documentation.html#configuration

    常用命令

    • 启动zookeeper
    $ bin/zookeeper-server-start.sh config/zookeeper.properties &
    
    • 关闭zookeeper
    $ bin/zookeeper-server-stop.sh
    
    • 启动kafka
    $ bin/kafka-server-start.sh config/server.properties &
    
    • 关闭kafka
    $ bin/kafka-server-stop.sh
    
    • 创建topic
    $ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    
    • 查看所有topic
    $ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    
    • 查看某个topic具体信息
    $ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
    
    • 删除topic (可直接删除的前提:delete.topic.enable=true)
    $ bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test
    
    • 生产消息
    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
    • 消费消息
    $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

    Kafka监控工具

    https://github.com/Morningstar/kafka-offset-monitor

    消息队列比较

    消息队列主要是解决了应用解耦、异常处理、流量削锋等问题。常见的消息队列还有:ActiveMQRabbitMQRocketMQZeroMQMetaMQ 等等。当然,我们也可以使用Redis作为简单的消息队列使用。

    消息队列对比参考:

    (图片来源于互联网)

    参考

    1、Apache Kafka
    http://kafka.apache.org/documentation/
    2、消息队列Kafka、RocketMQ、RabbitMQ的优劣势比较 - 知乎
    https://zhuanlan.zhihu.com/p/60288391
    3、weiboad/kafka-php: kafka php client
    https://github.com/weiboad/kafka-php
    4、kafka中partition和消费者对应关系 - 简书
    https://www.jianshu.com/p/6233d5341dfe
    5、kafka常用的命令 - 随笔 - SegmentFault 思否
    https://segmentfault.com/a/1190000010040990
    6、消息中间件部署及比较:rabbitMQ、activeMQ、zeroMQ、rocketMQ、Kafka、redis - 掘金
    https://juejin.im/post/5b32044ef265da59654c3027
    7、面试官问分布式技术面试题,一脸懵逼怎么办?_ITPUB博客
    http://blog.itpub.net/69917606/viewspace-2642545/
    8、产品架构_产品简介_消息队列 Kafka-阿里云
    https://help.aliyun.com/document_detail/68152.html?spm=a2c4g.11186623.6.543.3ba272e4cAMqaH

  • 相关阅读:
    【转载】 NumPy之:数据类型对象dtype
    在深度学习的视觉VISION领域数据预处理的魔法常数magic constant、黄金数值的复现: mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225]
    关于Numpy数据类型对象(dtype)使用详解
    【转载】 大端模式和小端模式的区别是什么?
    在使用pytorch官方给出的torchvision中的预训练模型参数时为保证收敛性要求使用原始的数据预处理方式
    【转载】 解决 sudo echo x > 时的 Permission denied错误
    Javascript高级程序设计第二版前三章基本数据等笔记
    冒号课堂§3.4:事件驱动
    理解 JavaScript 闭包
    Browser clientX scrollLeft clientLeft
  • 原文地址:https://www.cnblogs.com/52fhy/p/10848659.html
Copyright © 2011-2022 走看看