zoukankan      html  css  js  c++  java
  • 从零开始学习Kafka

    简介

    kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。

    Kafka核心组件-intsmaze

    •   Topic:消息根据Topic进行归类,可以理解为一个队里。
    •   Producer:消息生产者,就是向kafka broker发消息的客户端。
    •   Consumer:消息消费者,向kafka broker取消息的客户端。
    •   broker:每个kafka实例(server),一台kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic。
    •   Zookeeper:依赖集群保存meta信息。
        
      大家先看kafka的介绍或者教程啊,上来都显示一堆长篇大论,各自文字图片,看着很懵逼,头晕。搞程序的,要让ta跑起来,再针对可运行的成果,慢慢了解ta。所以本文会由浅入深,先实践后理论,结合实践讲理论。

    Kafka安装配置

    下载

    wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz
    

    解压

     tar -zxvf kafka_2.11-2.2.0.tgz
     
    

    修改 kafka-server 的配置文件

     cd kafka_2.11-2.2.0
     
    vim  config/server.properties
    

    修改其中的:

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=1
    # A comma separated list of directories under which to store log files
    log.dirs=/data/kafka-logs
    

    启动zk【默认端口2181】

    bin/zookeeper-server-start.sh config/zookeeper.properties
    

    image.png

    启动Kafka

    使用 kafka-server-start.sh 启动 kafka 服务:

    bin/kafka-server-start.sh config/server.properties
    

    image.png

    测试使用

    创建 topic

    使用 kafka-topics.sh 创建单分区单副本的 topic demo

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
    

    image.png

    查看 topic 列表:

    bin/kafka-topics.sh --list --zookeeper localhost:2181
    

    image.png

    发送消息【生产者】

    使用 kafka-console-producer.sh 发送消息:

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo
    

    读取消息【消费者】

    使用 kafka-console-consumer.sh 接收消息并在终端打印:

     bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
    

    image.png
    注意不要使用
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning,高版本已经不支持

    查看描述 topics 信息

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
    
    [root@localhost kafka_2.11-2.2.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
    Topic:demo      PartitionCount:1        ReplicationFactor:1     Configs:
            Topic: demo     Partition: 0    Leader: 1       Replicas: 1     Isr: 1
    
    

    image.png

    第一行给出了所有分区的摘要,每个附加行给出了关于一个分区的信息。 由于我们只有一个分区,所以只有一行。

    • “Leader”: 是负责给定分区的所有读取和写入的节点。 每个节点将成为分区随机选择部分的领导者。
    • “Replicas”: 是复制此分区日志的节点列表,无论它们是否是领导者,或者即使他们当前处于活动状态。
    • “Isr”: 是一组“同步”副本。这是复制品列表的子集,当前活着并被引导到领导者

    扩展-集群配置

    Kafka 支持两种模式的集群搭建:可以在单机上运行多个 broker 实例来实现集群,也可在多台机器上搭建集群,下面介绍下如何实现单机多 broker 实例集群,其实很简单,只需要如下配置即可。

    单机多broker 集群配置

    利用单节点部署多个 broker。 不同的 broker 设置不同的 id,监听端口及日志目录。 例如:

    cp config/server.properties config/server-2.properties
    vi config/server-2.properties
    

    修改内容:

    broker.id=2
    
    listeners = PLAINTEXT://127.0.0.1:9093
    
    log.dirs=/data/kafka-logs2
    

    同样,配置第三个broker:

    cp config/server-2.properties config/server-3.properties
    vi config/server-3.properties
    

    修改内容:

    broker.id=2
    
    listeners = PLAINTEXT://127.0.0.1:9093
    
    log.dirs=/data/kafka-logs2
    

    listeners 申明此kafka服务器需要监听的端口号,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,例如:         
    listeners=PLAINTEXT:// 192.168.180.128:9092
    并确保服务器的9092端口能够访问

    启动2/3 borker

    bin/kafka-server-start.sh config/server-2.properties &
    bin/kafka-server-start.sh config/server-3.properties &
    
    

    至此,单机多broker实例的集群配置完毕。

    扩展-多机多borker集群

    分别在多个节点按上述方式安装 Kafka,配置启动多个 Zookeeper 实例。

    假设三台机器 IP 地址是 : 192.168.153.135, 192.168.153.136, 192.168.153.137

    分别配置多个机器上的 Kafka 服务,设置不同的 broker id,zookeeper.connect 设置如下:

    config/server.properties里面的 zookeeper.connect

    zookeeper.connect=192.168.153.135:2181,192.168.153.136:2181,192.168.153.137:2181
    

    使用 Kafka Connect 来导入/导出数据

    从控制台写入数据并将其写回控制台是一个方便的起点,但您可能想要使用其他来源的数据或将数据从 Kafka 导出到其他系统。对于许多系统,您可以使用 Kafka Connect 来导入或导出数据,而不必编写自定义集成代码。

    Kafka Connect 是 Kafka 包含的一个工具,可以将数据导入和导出到 Kafka。它是一个可扩展的工具,运行 连接器,实现与外部系统交互的自定义逻辑。在这个快速入门中,我们将看到如何使用简单的连接器运行 Kafka Connect,这些连接器将数据从文件导入到 Kafka topic,并将数据从 Kafka topic 导出到文件。

    参考:

    • http://www.54tianzhisheng.cn/2018/01/04/Kafka/ 
      
    • http://kafka.apache.org/10/documentation/streams/quickstart
      
    • http://kafka.apache.org/20/documentation.html#quickstart
      

    代码测试

    准备测试kafka

    cp config/server.properties config/server-idea.properties
    vi config/server-idea.properties
     
    broker.id=999
    
    listeners = PLAINTEXT://192.168.1.177:9999
    
    log.dirs=/data/kafka-logs-999
    

    192.168.1.177为kafka所在机器的ip地址,9999端口号是对外提供的端口,下文会使用到

    Springboot 发送消息、接受消息源码

    很简单的一个小demo,可以直接拷贝使用。

    KafkaApplication.java:

    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    
    @SpringBootApplication
    public class KafkaApplication {
    
        public static void main(String[] args) {
    
            ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);
    
            KafkaTemplate kafkaTemplate = context.getBean(KafkaTemplate.class);
    
            for (int i = 0; i < 10; i++) {
                //调用消息发送类中的消息发送方法
                kafkaTemplate.send("mytopic", System.currentTimeMillis() + "发送" + i);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        @KafkaListener(topics = {"mytopic"},groupId = "halburt-demo2")
        public void consumer1(String message) {
            System.out.println("consumer1收到消息:" + message);
        }
    
        @KafkaListener(topics = {"mytopic"} ,groupId = "halburt-demo")
        public void consumer2(ConsumerRecord<?, ?> record) {
            System.out.println("consumer2收到消息");
            System.out.println("    topic" + record.topic());
            System.out.println("    key:" + record.key());
            System.out.println("    value:"+record.value());
        }
    }
    

    application.yml:

    server:
      port: 8090
    spring:
      kafka:
        consumer:
          auto-commit-interval: 100
          bootstrap-servers: 192.168.1.177:9999
          enable-auto-commit: true
          group-id: halburt-demo
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          max-poll-records: 1
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        listener:
          concurrency: 5
        producer:
          bootstrap-servers: 192.168.1.177:9999
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    

    192.168.1.177:9999即为kafka的配置文件中配置

    pom.xml依赖:

    依赖版本:

    spring-boot.version:2.1.3.RELEASE
    spring-kafka.version:2.2.0.RELEASE

    【此处有坑】此处依赖版本可以不用这2个版本,但是一定要注意springboot和kafka的版本对应

        <dependencies>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.2.0.RELEASE</version>
            </dependency>
        
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <version>2.1.3.RELEASE</version>
            </dependency>
        </dependencies>
    

    启动kafka并run Application.java

    bin/kafka-server-start.sh config/server-idea.properties &
    

    image.png
    已经启动了zk,此处不用再启动,如果未启动,需要启动zk。
    image.png

    cd /home/hd/kafka_2.11-2.2.0/
    bin/zookeeper-server-start.sh config/zookeeper.properties&
    

    kafka启动成功之后,run Application,会看到日志如下:
    image.png
    已经接收到消息了。

    如果你是跟着本文从头开始的,一定注意此处有个坑

    如果你是从头开始跟这个本文学习的,那么你直接启动的话,会发现消息发出去了,但是没有接收到。
    我也是查了好久,看了很多教程,别人都行我就不行。
    如果你的zk有其他的topic节点的话,会收不到消息,直接上解决方案:删除所有的zk节点。怎么删除?

    上码:

    
    /**
     * zookeeper znode递归删除节点
     * @author Halburt
     *
     */
    public class DeleteZkNode {
        //zookeeper的地址 
        private static final String connectString = "192.168.1.177:2181";
    
        private static final int sessionTimeout = 2000;
    
        private static ZooKeeper zookeeper = null;
    
        /**
         * main函数
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
    
            //调用rmr,删除所有目录
            rmr("/");
        }
    
        /**
         * 递归删除 因为zookeeper只允许删除叶子节点,如果要删除非叶子节点,只能使用递归
         * @param path
         * @throws IOException
         */
        public static void rmr(String path) throws Exception {
            ZooKeeper zk = getZookeeper();
            //获取路径下的节点
            List<String> children = zk.getChildren(path, false);
            for (String pathCd : children) {
                //获取父节点下面的子节点路径
                String newPath = "";
                //递归调用,判断是否是根节点
                if (path.equals("/")) {
                    newPath = "/" + pathCd;
                } else {
                    newPath = path + "/" + pathCd;
                }
                rmr(newPath);
            }
            //删除节点,并过滤zookeeper节点和 /节点
            if (path != null && !path.trim().startsWith("/zookeeper") && !path.trim().equals("/")) {
                zk.delete(path, -1);
                //打印删除的节点路径
                System.out.println("被删除的节点为:" + path);
            }
        }
    
        /**
         * 获取Zookeeper实例
         * @return
         * @throws IOException
         */
        public static ZooKeeper getZookeeper() throws IOException {
            zookeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
    
                }
            });
            return zookeeper;
        }
    
    }
    

    终端命令查看消息

    bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.177:9999 --topic mytopic  --from-beginning
    

    image.png

    安利一下可视化工具Kafka Tool 2

    下载地址

    Kafka Tool 2是一款Kafka的可视化客户端工具,可以非常方便的查看Topic的队列信息以及消费者信息以及kafka节点信息。下载地址:http://www.kafkatool.com/download.html

    使用

    先创建连接

    下载安装之后会弹出一个配置连接的窗口,我们可以看到这个窗口左上角为Add Cluster(添加集群),但没关系,对应单节点的Kafka实例来说也是可以的,因为这个软件监控的是Zookeeper而不是Kafka,Kafka的集群搭建也是依赖Zookeeper来实现的,所以默认情况下我们都是直接通过Zookeeper去完成大部分操作。
    image.png

    创建完成之后,连接

    我们可以看到已经创建好的Topic。这个软件默认显示数据的类型为Byte,可以在设置里面找到对应的修改选项
    image.png
    接下来就自己探索吧
    image.,接下来就自己探索吧png

    理论学习

    kafka单节点的结构如下:

    image.png
    单节点broker包含多个topic主题,而每个topic则包含多个partition副本,每个partition会有序的存储消息。

    kafka的总体数据流

    kafka对外使用topic的概念,生产者往topic里写消息,消费者从topic读消息。为了做到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。单个parition内是保证消息有序。每新写一条消息,kafka就是在对应的文件append写,所以性能非常高。kafka的总体数据流是这样的:
    2835676-f378607bc841309a.png

    Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉去指定Topic的消息,然后进行业务处理。

    名词解析

    Producer

    消费者: Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition;比如基于"round-robin"方式或者通过其他的一些算法等.

    Consumer

    每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费(对于一条消息来说,同一组的消费者只会有一个消费者去消费).

    如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.
     如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.

    在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的。事实上,从Topic角度来说,消息仍不是有序的。

    Topics

    一个Topic可以认为是一类消息,每个topic将被分成多个partition(区),每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是唯一标记一条消息。它唯一的标记一条消息。kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行“随机读写”。

    Partition

    topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列

    以下是单个生产者和消费者从两个分区主题读取和写入的简单示例。
    image.png

    此图显示了一个producer向2个partition分区写入日志,以及消费者从相同日志中读取的内容。日志中的每条记录都有一个相关的条目号,称之为偏移量offset。消费者使用此偏移来记录其在partitiond读取日志的位置。

    当然如果存在多个消费者的话,根据groupId分组,同一组的消费者不会重复读取日志。

    换句话说:
    订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个partition。换句话来说,就是一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。因此,如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。
     
    

    其实consumer可以使用任意顺序消费日志消息,它只需要将offset重置为任意值.(offset将会保存在zookeeper中,kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息有zookeeper保存)

    partition有多个.最根本原因是kafka基于文件存储.通过分区,可以将日志内容分散到多个partition上,来避免文件大小达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;可以将一个topic切分多任意多个partitions,来消息保存/消费的效率.此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力.

    使用场景

    消息系统、消息队列

    对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的"事务性""消息传输担保(消息确认机制)""消息分组"等企业级特性;kafka只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)

    日志聚合

    kafka的特性决定它非常适合作为"日志收集中心";application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统.

    网站活动追踪、调用链系统、事件采集

    可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等

    等等其他场景

    server.properties配置文件解读

    ############################# Server Basics #############################
    # 节点的ID,必须与其它节点不同
    broker.id=0
    # 选择启用删除主题功能,默认false
    #delete.topic.enable=true
    ############################# Socket Server Settings #############################
    
    # 套接字服务器坚挺的地址。如果没有配置,就使用java.net.InetAddress.getCanonicalHostName()的返回值
    # FORMAT:
    # listeners = listener_name://host_name:port
    # EXAMPLE:
    # listeners = PLAINTEXT://your.host.name:9092
    #listeners=PLAINTEXT://:9092
    
    # 节点的主机名会通知给生产者和消费者。如果没有设置,如果配置了"listeners"就使用"listeners"的值。
    # 否则就使用java.net.InetAddress.getCanonicalHostName()的返回值
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    
    # 将侦听器的名称映射到安全协议,默认情况下它们是相同的。有关详细信息,请参阅配置文档
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    
    # 服务器用来接受请求或者发送响应的线程数
    num.network.threads=3
    
    # 服务器用来处理请求的线程数,可能包括磁盘IO
    num.io.threads=8
    
    # 套接字服务器使用的发送缓冲区大小
    socket.send.buffer.bytes=102400
    
    # 套接字服务器使用的接收缓冲区大小
    socket.receive.buffer.bytes=102400
    
    # 单个请求最大能接收的数据量
    socket.request.max.bytes=104857600
    
    
    ############################# Log Basics #############################
    
    # 一个逗号分隔的目录列表,用来存储日志文件
    log.dirs=/tmp/kafka-logs
    
    # 每个主题的日志分区的默认数量。更多的分区允许更大的并行操作,但是它会导致节点产生更多的文件
    num.partitions=1
    
    # 每个数据目录中的线程数,用于在启动时日志恢复,并在关闭时刷新。
    num.recovery.threads.per.data.dir=1
    
    ############################# Internal Topic Settings #############################
    # 内部主题设置
    # 对于除了开发测试之外的其他任何东西,group元数据内部主题的复制因子“__consumer_offsets”和“__transaction_state”,建议值大于1,以确保可用性(如3)。
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    
    ############################# Log Flush Policy #############################
    
    
    
    # 在强制刷新数据到磁盘之前允许接收消息的数量
    #log.flush.interval.messages=10000
    
    # 在强制刷新之前,消息可以在日志中停留的最长时间
    #log.flush.interval.ms=1000
    
    ############################# Log Retention Policy #############################
    
    # 以下的配置控制了日志段的处理。策略可以配置为每隔一段时间删除片段或者到达一定大小之后。
    # 当满足这些条件时,将会删除一个片段。删除总是发生在日志的末尾。
    
    # 一个日志的最小存活时间,可以被删除
    log.retention.hours=168
    
    # 一个基于大小的日志保留策略。段将被从日志中删除只要剩下的部分段不低于log.retention.bytes。
    #log.retention.bytes=1073741824
    
    # 每一个日志段大小的最大值。当到达这个大小时,会生成一个新的片段。
    log.segment.bytes=1073741824
    
    # 检查日志段的时间间隔,看是否可以根据保留策略删除它们
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
    
    zookeeper.connect=localhost:2181
    
    # 连接到Zookeeper的超时时间
    zookeeper.connection.timeout.ms=6000
    
    
    ############################# Group Coordinator Settings #############################
    
    group.initial.rebalance.delay.ms=0
    

    参考文章

    https://www.cnblogs.com/likehua/p/3999538.html

    https://www.jianshu.com/p/d3e963ff8b70

    如有表述不当之处,敬请指正。

  • 相关阅读:
    UIViewController生命周期
    NSTImer重复执行任务
    IOS平台汉字转拼音方案
    @properties指针说明
    自定义yum仓库
    man手册、zip备份
    ln 软连接与硬连接
    fdisk分区规划和添加wap交换空间
    window部署ftp服务器
    配置附加权限和LDAP
  • 原文地址:https://www.cnblogs.com/Halburt/p/10842597.html
Copyright © 2011-2022 走看看