zoukankan      html  css  js  c++  java
  • kafka简介&kafka安装

    1. 简介

      Kafka 是一个基于发布/订阅模式的消息队列,主要用于大数据实时处理领域。

    1. 使用消息队列的好处:

    1. 解耦:允许独立的修改或者扩展两遍的处理过程

    2. 可恢复性:系统的一部分组件失效后,不会影响整个系统。

    3. 缓冲:有助于控制和优化数据流经过系统的速度,解决生产者和消费者处理消息速度不一致的问题

    4.灵活性&峰值处理:使用消息队列处理一下超过限流的任务

    2. 消息队列的两种模式

    1. 点对点模式:一对一,消费者主动拉取消息,消息收到后消息清除

    2.发布/订阅模式:一对多,消费者消费消息后消息不会删除(保留是有期限的),也就是一条消息可以被多个消费者消费。

    3. kafka架构

    1》producer:消息生产者

    2》consumer:消息消费者

    3》consumer group:消费者组,由多个consumer组成。 消费者组内每个消费者负责消费不同分区的消息,一个分区只能由一个组内消费者消费;消费者之间互相不影响。所有的消费者都属于一个消费者组,组是一个逻辑上的一个订阅者。

    4》broker:一台服务器就是一个broker。一个集群由多个broker组成。一个broker 有多个topic

    5》topic:可以理解为一个主题。生产者和消费者面向的都是topic

    6》partition:为了实现扩展性,一个大的topic可以分布到多个broker上,一个topic 可以分布到不同的partition,每个partition 是一个有序的队列。

    7》replication:副本,每个partition 都有若干个replication,一个leader 和 多个follower

    8》leader:每个分区多个副本的主,生产者生产的消息以及消费者消费的消息面向的都是leader

    9》follower:每个分区部门的从节点,实时的和主进行同步,负责备份主节点的数据。leader 发生故障时,某个follower会成为新的leader。

    2. kafka 安装

      下面基于docker 安装。

    1. 安装zk

    docker pull hub.c.163.com/cloudpri/zookeeper:latest

    测试:

    docker run -d --name zookeeper -p 2181:2181 -t hub.c.163.com/cloudpri/zookeeper

    进入到容器执行如下命令:

    cd /apache-zookeeper-3.5.5-bin/bin
    zkCli.sh
    ls /    查看目前存在的节点

    2. 安装kafka

    参考: https://github.com/wurstmeister/kafka-docker  

    这个是使用docker-compose up -d 进行设置

    1. 获取kafka 镜像

    docker pull hub.c.163.com/qingzhou/wurstmeister/kafka:v1.0

    2. 编写docker-compose.yml

    内容如下:

    version: '2'
    services:
      zookeeper:
        image: hub.c.163.com/cloudpri/zookeeper
        ports:
          - "2181:2181"
      kafka:
        image: hub.c.163.com/qingzhou/wurstmeister/kafka:v1.0
        ports:
          - "9092"
        environment:
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
          KAFKA_LISTENERS: PLAINTEXT://:9092
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock

    3. 测试kafka 进入docker-compose.yml 所在的目录执行如下

    $ docker-compose up -d
    Creating dockerkafka_zookeeper_1 ... done
    Creating dockerkafka_kafka_1     ... done

    4. 查看启动了两个进程

    $ docker ps -a
    CONTAINER ID        IMAGE                                            COMMAND                  CREATED             STATUS              PORTS                                                  NAMES
    ee89c3731105        hub.c.163.com/qingzhou/wurstmeister/kafka:v1.0   "start-kafka.sh"         2 minutes ago       Up 2 minutes        0.0.0.0:32772->9092/tcp                                dockerkafka_kafka_1
    d7d2b0d3c93d        hub.c.163.com/cloudpri/zookeeper                 "/docker-entrypoint.…"   2 minutes ago       Up 2 minutes        2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   dockerkafka_zookeeper_1

    5. 查看版本号

    $ docker exec dockerkafka_kafka_1 find / -name *kafka_* | head -1 | grep -o 'kafka[^
    ]*'
    kafka_2.12-2.3.0

    可以看到相关版本

    6. 查看zookeeper 版本

    $ docker exec dockerkafka_zookeeper_1 pwd
    /apache-zookeeper-3.5.5-bin

    7. 扩展broker:

    (1) 在docker-compose.yml所在的文件夹下,执行以下命令即可将borker总数从1个扩展到4个:

    $ docker-compose scale kafka=4
    WARNING: The scale command is deprecated. Use the up command with the --scale flag instead.
    Starting dockerkafka_kafka_1 ... done
    Creating dockerkafka_kafka_2 ... done
    Creating dockerkafka_kafka_3 ... done
    Creating dockerkafka_kafka_4 ... done

    (2)  查看

    $ docker ps -a
    CONTAINER ID        IMAGE                                            COMMAND                  CREATED              STATUS              PORTS                                                  NAMES
    449364fdf044        hub.c.163.com/qingzhou/wurstmeister/kafka:v1.0   "start-kafka.sh"         About a minute ago   Up About a minute   0.0.0.0:32775->9092/tcp                                dockerkafka_kafka_3
    4965080e898d        hub.c.163.com/qingzhou/wurstmeister/kafka:v1.0   "start-kafka.sh"         About a minute ago   Up About a minute   0.0.0.0:32774->9092/tcp                                dockerkafka_kafka_4
    2ddebcf2aba8        hub.c.163.com/qingzhou/wurstmeister/kafka:v1.0   "start-kafka.sh"         About a minute ago   Up About a minute   0.0.0.0:32773->9092/tcp                                dockerkafka_kafka_2
    ee89c3731105        hub.c.163.com/qingzhou/wurstmeister/kafka:v1.0   "start-kafka.sh"         30 minutes ago       Up 30 minutes       0.0.0.0:32772->9092/tcp                                dockerkafka_kafka_1
    d7d2b0d3c93d        hub.c.163.com/cloudpri/zookeeper                 "/docker-entrypoint.…"   30 minutes ago       Up 30 minutes       2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   dockerkafka_zookeeper_1

    8. 使用kafka

    需要进入一个容器

    1》创建topic:

    创建一个topic,名为 test1,4个partition,副本因子2,执行以下命令即可:

    kafka-topics.sh --create --topic test1 --partitions 4 --zookeeper zookeeper:2181 --replication-factor 2

    2》查看topic

    bash-4.4# kafka-topics.sh --list --zookeeper zookeeper:2181 topic001
    test1

    3》查看刚刚创建的topic的情况,borker和副本情况

    bash-4.4# kafka-topics.sh --describe --zookeeper zookeeper:2181 topic001
    Topic:test1     PartitionCount:4        ReplicationFactor:2     Configs:
            Topic: test1    Partition: 0    Leader: 1004    Replicas: 1004,1002     Isr: 1004,1002
            Topic: test1    Partition: 1    Leader: 1001    Replicas: 1001,1003     Isr: 1001,1003
            Topic: test1    Partition: 2    Leader: 1002    Replicas: 1002,1004     Isr: 1002,1004
            Topic: test1    Partition: 3    Leader: 1003    Replicas: 1003,1001     Isr: 1003,1001

    9. 测试生产者和消费者

    (1) 启动消费者

    kafka-console-consumer.sh --topic topic001 --bootstrap-server dockerkafka_kafka_1:9092,dockerkafka_kafka_2:9092,dockerkafka_kafka_3:9092,dockerkafka_kafka_4:9092

    启动后控制台不会打印消息,因为没有生产者消费消息。

    (2) 启动生产者并且发送消息

    kafka-console-producer.sh --topic topic001 --broker-list dockerkafka_kafka_1:9092,dockerkafka_kafka_2:9092,dockerkafka_kafka_3:9092,dockerkafka_kafka_4:9092

    现在已经进入了生产消息的命令行模式,输入一些字符串然后回车,再去消费消息的控制台窗口看看,已经有消息打印出来,说明消息的生产和消费都成功了。

    10. 到zk 中查看节点信息,如下:

    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    命令拷屏之网络工具
    PHP 设计模式 笔记与总结(1)命名空间 与 类的自动载入
    Java实现 计蒜客 1251 仙岛求药
    Java实现 计蒜客 1251 仙岛求药
    Java实现 计蒜客 1251 仙岛求药
    Java实现 蓝桥杯 算法训练 字符串合并
    Java实现 蓝桥杯 算法训练 字符串合并
    Java实现 蓝桥杯 算法训练 字符串合并
    Java实现 LeetCode 143 重排链表
    Java实现 LeetCode 143 重排链表
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/14999337.html
Copyright © 2011-2022 走看看