  Kafka Study Notes: Analyzing the Kafka Exception "Magic v0 does not support record headers"

    0x00 Overview

    Recently a tester told me that one of our applications could not consume transaction messages. I logged into the Kafka Broker, checked the logs, and found it kept reporting this error:

    java.lang.IllegalArgumentException: Magic v0 does not support record headers
        at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
        at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
        at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
        at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
        at scala.Option.map(Option.scala:146)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
        at scala.Option.flatMap(Option.scala:171)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
        at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
        at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2012)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
        at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
        at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
        at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
        at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
        at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
        at java.lang.Thread.run(Thread.java:748)
    I asked the developers and learned that a recent release had started adding a LogId to the headers of Kafka messages for transaction tracing. Combined with the error message, which says the consumer API version is too old to support record headers, this explained the failure. After the developers removed the header, the consumer could consume messages normally.

    0x01 Reproducing the Issue

    1.1 Kafka version: 0.11.0

    Producer code:

    I wrote a producer interceptor that adds a LOG_ID header to every message:

    import java.util.Map;
    import java.util.UUID;

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class KafkaProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {

        @Override
        public void configure(Map<String, ?> configs) {}

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}

        @Override
        public void close() {}

        @Override
        public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
            // Stamp every outgoing record with a random LOG_ID header
            String uuid = UUID.randomUUID().toString();
            record.headers().add("LOG_ID", uuid.getBytes());
            return record;
        }

    }
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // JSON.toJSONString suggests fastjson; the import is inferred
    import com.alibaba.fastjson.JSON;

    public class App
    {
        public static void main( String[] args ) throws InterruptedException
        {
            Properties props = new Properties();
            // Broker address
            props.put("bootstrap.servers", "localhost:9092");

            // Wait for acknowledgement from all in-sync replicas
            props.put("acks", "all");

            // Retry once on request failure
            props.put("retries", 1);

            // Producer buffer memory size
            props.put("buffer.memory", 33554432);

            // Key serializer
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // Value serializer
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // Register the header-adding interceptor
            List<String> interceptors = new ArrayList<>();
            interceptors.add("com.df.KafkaTest.KafkaProducerInterceptor");
            props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
            Producer<String, String> producer = new KafkaProducer<>(props);

            for (int i = 10; i < 20; i++) {
                try {
                    AccessInfo ai = new AccessInfo();
                    ai.setAccessId("123456");
                    ai.setAccessName("源码婆媳" + Integer.toString(i));
                    ai.setBusScope("01");
                    ai.setIconUrl("http://www.baidu.com");
                    ProducerRecord<String, String> record =
                            new ProducerRecord<>("testTopic", 0, "H" + Integer.toString(i), JSON.toJSONString(ai));
                    producer.send(record).get();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }

            System.out.println("Message sent successfully");
            producer.close();
        }
    }
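
    The AccessInfo class is not shown in the original post. A minimal POJO sketch that makes the producer compile, with field names inferred from the JSON the consumer prints below:

    public class AccessInfo {
        // Hypothetical fields, inferred from the consumer output
        // {"accessId":...,"accessName":...,"busScope":...,"iconUrl":...}
        private String accessId;
        private String accessName;
        private String busScope;
        private String iconUrl;

        public String getAccessId() { return accessId; }
        public void setAccessId(String accessId) { this.accessId = accessId; }
        public String getAccessName() { return accessName; }
        public void setAccessName(String accessName) { this.accessName = accessName; }
        public String getBusScope() { return busScope; }
        public void setBusScope(String busScope) { this.busScope = busScope; }
        public String getIconUrl() { return iconUrl; }
        public void setIconUrl(String iconUrl) { this.iconUrl = iconUrl; }
    }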

    Consumer code

    Using a client with the newer API:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.OffsetCommitCallback;
    import org.apache.kafka.common.TopicPartition;

    public class KafkaConsumerAsync {

        public static void main(String[] args) throws InterruptedException {

            // 1. Prepare the configuration
            String kafkas = "127.0.0.1:9092";
            Properties props = new Properties();
            // Kafka connection info
            props.put("bootstrap.servers", kafkas);
            // Consumer group id
            props.put("group.id", "testTopic-002");
            // Disable offset auto-commit
            props.put("enable.auto.commit", "false");
            // Where to start when no committed offset exists
            props.put("auto.offset.reset", "earliest");
            // Auto-commit interval (ignored here since auto-commit is off)
            props.put("auto.commit.interval.ms", "1000");
            // Cap the data returned by a single fetch request at 1 KB
            props.put("fetch.max.bytes", "1024");
            // Key deserializer
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Value deserializer
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            String topic = "testTopic";
            // 2. Create the KafkaConsumer
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // 3. Subscribe to the topic, without a rebalance listener
            consumer.subscribe(Collections.singleton(topic));

            try {
                // Commit only after at least this many records are processed
                int minCommitSize = 100;
                // Record counter
                int icount = 0;
                // 4. Fetch data
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("topic = %s,partition = %d,offset = %d, key = %s, value = %s%n",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                        icount++;
                    }

                    // Commit offsets only after the business logic succeeds
                    if (icount >= minCommitSize) {
                        // At least 100 records consumed: commit asynchronously
                        consumer.commitAsync(new OffsetCommitCallback() {
                            @Override
                            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                                if (exception == null) {
                                    System.out.println("commit success");
                                } else {
                                    // Commit failed: handle accordingly
                                    System.out.println("commit failed");
                                }
                            }
                        });

                        // Reset the counter
                        icount = 0;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Close the consumer
                consumer.close();
            }
        }
    }
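
    As an aside, once the consumer side is on a client that supports headers, it can read the LOG_ID back. A small hedged helper of my own (not from the original post):

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.Header;

    public class HeaderReader {
        // Returns the record's LOG_ID header as a String, or null if absent
        public static String logIdOf(ConsumerRecord<String, String> record) {
            Header logId = record.headers().lastHeader("LOG_ID");
            return logId == null ? null : new String(logId.value(), StandardCharsets.UTF_8);
        }
    }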

    Output:

    [28 10:20:08,923 INFO ] [main] internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Discovered group coordinator xxxxx.com:9092 (id: 2147483647 rack: null)
    [28 10:20:08,931 INFO ] [main] internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Revoking previously assigned partitions []
    [28 10:20:08,931 INFO ] [main] internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] (Re-)joining group
    [28 10:20:08,958 INFO ] [main] internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Successfully joined group with generation 1
    [28 10:20:08,960 INFO ] [main] internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Setting newly assigned partitions [testTopic-0]
    [28 10:20:08,978 INFO ] [main] internals.Fetcher - [Consumer clientId=consumer-1, groupId=testTopic-002] Resetting offset for partition testTopic-0 to offset 0.
    topic = testTopic,partition = 0,offset = 0, key = H10, value = {"accessId":"123456","accessName":"源码婆媳10","busScope":"01","iconUrl":"http://www.baidu.com"}
    topic = testTopic,partition = 0,offset = 1, key = H11, value = {"accessId":"123456","accessName":"源码婆媳11","busScope":"01","iconUrl":"http://www.baidu.com"}
    topic = testTopic,partition = 0,offset = 2, key = H12, value = {"accessId":"123456","accessName":"源码婆媳12","busScope":"01","iconUrl":"http://www.baidu.com"}
    topic = testTopic,partition = 0,offset = 3, key = H13, value = {"accessId":"123456","accessName":"源码婆媳13","busScope":"01","iconUrl":"http://www.baidu.com"}
    topic = testTopic,partition = 0,offset = 4, key = H14, value = {"accessId":"123456","accessName":"源码婆媳14","busScope":"01","iconUrl":"http://www.baidu.com"}
    topic = testTopic,partition = 0,offset = 5, key = H15, value = {"accessId":"123456","accessName":"源码婆媳15","busScope":"01","iconUrl":"http://www.baidu.com"}
    topic = testTopic,partition = 0,offset = 6, key = H16, value = {"accessId":"123456","accessName":"源码婆媳16","busScope":"01","iconUrl":"http://www.baidu.com"}
    topic = testTopic,partition = 0,offset = 7, key = H17, value = {"accessId":"123456","accessName":"源码婆媳17","busScope":"01","iconUrl":"http://www.baidu.com"}
    topic = testTopic,partition = 0,offset = 8, key = H18, value = {"accessId":"123456","accessName":"源码婆媳18","busScope":"01","iconUrl":"http://www.baidu.com"}
    topic = testTopic,partition = 0,offset = 9, key = H19, value = {"accessId":"123456","accessName":"源码婆媳19","busScope":"01","iconUrl":"http://www.baidu.com"}

    Using the old-version (0.8.2.2) client:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;

    public class SimpleConsumerExample {

        private static kafka.javaapi.consumer.ConsumerConnector consumer;

        public static void consume() {

            Properties props = new Properties();
            // ZooKeeper connection (the 0.8 high-level consumer goes through ZooKeeper)
            props.put("zookeeper.connect", "127.0.0.1:2181");

            // group.id identifies a consumer group
            props.put("group.id", "jd-group");

            // ZooKeeper timeouts
            props.put("zookeeper.session.timeout.ms", "4000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            props.put("auto.offset.reset", "smallest");
            // Serializer class
            props.put("serializer.class", "kafka.serializer.StringEncoder");

            ConsumerConfig config = new ConsumerConfig(props);

            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);

            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("testTopic", new Integer(1));

            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

            Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,
                    keyDecoder, valueDecoder);
            KafkaStream<String, String> stream = consumerMap.get("testTopic").get(0);
            ConsumerIterator<String, String> it = stream.iterator();
            while (it.hasNext())
                System.out.println(it.next().message());
        }

        public static void main(String[] args) {
            consume();
        }
    }

    After starting, this consumer never receives any messages:

    [28 09:51:41,590 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing
    [28 09:51:41,591 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Disconnecting from xxxxxx.xx.com:9092
    [28 09:51:41,592 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566957087042] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset 0 to broker id:0,host:xxxxxx.xx.com,port:9092] )
    [28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Verifying properties
    [28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property client.id is overridden to jd-group
    [28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx.xx.com:9092
    [28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
    [28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] client.ClientUtils$ - Fetching metadata from broker id:0,host:xxxxxx.xx.com,port:9092 with correlation id 65 for 1 topic(s) Set(testTopic)
    [28 09:51:41,799 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing
    [28 09:51:41,824 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Disconnecting from xxxxxx.xx.com:9092
    [28 09:51:41,825 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566957087042] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset 0 to broker id:0,host:xxxxxx.xx.com,port:9092] )
    [28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Verifying properties
    [28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property client.id is overridden to jd-group
    [28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx.xx.com:9092
    [28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
    [28 09:51:42,033 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] client.ClientUtils$ - Fetching metadata from broker id:0,host:xxxxxx.xx.com,port:9092 with correlation id 66 for 1 topic(s) Set(testTopic)
    [28 09:51:42,035 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing
    [28 09:51:42,041 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Disconnecting from xxxxxx.xx.com:9092
    [28 09:51:42,041 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566957087042] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset 0 to broker id:0,host:xxxxxx.xx.com,port:9092] )
    [28 09:51:42,251 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Verifying properties
    [28 09:51:42,252 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property client.id is overridden to jd-group
    [28 09:51:42,252 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx.xx.com:9092
    [28 09:51:42,252 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
    [28 09:51:42,253 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] client.ClientUtils$ - Fetching metadata from broker id:0,host:xxxxxx.xx.com,port:9092 with correlation id 67 for 1 topic(s) Set(testTopic)
    [28 09:51:42,254 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing

    Meanwhile, the Kafka Broker's server.log keeps filling with errors:

    [2019-08-28 09:51:42,045] ERROR [KafkaApi-0] Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=testTopic,partitions=[{partition=0,fetch_offset=0,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
    java.lang.IllegalArgumentException: Magic v0 does not support record headers
        at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
        at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
        at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
        at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
        at scala.Option.map(Option.scala:146)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
        at scala.Option.flatMap(Option.scala:171)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
        at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
        at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2012)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
        at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
        at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
        at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
        at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
        at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
        at java.lang.Thread.run(Thread.java:748)
     
    Reading the error log together with the source code, we can see that after the Broker reads the messages, the fetchResponseCallback callback is invoked to build the response. While building it, the Broker checks the consumer's API version and, if it is lower than the Broker's current message format, down-converts the messages:
          def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
            def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = {
              val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
              fetchedPartitionData.asScala.foreach { case (tp, partitionData) =>
                convertedData.put(tp, convertedPartitionData(tp, partitionData))
              }
              val response = new FetchResponse(convertedData, 0)
              val responseStruct = response.toStruct(versionId)
    
              trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
              response.responseData.asScala.foreach { case (topicPartition, data) =>
                // record the bytes out metrics only when the response is being sent
                brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
              }
    
              val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs,
                request.connectionId, request.header)
              RequestChannel.Response(request, responseSend)
            }
    
            if (fetchRequest.isFromFollower)
              sendResponseExemptThrottle(createResponse(0))
            else
              sendResponseMaybeThrottle(request, request.header.clientId, requestThrottleMs =>
                requestChannel.sendResponse(createResponse(requestThrottleMs)))
          }
    
    
    
        def convertedPartitionData(tp: TopicPartition, data: FetchResponse.PartitionData) = {
    
    
          replicaManager.getMagic(tp).flatMap { magic =>
            val downConvertMagic = {
              if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
                Some(RecordBatch.MAGIC_VALUE_V0)
              else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
                Some(RecordBatch.MAGIC_VALUE_V1)
              else
                None
            }
    
            downConvertMagic.map { magic =>
              trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
          // The messages are down-converted here
              val converted = data.records.downConvert(magic, fetchRequest.fetchData.get(tp).fetchOffset)
              new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
                data.logStartOffset, data.abortedTransactions, converted)
            }
    
          }.getOrElse(data)
        }
     
    The conversion ultimately calls MemoryRecordsBuilder's appendWithOffset, which performs a series of validations and throws if any of them fails. "Magic v0 does not support record headers" is thrown from this method, because the Magic v0 and Magic v1 message formats do not support headers:
    private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
                                      ByteBuffer value, Header[] headers) {
            try {
                if (isControlRecord != isControlBatch)
                    throw new IllegalArgumentException("Control records can only be appended to control batches");
    
                if (lastOffset != null && offset <= lastOffset)
                    throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s " +
                            "(Offsets must increase monotonically).", offset, lastOffset));
    
                if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
                    throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);
    
                if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
                    throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
    
                if (baseTimestamp == null)
                    baseTimestamp = timestamp;
    
                if (magic > RecordBatch.MAGIC_VALUE_V1) {
                    appendDefaultRecord(offset, timestamp, key, value, headers);
                    return null;
                } else {
                    return appendLegacyRecord(offset, timestamp, key, value);
                }
            } catch (IOException e) {
                throw new KafkaException("I/O exception when writing to the append stream, closing", e);
            }
        }
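
    This check is easy to trip outside a broker. Below is a hedged, standalone sketch that drives the same down-conversion path directly against the 0.11.0.x kafka-clients jar; MemoryRecords, SimpleRecord, and RecordHeader are internal classes, so the exact overloads (in particular downConvert's signature) may differ between client versions:

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.SimpleRecord;

    public class DownConvertRepro {
        public static void main(String[] args) {
            Header[] headers = { new RecordHeader("LOG_ID", "abc".getBytes(StandardCharsets.UTF_8)) };
            // Build a v2 (magic = 2) batch holding one record that carries a header
            MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
                    new SimpleRecord(System.currentTimeMillis(), "key".getBytes(), "value".getBytes(), headers));
            // Down-convert it to the v0 format, as the broker does for an old fetch
            // request; this is expected to throw
            // IllegalArgumentException: Magic v0 does not support record headers
            records.downConvert(RecordBatch.MAGIC_VALUE_V0, 0L);
        }
    }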

    1.2 Broker Kafka version: 1.0

    Running the old-version consumer:
    [28 14:26:23,068 INFO ] [jd-group_xx-1566973572731-a5b3105a-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566973572960] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset -1 to broker id:0,host:xx.xx.com,port:9092] )
    {"accessId":"123456","accessName":"源码婆媳10","busScope":"01","iconUrl":"http://www.baidu.com"}
    {"accessId":"123456","accessName":"源码婆媳11","busScope":"01","iconUrl":"http://www.baidu.com"}
    {"accessId":"123456","accessName":"源码婆媳12","busScope":"01","iconUrl":"http://www.baidu.com"}
    {"accessId":"123456","accessName":"源码婆媳13","busScope":"01","iconUrl":"http://www.baidu.com"}
    {"accessId":"123456","accessName":"源码婆媳14","busScope":"01","iconUrl":"http://www.baidu.com"}
    {"accessId":"123456","accessName":"源码婆媳15","busScope":"01","iconUrl":"http://www.baidu.com"}
    {"accessId":"123456","accessName":"源码婆媳16","busScope":"01","iconUrl":"http://www.baidu.com"}
    {"accessId":"123456","accessName":"源码婆媳17","busScope":"01","iconUrl":"http://www.baidu.com"}
    {"accessId":"123456","accessName":"源码婆媳18","busScope":"01","iconUrl":"http://www.baidu.com"}
    {"accessId":"123456","accessName":"源码婆媳19","busScope":"01","iconUrl":"http://www.baidu.com"}

    Strange: why does simply upgrading the Broker let the old consumer work? Didn't we just establish that the v0 message format does not support headers?

    Looking at the 1.0 source, the down-conversion no longer goes through MemoryRecordsBuilder's header-checking path; instead RecordsUtil's convertRecordBatch is called, which simply ignores the headers when converting to v0 or v1. The old consumer can then consume messages normally, though it never sees the header data, so the LOG_ID is silently dropped for such clients:

        private static MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
            RecordBatch batch = recordBatchAndRecords.batch;
            final TimestampType timestampType = batch.timestampType();
            long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
    
            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
                    timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
            for (Record record : recordBatchAndRecords.records) {
                // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported
                if (magic > RecordBatch.MAGIC_VALUE_V1)
                    builder.append(record);
                else
                    builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value());
            }
    
            builder.close();
            return builder;
        }

    0x02 Summary

    Based on the analysis above, there are three ways to resolve this error:

    1) Upgrade the consumer client API version; newer clients support headers.

    2) Upgrade the Broker; from version 1.0 onward, down-conversion simply drops the header information.

    3) The last approach is also the simplest: have the producer stop adding header information. Since the LOG_ID is not yet mandatory in our project, we chose this option (see the sketch below); once all consumer clients have been upgraded, we will add the header back.
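
    To keep that choice reversible, one option (my own hedged sketch, not from the original post) is to gate the header behind a custom interceptor config; the key name log.id.enabled is made up for illustration:

    import java.util.Map;
    import java.util.UUID;

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class ToggleableLogIdInterceptor<K, V> implements ProducerInterceptor<K, V> {

        private volatile boolean enabled;

        @Override
        public void configure(Map<String, ?> configs) {
            // "log.id.enabled" is a hypothetical custom key, passed through the
            // producer Properties alongside the standard configs
            enabled = Boolean.parseBoolean(String.valueOf(configs.get("log.id.enabled")));
        }

        @Override
        public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
            // Only add the header once all consumers can handle it
            if (enabled) {
                record.headers().add("LOG_ID", UUID.randomUUID().toString().getBytes());
            }
            return record;
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}

        @Override
        public void close() {}
    }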

    0x03 Reposted From

    https://www.jianshu.com/p/80ca3ade8fb2

    https://zhuanlan.zhihu.com/p/205676507?utm_source=wechat_session

     