zoukankan      html  css  js  c++  java
  • kafka学习

    Kafka概述

    Kafka是分布式(点对点模式)(发布-订阅模式)消息系统,Scala写成, 它主要用于处理流式数据。本质是基于消息队列缓存数据.

    Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。组件依赖于zookeeper集群保存一些meta信息

    为什么要用kafka,为什么要用消息队列

    1)解耦:
    2)冗余:
    3)扩展性:
    4)灵活性 & 峰值处理能力:
    5)可恢复性:
    6)顺序保证:
    7)缓冲:
    8)异步通信:

    kafka构架

    1)Producer :消息生产者,就是向kafka broker发消息的客户端;
    2)Consumer :消息消费者,向kafka broker取消息的客户端;
    3)Topic :可以理解为一个队列(就是同一个业务的数据放在一个topic下);
    4) Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。
    一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。
    如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;
    5)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic; 6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
    partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;
    7)Offset:偏移量。

    kafka分区

    消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成
    原因: (1) 方便在集群中扩展 (2)提高并发 因为可以以partition来进行读写

    分区有多副本机制 可以通过选举leader来提高容

    kafka中的broker 是干什么的

    broker 是消息的代理,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉取指定Topic的消息,然后进行业务处理,broker在中间起到一个代理保存消息的中转站。

    为什么Kafka不支持读写分离?

    (1)数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。
    某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,
    应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。 (
    2)延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经 历网络→主节点内存→网络→从节点内存这几个阶段,
    整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节
    点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用

    kafka中consumer group 是什么概念

    是Kafka实现单播和广播两种消息模型的手段。同一个topic的数据,会广播给不同的group;
    同一个group中的worker,只有一个worker能拿到这个数据。换句话说,对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个worker消费。
    group内的worker可以使用多线程或多进程来实现,也可以将进程分散在多台机器上,worker的数量通常不超过partition的数量,且二者最好保持整数倍关系,
    因为Kafka在设计时假定了一个partition只能被一个worker消费(同一group内)。

    kafka命令行操作

    修改配置文件
    #broker的全局唯一编号,不能重复
    broker.id=1
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘IO的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka运行日志存放的路径    
    log.dirs=/opt/module/kafka/logs
    #topic在当前broker上的分区个数
    num.partitions=1
    #用来恢复和清理data下数据的线程数量
    num.recovery.threads.per.data.dir=1
    #segment文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #配置连接Zookeeper集群地址
    zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181
    
    
    分别在hadoop102和hadoop103上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=2、broker.id=3
    
    启动集群kafka-server-start.sh -daemon server.properties  所有节点都要
    
    关闭集群kafka-server-stop.sh stop
    
    查看当前服务器中的所有topic kafka-topics.sh --zookeeper cc1:2181 --list
    创建topic    kafka-topics.sh --zookeeper cc1:2181 
    --create --replication-factor 3 --partitions 1 --topic first
    删除topic  kafka-topics.sh --zookeeper hadoop101:2181 
    --delete --topic first
    消费消息 kafka-console-consumer.sh 
    --bootstrap-server cc1:9092 --from-beginning --topic first

    kafka存储策略

    无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
    1)基于时间:log.retention.hours=168
    2)基于大小:log.retention.bytes=1073741824

    kafka follower如何与leader同步数据 写流程

    1)producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader
    2)producer将消息发送给该leader
    3)leader将消息写入本地log
    4)followers从leader pull消息,写入本地log后向leader发送ACK
    5)leader收到所有ISR中的replication的ACK后向producer发送ACK
    注:leader会维护一个与其一定程度保持同步的Replica列表,该列表称为ISR(in-sync Replica)。所说的“一定程度”是指可以忍受的滞后范围,这个范围可以通过参数replica.lag.time.max.ms(默认10秒)进行配置。与leader副本同步滞后过多的副本(不包括leader)副本,组成OSR(Out-Sync Relipcas)。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步。

    flume和kafka集成

    1)配置flume(flume-kafka.conf)
    # define
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F  /opt/module/datas/flume.log
    a1.sources.r1.shell = /bin/bash -c
    
    # sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
    a1.sinks.k1.kafka.topic = second
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = -1
    
    
    # channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    2) 启动kafkaIDEA消费者
    3) 进入flume根目录下,启动flume
    $ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf
    4) 向 /opt/module/datas/flume.log里追加数据,查看kafka消费者消费情况
    $ echo hello > /opt/module/datas/flume.log

    kafka中的 zookeeper 起到什么作用,可以不用zookeeper么

    zookeeper 是一个分布式的协调组件,早期版本的kafka用zk做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖,
    
    但是broker依然依赖于ZK,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。
    我凝视这恒星,等待这那场风暴,我已经准备好了
  • 相关阅读:
    一个应用程序无法启动错误的解决过程
    C#调用C库的注意事项
    STM32硬件调试详解
    CP2102模块介绍(USB转uart)
    CH340在STM32实现一键下载电路
    LM27313升压转换器
    常用贴片电阻、电容、电感封装
    MAX16054
    在51系列中data,idata,xdata,pdata的区别
    用UGN3503霍尔器件制作的数字指南针_电路图
  • 原文地址:https://www.cnblogs.com/cheng5350/p/11877758.html
Copyright © 2011-2022 走看看