zoukankan      html  css  js  c++  java
  • 3.kafka数据存储和ack

    需要看图学习

    producer生产数据,通过ack发送到kafka 中broker(每台机器的节点不一样)对应的partition,

    • 存数据:partition存放在pagecache中,最终持久化到磁盘中

    • 取数据: consumer先到达kernel,kernel通知partition获取元数据,然后调起senfile(in,offset, out),sendfile先去pagecache拿数据,拿不到去磁盘并缓存到pagecache,发送给sendfile,使用了0拷贝模式(不把数据拷贝给应用kafka)

    数据存储方式

    基础:
        数组 大小固定 空间上是连续的 计算方式找到方便
        链表 大小不固定 空间上不连续 遍历复杂度高 需要建立索引
    
    数据存储方式是链表 需要维护自己的索引,索引有两种方式:1.offset 2.timestamp 其实timestamp可以转换成offset

    producer生产数据到kafka的partition ack有三种方式

    ack=0: 不管kafka的partition状态,只往里面发数据,因为不获取kafka分区的回调信息
    ack=1: 往kafka发数据,只要有leader存活(broker抢到controller),就往kafka发数据,因为需要partition返回确认信息
    ack=-1: 往kafaka发数据,当发数据的时候出现网络波动、副本或者主机死掉,那么会出现短暂的卡顿,之后会正常发数据,因为ack=-1需要所有的ISR返回ok信息,如果没有返回的会把该副本T出ISR

    一些语义

    ISR: in-sync replicas 存活的副本
    
    OSR: outof-sync replicas 超过阈值时间10秒,没有心跳的副本(死掉的副本)
    
    AR: assigned replicas 面向分区的副本集合  AR = ISR + OSR
    
    LW:、HW、LEO看图理解

    创建topic查看ISR

    [root@ke03 ~]# kafka-topics.sh --zookeeper ke02:2181,ke03:2181/kafka --create --topic xiaoke-items --partitions 2 --replication-factor 3
    Created topic xiaoke-items.
    [root@ke03 ~]# kafka-topics.sh --zookeeper ke02:2181,ke03:2181/kafka --describe --topic xiaoke-items
    Topic:xiaoke-items    PartitionCount:2    ReplicationFactor:3    Configs:
        Topic: xiaoke-items    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
        Topic: xiaoke-items    Partition: 1    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
    
    partition0: 在2节点 副本在1,2,3节点 共三个 ISR存活的副本1,2,3

    追踪进程,发现日志是通过网络IO发送的

    [root@ke03 xiaoke-items-0]# jps
    11957 Kafka
    [root@ke03 xiaoke-items-0]# lsof -Pnp 11957
    COMMAND   PID USER   FD   TYPE             DEVICE SIZE/OFF    NODE NAME
    java    11957 root  cwd    DIR                8,3     4096  924176 /opt/bigdata/kafka/config
    java    11957 root  143u   REG                8,3        0  262735 /var/kafka_data/xiaoke-items-1/00000000000000000000.log
    java    11957 root  144u   REG                8,3        0  262740 /var/kafka_data/xiaoke-items-0/00000000000000000000.log
    
    
    问:为什么log不用mmap, 而用普通IO呢?
    log使用普通io的形式目的是通用性
    数据存入磁盘的可靠性级别
    app层级
    调用了io的write,但是这个时候只是到达了内核,性能快,但是丢数据
    只有NIO的filechannel,你调用了write()+force(),才真的写到磁盘,性能极低的
    1.每条都force
    2.只是write基于内核刷写机制,靠脏页
    
    java中:
    传统的io, io.flush是个空实现,没有物理刷盘,还是依赖内核的dirty刷盘,所以,会丢东西

    向topic:xiaoke-items 生产数据

    key: item0 val: val0 partition: 1 offset: 0
    key: item1 val: val0 partition: 0 offset: 0
    key: item2 val: val0 partition: 1 offset: 1
    key: item0 val: val1 partition: 1 offset: 2
    key: item1 val: val1 partition: 0 offset: 1
    key: item2 val: val1 partition: 1 offset: 3
    key: item0 val: val2 partition: 1 offset: 4
    key: item1 val: val2 partition: 0 offset: 2
    key: item2 val: val2 partition: 1 offset: 5
    key: item0 val: val0 partition: 1 offset: 6
    key: item1 val: val0 partition: 0 offset: 3
    key: item2 val: val0 partition: 1 offset: 7
    key: item0 val: val1 partition: 1 offset: 8
    key: item1 val: val1 partition: 0 offset: 4
    key: item2 val: val1 partition: 1 offset: 9
    
    查看日志:
    [root@ke03 xiaoke-items-0]# ll -h
    total 8.0K
    -rw-r--r-- 1 root root 10M Jul 26 10:30 00000000000000000000.index
    -rw-r--r-- 1 root root 385 Jul 26 11:25 00000000000000000000.log
    -rw-r--r-- 1 root root 10M Jul 26 10:30 00000000000000000000.timeindex
    -rw-r--r-- 1 root root   8 Jul 26 11:25 leader-epoch-checkpoint
    
    查看kafka日志文件
    [root@ke03 xiaoke-items-0]# kafka-dump-log.sh --files 00000000000000000000.log  | more
    Dumping 00000000000000000000.log
    Starting offset: 0
    baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1627396648184 size: 77 magic: 2 compresscodec: NO
    NE crc: 1546433855 isvalid: true
    baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 77 CreateTime: 1627396651246 size: 77 magic: 2 compresscodec: N
    ONE crc: 2422575540 isvalid: true
    baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 154 CreateTime: 1627396654287 size: 77 magic: 2 compresscodec: 
    NONE crc: 674617845 isvalid: true
    baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 231 CreateTime: 1627396657309 size: 77 magic: 2 compresscodec: 
    NONE crc: 1996918817 isvalid: true
    baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 308 CreateTime: 1627396660339 size: 77 magic: 2 compresscodec: 
    NONE crc: 110021385 isvalid: true
    
    总结:可以看出向0号分区发送了0-4号数据, 4号分区的日志文件offset是4
    
    
    查看index索引文件 
    [root@ke03 xiaoke-items-0]# kafka-dump-log.sh --files 00000000000000000000.index 
    Dumping 00000000000000000000.index
    offset: 0 position: 0
    为了看到效果:增加数据0号分区offset到122
    
    [root@ke03 xiaoke-items-0]# kafka-dump-log.sh --files 00000000000000000000.index 
    Dumping 00000000000000000000.index
    offset: 54 position: 4158
    offset: 108 position: 8316
    
    说明:
    1.position(字节数组):4158字节的位置 就是offset:54
    2.目前offset是122 日志记录到108 说明:offset的索引记录是跳跃记录,优点:减少了IO次数
    
    
    查看timeindex索引文件
    [root@ke03 xiaoke-items-0]# kafka-dump-log.sh --files 00000000000000000000.timeindex 
    Dumping 00000000000000000000.timeindex
    timestamp: 1627397016578 offset: 54
    timestamp: 1627397033738 offset: 108
    说明: timeindex索引文件指向index索引文件的offset

    取数据:
    1.
    timeindex(offset)文件 找到offset,通过offset找到position和下一个position之间的范围,然后在这个范围内进行检索
    
    

    测试ACK

    代码修改: p.setProperty(ProducerConfig.ACKS_CONFIG, "0");
    
    ack=0
    1.生产数据
    2.kill kafka
    3.ISR减少一个,正常往kafka正产数据
    
    
    ack=1
    1.生产数据
    2.kill kafka
    3.ISR减少一个,正常往kafka正产数据,因为leader存活,既(broker抢到了controller的这台机器)
    
    
    ack=-1
    1.生产数据
    2.kill kafka
    3.ISR减少一个,卡顿10秒之后正常往kafka正产数据

    发送和消费数据

    发送数据:producer.seed()
    
    消费数据:consumer.poll() 拉取数据
    
    修改消费的偏移量:consumer.seek(partition,offset);
    
    offset可以通过timestamp去转换
  • 相关阅读:
    codeforce666A_dp
    杭电1789_贪心
    杭电2059_记忆化搜索
    杭电1503_输出最长公共子序列
    杭电1501_dfs和记忆化搜索
    杭电1081_二维dp
    杭电1078_dfs
    coderforce 675C(贪心)
    杭电2571_01背包
    杭电1069_01背包
  • 原文地址:https://www.cnblogs.com/bigdata-familyMeals/p/15068353.html
Copyright © 2011-2022 走看看