zoukankan      html  css  js  c++  java
  • [db]消息队列-kafka

    kakfa视频
    5分钟带你体验一把 Kafka

    IDEA相关插件推荐
    Zoolytic-Zookeeper tool
    Kafkalytic
    

    消息队列两种模式

    • 一对一

      只能有一个消费者, 消费完后, 消息即被删除.

    • 一对多(发布订阅模式)

    两种模式:
    模式1: 消费者订阅公众号, 消息队列推送消息.(缺点: 消费者因配置不同,可能被打挂)
    模式2: 消费者主动从mq里pull消息.(缺点: 消费者与mq建立长轮询, 浪费资源) -- kafka使用这种模式.

    kafka是被动的. 生产者push给我消息, 消费者主动来pull消息.

    kafka架构

    kakfa是一个
        分布式          - 可组建集群
        基于发布订阅的    - 生产者/消费者 
        message queue
    
    • producer: 生产者去连kakfa,push消息
    • broker: kafka是分布式的, 集群中每个物理节点都是一个broker.

    • topic: 对消息的归类. 如商品信息, 订单信息 分别属于两个topic

    • partition:

    如1个topic被分为10个partition.  
    则每个broker上都有10个partition存在.   
    每个broker上, 针对这个topic的众多partition中有一个leader partition. 其他均为slave,在其他broker眼中,均做备份用(高可用),
    实际的读写都找leader分区.
    
    partition num == broker num 才具备冗余能力.
    

    • consumer group:
    1个消费者组包含多个消费者
    
    1个partition同时只能被1个消费者组里的某个消费者消费.
    
    消费者组里消费者数量<=partition num
    

    • logstash消费
    input {
      kafka {
        bootstrap_servers => "localhost:9092"
        topics => "test-topic"
        group_id => "logstash-group"
        codec => "json"
        consumer_threads => 1
        decorate_events => true
      }
    }
    
    output {
        stdout { codec => rubydebug }
    }
    
    

    /usr/local/logstash/bin/logstash -f logstash.yaml --config.reload.automatic

    例子: 一个group,3个p
    先创建3个p的test-topic3
    1.当有3个p, 1个消费者时

    2.当有3个p,2个消费者时

    3.当有3个p,3个消费者时(消费能力最强)

    都是动态调配的(新增一个消费者, p的分配会自动变)

    4.当有1个p,2个消费者

    小结: 同一个消费组, 消费者个数<=p个数

    1. 2个group,1个p

    • zk & offset
    
    zk: kafka启动依赖zk, 每个kafka节点配置文件里指定zk地址, 类似服务注册.
        kafka0.9版前offset存在zk中. 消费者每次读写都访问zk存取offset, 由于访问zk频率太高, 0.9之后的kafka版本都将offset存放在自己的topic中.
    

    offset: 记录消费者消费到了队列的位置, 以免消费者挂掉后重新来消费时候接着消费
    

    每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。
    分区中的消息都被分了一个序列号,称之为偏移量(offset),在每个partition中此偏移量都是唯一的。

    安装kafka

    Scala 2.13  - kafka_2.13-2.4.0.tgz (asc, sha512) //kafka scala实现, 前面是scala的版本, 后面2.4是kakfa的版本.
    
    

    安装java(zk依赖java)

    #解压
    cd /usr/local/src/
    tar xf jdk-8u162-linux-x64.tar.gz -C /usr/local/
    ln -s /usr/local/jdk1.8.0_162 /usr/local/jdk
    
    #添加环境变量
    sed -i.ori '$a export JAVA_HOME=/usr/local/jdk
    export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
    export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar' /etc/profile
    
    source /etc/profile
    java -version
    

    安装zk

    安装kafka

    #############################################################
    # kafka默认: 9092
    egrep -v '^$|^#' /data/home/user00/kafka/config/server.properties
    
    tail -f /data/home/user00/kafka/logs/server.log
    #############################################################
    - zk启动停止
    cd /data/home/user00/kafka/bin
    ./zookeeper-server-stop.sh
    
    cd /data/home/user00/kafka/bin
    ./zookeeper-server-start.sh ../config/zookeeper.properties &
    
    
    - kafka启动停止
    cd /data/home/user00/kafka/bin
    ./kafka-server-start.sh ../config/server.properties &
    
    cd /data/home/user00/kafka/bin
    ./kafka-server-stop.sh
    

    kafka配置文件: server.properties

    broker.id=0
    num.network.threads=2
    num.io.threads=8
    socket.send.buffer.bytes=1048576
    socket.receive.buffer.bytes=1048576
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs
    num.partitions=2          //partition数量
    log.retention.hours=168   //默认保留168h
     
    log.segment.bytes=536870912
    log.retention.check.interval.ms=60000
    log.cleaner.enable=false
     
    zookeeper.connect=localhost:2181  //连接zk
    

    docker安装kafka

    docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
    
    
    docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.100:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.100:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
    
    
    KAFKA_BROKER_ID=0
    KAFKA_ZOOKEEPER_CONNECT=192.168.1.100:2181
    KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.100:9092
    KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
    
    # 测试kafka
    docker exec -it kafka /bin/bash
    
    cd opt/kafka_2.11-2.0.0/
    

    dockerfile安装kakfa集群

    测试kakfa集群

    kafka增删改查

    //启动命令行测试 消费者和生产者
    - 查看基于zk的消费组
    bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic3
    
    - 查看group详情(判断cusumer是否正常)
    bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group logstash-group --describe
    
    - 创建topic
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic1
    - 生产者
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
    
    - 查看基于zk的消费组
    bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic3
    
    - 查看group详情(判断cusumer是否正常)
    bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group logstash-group --describe
    
    // 创建topic
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
    
    // 开启命令行-生产者
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
    hi
    mao
    
    //开启命令行-消费者
    bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic
    hi
    mao
    
    // 创建topic
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic2
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test-topic3
    
    // 查看topic list
    bin/kafka-topics.sh --zookeeper localhost:2181 --list
    test-topic
    test-topic2
    test-topic3
    
    // 查看topic 详细
    /opt/kafka_2.12-2.4.0 # bin/kafka-topics.sh --zookeeper localhost:2181 --topic test-topic3 --describe
    Topic: test-topic3	PartitionCount: 3	ReplicationFactor: 1	Configs:
    	Topic: test-topic3	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
    	Topic: test-topic3	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
    	Topic: test-topic3	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
    
    // 查看消费组list(如果消费时没指定消费组名称,则默认随机产生一个名称)
    bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
    console-consumer-94154
    
    // 查看consumer group详情(判断cusumer是否正常):  LAG(langency): 表示该队列消费延迟情况(还有多少条堆积)
    bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group logstash-group --describe
    GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                            HOST            CLIENT-ID
    console-consumer-94154 test-topic      0          -               2               -               consumer-console-consumer-94154-1-c4ac900c-f053-4f2f-949c-c86d16449e86 /172.17.0.1     consumer-console-consumer-94154-1
    
    // 开启一个命令行消费者: 查看实时消费日志(判断是队列否有日志)
    bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic3
    
    // 生产者
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic3
    hi
    
    基于zk的消费者
        - 查看基于zk的消费组
            bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic3
        - 开启一个消费者(随机生成group)
            bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic3
        - 查看group详情(判断cusumer是否正常)
            bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group logstash-group --describe
    
    // 基于kafka的消费者
    - 查看基于kafka的消费组
        bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
    
    - 查看group详情(判断cusumer是否正常)
        bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group logstash-group --describe
    
    - 开启一个消费者(随机生成group)
        bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic3 --group logstash-group
    
    - 开启一个消费者(指定group,可能偷走已有的消费者的数据)
        bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic3 --group logstash-group
    

    kafka常用命令

    #############################################################
    ## kafka
    cd /data/home/user00/kafka
    bin/kafka-topics.sh --create --zookeeper localhost:2000 --replication-factor 2 --partitions 36 --topic test-topics
    
    - 创建topics
    cd /data/home/user00/kafka
    bin/kafka-topics.sh --create --zookeeper localhost:2000 --replication-factor 2 --partitions 36 --topic 10445
    
    - 删除topics
    cd /data/home/user00/kafka
    bin/kafka-topics.sh --delete --zookeeper localhost:2000 --topic test-topic
    
    - 查看topics
    cd /data/home/user00/kafka
    bin/kafka-topics.sh --zookeeper localhost:2000 --list
    
    - 查看topics详情
    cd /data/home/user00/kafka
    bin/kafka-topics.sh --zookeeper localhost:2000 --topic test-topic --describe
    
    - 查看group list
    cd /data/home/user00/kafka
    bin/kafka-consumer-groups.sh --zookeeper localhost:2000 --list
    
    - 查看group list详情
    cd /data/home/user00/kafka
    bin/kafka-consumer-groups.sh --zookeeper localhost:2000 --group logstash --describe
    
    
    - 查看消费情况
    cd /data/home/user00/kafka
    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2000 --group logstash --topic 100100
    
    - 查看实时日志
    cd /data/home/user00/kafka
    bin/kafka-console-consumer.sh --zookeeper  localhost:2000 --topic 100100
    
    #############################################################
    - Create a topic
    bin/kafka-topics.sh --create --zookeeper localhost:2000 --replication-factor 1 --partitions 1 --topic test-topic
    bin/kafka-topics.sh --zookeeper localhost:2000 --list
    
    - Send some messages()
    bin/kafka-console-producer.sh --broker-list localhost:2000 --topic test-topic
    
    - Start a consumer
    bin/kafka-console-consumer.sh --bootstrap-server localhost:2000 --topic test-topic --from-beginning
    bin/kafka-console-consumer.sh --bootstrap-server localhost:2000 --topic mytopic-from-logstash --from-beginning
    #############################################################
    

    监控kafka

    kafka-manager(点集群,点topic就可以看到broker和消费lag)

    promethus+granfana(可以看到消费延迟lag)

    老版本新版本

    生产 消费 连broker即可. 连接zookeeper方式会被废弃.

  • 相关阅读:
    1. 规范说明
    Swing Jtable 添加checkbox列
    ubuntu 右键添加打开终端
    关闭ubuntu中的”System Program Problem Detected”提示
    activemq和mqtt消息总线
    Swing 自定义日期控件
    Swing Jtable 设置背景色
    Swing Jtable 网格线设置
    Java Swing Jtable 单元格不可编辑
    Java Swing JTable 设置隔行变色
  • 原文地址:https://www.cnblogs.com/iiiiher/p/9270116.html
Copyright © 2011-2022 走看看