zoukankan      html  css  js  c++  java
  • 3.kafka数据存储和ack

    需要看图学习

    producer生产数据,通过ack发送到kafka 中broker(每台机器的节点不一样)对应的partition,

    • 存数据:partition存放在pagecache中,最终持久化到磁盘中

    • 取数据: consumer先到达kernel,kernel通知partition获取元数据,然后调起senfile(in,offset, out),sendfile先去pagecache拿数据,拿不到去磁盘并缓存到pagecache,发送给sendfile,使用了0拷贝模式(不把数据拷贝给应用kafka)

    数据存储方式

    基础:
        数组 大小固定 空间上是连续的 计算方式找到方便
        链表 大小不固定 空间上不连续 遍历复杂度高 需要建立索引
    
    数据存储方式是链表 需要维护自己的索引,索引有两种方式:1.offset 2.timestamp 其实timestamp可以转换成offset

    producer生产数据到kafka的partition ack有三种方式

    ack=0: 不管kafka的partition状态,只往里面发数据,因为不获取kafka分区的回调信息
    ack=1: 往kafka发数据,只要有leader存活(broker抢到controller),就往kafka发数据,因为需要partition返回确认信息
    ack=-1: 往kafaka发数据,当发数据的时候出现网络波动、副本或者主机死掉,那么会出现短暂的卡顿,之后会正常发数据,因为ack=-1需要所有的ISR返回ok信息,如果没有返回的会把该副本T出ISR

    一些语义

    ISR: in-sync replicas 存活的副本
    
    OSR: outof-sync replicas 超过阈值时间10秒,没有心跳的副本(死掉的副本)
    
    AR: assigned replicas 面向分区的副本集合  AR = ISR + OSR
    
    LW:、HW、LEO看图理解

    创建topic查看ISR

    [root@ke03 ~]# kafka-topics.sh --zookeeper ke02:2181,ke03:2181/kafka --create --topic xiaoke-items --partitions 2 --replication-factor 3
    Created topic xiaoke-items.
    [root@ke03 ~]# kafka-topics.sh --zookeeper ke02:2181,ke03:2181/kafka --describe --topic xiaoke-items
    Topic:xiaoke-items    PartitionCount:2    ReplicationFactor:3    Configs:
        Topic: xiaoke-items    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
        Topic: xiaoke-items    Partition: 1    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
    
    partition0: 在2节点 副本在1,2,3节点 共三个 ISR存活的副本1,2,3

    追踪进程,发现日志是通过网络IO发送的

    [root@ke03 xiaoke-items-0]# jps
    11957 Kafka
    [root@ke03 xiaoke-items-0]# lsof -Pnp 11957
    COMMAND   PID USER   FD   TYPE             DEVICE SIZE/OFF    NODE NAME
    java    11957 root  cwd    DIR                8,3     4096  924176 /opt/bigdata/kafka/config
    java    11957 root  143u   REG                8,3        0  262735 /var/kafka_data/xiaoke-items-1/00000000000000000000.log
    java    11957 root  144u   REG                8,3        0  262740 /var/kafka_data/xiaoke-items-0/00000000000000000000.log
    
    
    问:为什么log不用mmap, 而用普通IO呢?
    log使用普通io的形式目的是通用性
    数据存入磁盘的可靠性级别
    app层级
    调用了io的write,但是这个时候只是到达了内核,性能快,但是丢数据
    只有NIO的filechannel,你调用了write()+force(),才真的写到磁盘,性能极低的
    1.每条都force
    2.只是write基于内核刷写机制,靠脏页
    
    java中:
    传统的io, io.flush是个空实现,没有物理刷盘,还是依赖内核的dirty刷盘,所以,会丢东西

    向topic:xiaoke-items 生产数据

    key: item0 val: val0 partition: 1 offset: 0
    key: item1 val: val0 partition: 0 offset: 0
    key: item2 val: val0 partition: 1 offset: 1
    key: item0 val: val1 partition: 1 offset: 2
    key: item1 val: val1 partition: 0 offset: 1
    key: item2 val: val1 partition: 1 offset: 3
    key: item0 val: val2 partition: 1 offset: 4
    key: item1 val: val2 partition: 0 offset: 2
    key: item2 val: val2 partition: 1 offset: 5
    key: item0 val: val0 partition: 1 offset: 6
    key: item1 val: val0 partition: 0 offset: 3
    key: item2 val: val0 partition: 1 offset: 7
    key: item0 val: val1 partition: 1 offset: 8
    key: item1 val: val1 partition: 0 offset: 4
    key: item2 val: val1 partition: 1 offset: 9
    
    查看日志:
    [root@ke03 xiaoke-items-0]# ll -h
    total 8.0K
    -rw-r--r-- 1 root root 10M Jul 26 10:30 00000000000000000000.index
    -rw-r--r-- 1 root root 385 Jul 26 11:25 00000000000000000000.log
    -rw-r--r-- 1 root root 10M Jul 26 10:30 00000000000000000000.timeindex
    -rw-r--r-- 1 root root   8 Jul 26 11:25 leader-epoch-checkpoint
    
    查看kafka日志文件
    [root@ke03 xiaoke-items-0]# kafka-dump-log.sh --files 00000000000000000000.log  | more
    Dumping 00000000000000000000.log
    Starting offset: 0
    baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1627396648184 size: 77 magic: 2 compresscodec: NO
    NE crc: 1546433855 isvalid: true
    baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 77 CreateTime: 1627396651246 size: 77 magic: 2 compresscodec: N
    ONE crc: 2422575540 isvalid: true
    baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 154 CreateTime: 1627396654287 size: 77 magic: 2 compresscodec: 
    NONE crc: 674617845 isvalid: true
    baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 231 CreateTime: 1627396657309 size: 77 magic: 2 compresscodec: 
    NONE crc: 1996918817 isvalid: true
    baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 308 CreateTime: 1627396660339 size: 77 magic: 2 compresscodec: 
    NONE crc: 110021385 isvalid: true
    
    总结:可以看出向0号分区发送了0-4号数据, 4号分区的日志文件offset是4
    
    
    查看index索引文件 
    [root@ke03 xiaoke-items-0]# kafka-dump-log.sh --files 00000000000000000000.index 
    Dumping 00000000000000000000.index
    offset: 0 position: 0
    为了看到效果:增加数据0号分区offset到122
    
    [root@ke03 xiaoke-items-0]# kafka-dump-log.sh --files 00000000000000000000.index 
    Dumping 00000000000000000000.index
    offset: 54 position: 4158
    offset: 108 position: 8316
    
    说明:
    1.position(字节数组):4158字节的位置 就是offset:54
    2.目前offset是122 日志记录到108 说明:offset的索引记录是跳跃记录,优点:减少了IO次数
    
    
    查看timeindex索引文件
    [root@ke03 xiaoke-items-0]# kafka-dump-log.sh --files 00000000000000000000.timeindex 
    Dumping 00000000000000000000.timeindex
    timestamp: 1627397016578 offset: 54
    timestamp: 1627397033738 offset: 108
    说明: timeindex索引文件指向index索引文件的offset

    取数据:
    1.
    timeindex(offset)文件 找到offset,通过offset找到position和下一个position之间的范围,然后在这个范围内进行检索
    
    

    测试ACK

    代码修改: p.setProperty(ProducerConfig.ACKS_CONFIG, "0");
    
    ack=0
    1.生产数据
    2.kill kafka
    3.ISR减少一个,正常往kafka正产数据
    
    
    ack=1
    1.生产数据
    2.kill kafka
    3.ISR减少一个,正常往kafka正产数据,因为leader存活,既(broker抢到了controller的这台机器)
    
    
    ack=-1
    1.生产数据
    2.kill kafka
    3.ISR减少一个,卡顿10秒之后正常往kafka正产数据

    发送和消费数据

    发送数据:producer.seed()
    
    消费数据:consumer.poll() 拉取数据
    
    修改消费的偏移量:consumer.seek(partition,offset);
    
    offset可以通过timestamp去转换
  • 相关阅读:
    如何将网格式报表打印成其它样式
    拥有与实力不相称的脾气是种灾难——北漂18年(23)
    8.8.1 Optimizing Queries with EXPLAIN
    mysql 没有rowid 怎么实现根据rowid回表呢?
    secondary index
    8.5.5 Bulk Data Loading for InnoDB Tables 批量数据加载
    mysql 中key 指的是索引
    8.5.4 Optimizing InnoDB Redo Logging 优化InnoDB Redo 日志
    8.5.3 Optimizing InnoDB Read-Only Transactions 优化InnoDB 只读事务
    8.5.1 Optimizing Storage Layout for InnoDB Tables InnoDB表的存储布局优化
  • 原文地址:https://www.cnblogs.com/bigdata-familyMeals/p/15068353.html
Copyright © 2011-2022 走看看