  • Kafka wire protocol format

    1. Serializing a message

    A message consists of a key and a value.

    Kafka ships serializers for the basic data types; for custom business classes you must implement serialization yourself.
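    Producer serializers implement org.apache.kafka.common.serialization.Serializer. As a rough sketch of what a custom one might look like (the Serializer interface below is a stripped-down local stand-in for the Kafka one so the snippet compiles on its own, and the User class and its wire layout are hypothetical):

    ```java
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Local stand-in mirroring serialize(topic, data) from
    // org.apache.kafka.common.serialization.Serializer; use the real
    // interface when kafka-clients is on the classpath.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Hypothetical business class, for illustration only.
    class User {
        final int id;
        final String name;
        User(int id, String name) { this.id = id; this.name = name; }
    }

    public class UserSerializer implements Serializer<User> {
        // Encode as: 4-byte id, 4-byte name length, then UTF-8 name bytes.
        @Override
        public byte[] serialize(String topic, User data) {
            byte[] name = data.name.getBytes(StandardCharsets.UTF_8);
            ByteBuffer buf = ByteBuffer.allocate(4 + 4 + name.length);
            buf.putInt(data.id);
            buf.putInt(name.length);
            buf.put(name);
            return buf.array();
        }
    }
    ```

    In the real client such a class would be wired in through the producer's key.serializer / value.serializer configuration.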

    A ProducerRecord is an object holding the key, value, and headers; at this point the key and value are still plain objects.

    In KafkaProducer#doSend the key and value are serialized into byte arrays.

    The byte arrays, together with the headers, are then appended to a ProducerBatch.

    See:

    org.apache.kafka.clients.producer.internals.ProducerBatch#recordsBuilder
    org.apache.kafka.common.record.MemoryRecordsBuilder#appendStream

    2. Kafka's TCP packet

    Struct and Schema are used to convert the data in a ProducerBatch into a TCP packet that follows Kafka's wire format.

    Take sending a message as an example:

    org.apache.kafka.common.requests.AbstractRequest#toSend
    org.apache.kafka.common.requests.AbstractRequest#serialize
    org.apache.kafka.common.requests.AbstractRequestResponse#serialize
    org.apache.kafka.common.requests.ProduceRequest#toStruct
    org.apache.kafka.common.protocol.types.Schema#write

    org.apache.kafka.common.requests.RequestHeader#toStruct

    public Struct toStruct() {
        Schema schema = schema(apiKey.id, apiVersion);
        Struct struct = new Struct(schema);
        struct.set(API_KEY_FIELD_NAME, apiKey.id);
        struct.set(API_VERSION_FIELD_NAME, apiVersion);
    
        // only v0 of the controlled shutdown request is missing the clientId
        if (struct.hasField(CLIENT_ID_FIELD_NAME))
            struct.set(CLIENT_ID_FIELD_NAME, clientId);
        struct.set(CORRELATION_ID_FIELD_NAME, correlationId);
        return struct;
    }

    org.apache.kafka.common.requests.ProduceRequest#toStruct

    public Struct toStruct() {
        // Store it in a local variable to protect against concurrent updates
        Map<TopicPartition, MemoryRecords> partitionRecords = partitionRecordsOrFail();
        short version = version();
        Struct struct = new Struct(ApiKeys.PRODUCE.requestSchema(version));
        Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
        struct.set(ACKS_KEY_NAME, acks);
        struct.set(TIMEOUT_KEY_NAME, timeout);
        struct.setIfExists(NULLABLE_TRANSACTIONAL_ID, transactionalId);
    
        List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size());
        for (Map.Entry<String, Map<Integer, MemoryRecords>> topicEntry : recordsByTopic.entrySet()) {
            Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
            topicData.set(TOPIC_NAME, topicEntry.getKey());
            List<Struct> partitionArray = new ArrayList<>();
            for (Map.Entry<Integer, MemoryRecords> partitionEntry : topicEntry.getValue().entrySet()) {
                MemoryRecords records = partitionEntry.getValue();
                Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
                        .set(PARTITION_ID, partitionEntry.getKey())
                        .set(RECORD_SET_KEY_NAME, records);
                partitionArray.add(part);
            }
            topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray());
            topicDatas.add(topicData);
        }
        struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray());
        return struct;
    }
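    The Struct/Schema pair used above is essentially a schema-driven writer: a Schema is an ordered list of typed fields, and Struct#writeTo emits the field values in that order. A heavily simplified sketch of the idea (these are not the real org.apache.kafka.common.protocol.types classes, which also support arrays, nullable fields, and nested schemas):

    ```java
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // A schema is an ordered list of named, typed fields.
    public class MiniSchema {
        public enum Type { INT16, INT32, STRING }

        private final LinkedHashMap<String, Type> fields = new LinkedHashMap<>();

        public MiniSchema field(String name, Type type) {
            fields.put(name, type);
            return this;
        }

        // A struct holds the values for one schema instance and
        // writes them out in schema-declared order.
        public class MiniStruct {
            private final Map<String, Object> values = new HashMap<>();

            public MiniStruct set(String name, Object value) {
                if (!fields.containsKey(name))
                    throw new IllegalArgumentException("unknown field: " + name);
                values.put(name, value);
                return this;
            }

            public void writeTo(ByteBuffer buffer) {
                for (Map.Entry<String, Type> f : fields.entrySet()) {
                    Object v = values.get(f.getKey());
                    switch (f.getValue()) {
                        case INT16:
                            buffer.putShort((Short) v);
                            break;
                        case INT32:
                            buffer.putInt((Integer) v);
                            break;
                        case STRING: // length-prefixed UTF-8, like Kafka's STRING type
                            byte[] b = ((String) v).getBytes(StandardCharsets.UTF_8);
                            buffer.putShort((short) b.length);
                            buffer.put(b);
                            break;
                    }
                }
            }
        }

        public MiniStruct instance() { return new MiniStruct(); }
    }
    ```

    With this model, RequestHeader#toStruct above is just "look up the schema for (apiKey, apiVersion), fill in the fields, and let the schema decide the byte layout".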

    Assembling the packet

    public abstract class AbstractRequestResponse {
        /**
         * Visible for testing.
         */
        public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) {
            ByteBuffer buffer = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf());
            headerStruct.writeTo(buffer);
            bodyStruct.writeTo(buffer);
            buffer.rewind();
            return buffer;
        }
    }
    
    public class NetworkSend extends ByteBufferSend {
    
        public NetworkSend(String destination, ByteBuffer buffer) {
            super(destination, sizeDelimit(buffer));
        }
    
        private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {
            return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};
        }
    
        private static ByteBuffer sizeBuffer(int size) {
            ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
            sizeBuffer.putInt(size);
            sizeBuffer.rewind();
            return sizeBuffer;
        }
    
    }

    From this we can infer the Kafka packet format: a 4-byte length field, then the headerStruct, then the bodyStruct.

    The same layout is also spelled out in the comments on NetworkSend and NetworkReceive.
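    That framing can be reproduced with plain JDK code. A minimal sketch (class and method names are mine; the real client spreads this over AbstractRequestResponse#serialize and NetworkSend):

    ```java
    import java.nio.ByteBuffer;

    public class KafkaFrame {
        // Frame = 4-byte big-endian length, then headerStruct bytes, then bodyStruct bytes.
        public static ByteBuffer frame(byte[] header, byte[] body) {
            ByteBuffer buffer = ByteBuffer.allocate(4 + header.length + body.length);
            buffer.putInt(header.length + body.length); // length excludes the size field itself
            buffer.put(header);
            buffer.put(body);
            buffer.rewind();
            return buffer;
        }

        // The receiver does the inverse: read the length, then that many payload bytes.
        public static byte[] unframe(ByteBuffer buffer) {
            int size = buffer.getInt();
            byte[] payload = new byte[size];
            buffer.get(payload);
            return payload;
        }
    }
    ```

    ByteBuffer is big-endian by default, which matches the network byte order Kafka uses on the wire.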

  • Original post: https://www.cnblogs.com/allenwas3/p/11626980.html