  • Spring Framework application startup (load-time) source code analysis notes (3): @KafkaListener

    org.springframework.context.annotation.ConfigurationClassParser.getConfigurationClasses() returns all parsed @Configuration classes, which are passed to org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions() to load their bean definitions: the @Bean methods, @Import targets, and other resources that each @Configuration class declares. For example, @EnableKafka loads KafkaBootstrapConfiguration, which in turn loads KafkaListenerAnnotationBeanPostProcessor, as the following stack shows:
    Thread [main] (Suspended (breakpoint at line 188 in ConfigurationClass))    
        owns: Object  (id=120)    
        ConfigurationClass.addBeanMethod(BeanMethod) line: 188    
        ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClass, ConfigurationClassParser$SourceClass) line: 316    
        ConfigurationClassParser.processConfigurationClass(ConfigurationClass) line: 244    
        ConfigurationClassParser.processImports(ConfigurationClass, SourceClass, Collection<SourceClass>, boolean) line: 606    
        ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClass, ConfigurationClassParser$SourceClass) line: 299    
        ConfigurationClassParser.processConfigurationClass(ConfigurationClass) line: 244    
        ConfigurationClassParser.parse(String, String) line: 189    
        ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClass, ConfigurationClassParser$SourceClass) line: 292    
        ConfigurationClassParser.processConfigurationClass(ConfigurationClass) line: 244    
        ConfigurationClassParser.parse(AnnotationMetadata, String) line: 197    
        ConfigurationClassParser.parse(Set<BeanDefinitionHolder>) line: 165    
        ConfigurationClassPostProcessor.processConfigBeanDefinitions(BeanDefinitionRegistry) line: 308    
        ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(BeanDefinitionRegistry) line: 229    
        PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(Collection<BeanDefinitionRegistryPostProcessor>, BeanDefinitionRegistry) line: 271    
        PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory, List<BeanFactoryPostProcessor>) line: 94    
        AnnotationConfigApplicationContext(AbstractApplicationContext).invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory) line: 686    
        AnnotationConfigApplicationContext(AbstractApplicationContext).refresh() line: 524    
        SpringApplication.refresh(ApplicationContext) line: 750    
        SpringApplication.refreshContext(ConfigurableApplicationContext) line: 386    
        SpringApplication.run(String...) line: 327    
        SpringApplication.run(Class<?>[], String[]) line: 1245    
        SpringApplication.run(Class<?>, String...) line: 1233    
        Application.main(String[]) line: 51    
    The bean definition for KafkaListenerAnnotationBeanPostProcessor is registered in DefaultListableBeanFactory.registerBeanDefinition() under the name KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME, as the following stack shows:
    Thread [main] (Suspended (breakpoint at line 779 in DefaultListableBeanFactory))    
        owns: Object  (id=130)    
        DefaultListableBeanFactory.registerBeanDefinition(String, BeanDefinition) line: 779    
        ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForBeanMethod(BeanMethod) line: 262    
        ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClass, ConfigurationClassBeanDefinitionReader$TrackedConditionEvaluator) line: 141 
        ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(Set<ConfigurationClass>) line: 117    
        ConfigurationClassPostProcessor.processConfigBeanDefinitions(BeanDefinitionRegistry) line: 320    
        ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(BeanDefinitionRegistry) line: 229    
        PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(Collection<BeanDefinitionRegistryPostProcessor>, BeanDefinitionRegistry) line: 271    
        PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory, List<BeanFactoryPostProcessor>) line: 94    
        AnnotationConfigApplicationContext(AbstractApplicationContext).invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory) line: 686    
        AnnotationConfigApplicationContext(AbstractApplicationContext).refresh() line: 524    
        SpringApplication.refresh(ApplicationContext) line: 750    
        SpringApplication.refreshContext(ConfigurableApplicationContext) line: 386    
        SpringApplication.run(String...) line: 327    
        SpringApplication.run(Class<?>[], String[]) line: 1245    
        SpringApplication.run(Class<?>, String...) line: 1233    
        Application.main(String[]) line: 51    
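    The two phases above (first collect the bean methods of each configuration class and its imports, then register one definition per bean method under a name) can be sketched in plain Java. This is only an illustration under stated assumptions: DemoImport, DemoBean, DemoConfigParser and the two demo config classes are hypothetical stand-ins, not Spring types, and the sketch omits cycle detection, conditions, and everything else the real parser handles.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for @Import (not the Spring annotation).
@Retention(RetentionPolicy.RUNTIME)
@interface DemoImport { Class<?> value(); }

// Hypothetical stand-in for @Bean (not the Spring annotation).
@Retention(RetentionPolicy.RUNTIME)
@interface DemoBean { String name(); }

class DemoConfigParser {
    // Phase 1: walk the config class and its import, collecting annotated methods.
    static List<Method> parse(Class<?> configClass) {
        List<Method> beanMethods = new ArrayList<>();
        collect(configClass, beanMethods);
        return beanMethods;
    }

    private static void collect(Class<?> c, List<Method> out) {
        DemoImport imp = c.getAnnotation(DemoImport.class);
        if (imp != null) {
            collect(imp.value(), out); // imported class first; no cycle detection in this sketch
        }
        for (Method m : c.getDeclaredMethods()) {
            if (m.isAnnotationPresent(DemoBean.class)) {
                out.add(m);
            }
        }
    }

    // Phase 2: register one "definition" per bean method, keyed by bean name.
    static Map<String, Method> loadBeanDefinitions(List<Method> beanMethods) {
        Map<String, Method> registry = new LinkedHashMap<>();
        for (Method m : beanMethods) {
            registry.put(m.getAnnotation(DemoBean.class).name(), m);
        }
        return registry;
    }
}

// Plays the role of KafkaBootstrapConfiguration in this sketch.
class DemoBootstrapConfig {
    @DemoBean(name = "demoListenerPostProcessor")
    Object postProcessor() { return new Object(); }
}

// Plays the role of an application config that enables the feature via an import.
@DemoImport(DemoBootstrapConfig.class)
class DemoAppConfig {
    @DemoBean(name = "userService")
    Object userService() { return new Object(); }
}
```

    Calling DemoConfigParser.loadBeanDefinitions(DemoConfigParser.parse(DemoAppConfig.class)) yields a registry that contains both the imported post-processor definition and the application's own bean definition.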
    Once its bean definition is registered, KafkaListenerAnnotationBeanPostProcessor is instantiated in PostProcessorRegistrationDelegate.registerBeanPostProcessors() and added to DefaultListableBeanFactory.beanPostProcessors, so that it can later be called back after each bean that carries @KafkaListener annotations has been initialized. The registration is triggered from AnnotationConfigApplicationContext(AbstractApplicationContext).registerBeanPostProcessors(beanFactory), as the following stack shows:
     
    Thread [main] (Suspended (breakpoint at line 848 in AbstractBeanFactory))    
        owns: Object  (id=361)    
        DefaultListableBeanFactory(AbstractBeanFactory).addBeanPostProcessor(BeanPostProcessor) line: 848    
        PostProcessorRegistrationDelegate.registerBeanPostProcessors(ConfigurableListableBeanFactory, List<BeanPostProcessor>) line: 293    
        PostProcessorRegistrationDelegate.registerBeanPostProcessors(ConfigurableListableBeanFactory, AbstractApplicationContext) line: 231    
        AnnotationConfigApplicationContext(AbstractApplicationContext).registerBeanPostProcessors(ConfigurableListableBeanFactory) line: 702    
        AnnotationConfigApplicationContext(AbstractApplicationContext).refresh() line: 527    
        SpringApplication.refresh(ApplicationContext) line: 750    
        SpringApplication.refreshContext(ConfigurableApplicationContext) line: 386    
        SpringApplication.run(String...) line: 327    
        SpringApplication.run(Class<?>[], String[]) line: 1245    
        SpringApplication.run(Class<?>, String...) line: 1233    
        Application.main(String[]) line: 51    
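    AnnotationConfigApplicationContext.finishBeanFactoryInitialization() instantiates the user-level beans and invokes the post-initialization callbacks, which triggers KafkaListenerAnnotationBeanPostProcessor<K,V>.processListener. Its job is to register each @KafkaListener-annotated method with a ConcurrentMessageListenerContainer as a listener that is invoked whenever a Kafka message is received. Each @KafkaListener method is wrapped in a MethodKafkaListenerEndpoint object and registered through
    registerEndpoint(endpoint, factory), which KafkaListenerAnnotationBeanPostProcessor calls on its KafkaListenerEndpointRegistrar. During registration, MethodKafkaListenerEndpoint.createMessageListener() creates a MessagingMessageListenerAdapter; the factory argument is a KafkaListenerContainerFactory, which creates the MessageListenerContainer; and the adapter is attached to the container through MessageListenerContainer.setupMessageListener(). When the container's listener thread finds a message, it runs KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(), which calls the @KafkaListener-annotated method wrapped by the MessagingMessageListenerAdapter. UML class diagram: https://www.lucidchart.com/documents/view/d5e33d39-b5a7-4857-a2c6-60b4c7fd0139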
    Thread [main] (Suspended (breakpoint at line 64 in MethodKafkaListenerEndpoint))    
        owns: ConcurrentHashMap<K,V>  (id=79)    
        owns: Object  (id=80)    
        MethodKafkaListenerEndpoint<K,V>.setBean(Object) line: 64    
        KafkaListenerAnnotationBeanPostProcessor<K,V>.processListener(MethodKafkaListenerEndpoint<?,?>, KafkaListener, Object, Object, String) line: 377    
        KafkaListenerAnnotationBeanPostProcessor<K,V>.processKafkaListener(KafkaListener, Method, Object, String) line: 340    
        KafkaListenerAnnotationBeanPostProcessor<K,V>.postProcessAfterInitialization(Object, String) line: 270    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).applyBeanPostProcessorsAfterInitialization(Object, String) line: 435    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).initializeBean(String, Object, RootBeanDefinition) line: 1721    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).doCreateBean(String, RootBeanDefinition, Object[]) line: 581    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).createBean(String, RootBeanDefinition, Object[]) line: 498    
        DefaultListableBeanFactory(AbstractBeanFactory).lambda$doGetBean$0(String, RootBeanDefinition, Object[]) line: 305    
        2099051403.getObject() line: not available    
        DefaultListableBeanFactory(DefaultSingletonBeanRegistry).getSingleton(String, ObjectFactory<?>) line: 233    
        DefaultListableBeanFactory(AbstractBeanFactory).doGetBean(String, Class<T>, Object[], boolean) line: 303    
        DefaultListableBeanFactory(AbstractBeanFactory).getBean(String, Class<T>) line: 198    
        CommonAnnotationBeanPostProcessor.autowireResource(BeanFactory, CommonAnnotationBeanPostProcessor$LookupElement, String) line: 512    
        CommonAnnotationBeanPostProcessor.getResource(CommonAnnotationBeanPostProcessor$LookupElement, String) line: 483    
        CommonAnnotationBeanPostProcessor$ResourceElement.getResourceToInject(Object, String) line: 617    
        CommonAnnotationBeanPostProcessor$ResourceElement(InjectionMetadata$InjectedElement).inject(Object, String, PropertyValues) line: 172    
        InjectionMetadata.inject(Object, String, PropertyValues) line: 89    
        CommonAnnotationBeanPostProcessor.postProcessPropertyValues(PropertyValues, PropertyDescriptor[], Object, String) line: 317    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).populateBean(String, RootBeanDefinition, BeanWrapper) line: 1353    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).doCreateBean(String, RootBeanDefinition, Object[]) line: 579    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).createBean(String, RootBeanDefinition, Object[]) line: 498    
        DefaultListableBeanFactory(AbstractBeanFactory).lambda$doGetBean$0(String, RootBeanDefinition, Object[]) line: 305    
        2099051403.getObject() line: not available    
        DefaultListableBeanFactory(DefaultSingletonBeanRegistry).getSingleton(String, ObjectFactory<?>) line: 233    
        DefaultListableBeanFactory(AbstractBeanFactory).doGetBean(String, Class<T>, Object[], boolean) line: 303    
        DefaultListableBeanFactory(AbstractBeanFactory).getBean(String, Class<T>) line: 198    
        CommonAnnotationBeanPostProcessor.autowireResource(BeanFactory, CommonAnnotationBeanPostProcessor$LookupElement, String) line: 512    
        CommonAnnotationBeanPostProcessor.getResource(CommonAnnotationBeanPostProcessor$LookupElement, String) line: 483    
        CommonAnnotationBeanPostProcessor$ResourceElement.getResourceToInject(Object, String) line: 617    
        CommonAnnotationBeanPostProcessor$ResourceElement(InjectionMetadata$InjectedElement).inject(Object, String, PropertyValues) line: 172    
        InjectionMetadata.inject(Object, String, PropertyValues) line: 89    
        CommonAnnotationBeanPostProcessor.postProcessPropertyValues(PropertyValues, PropertyDescriptor[], Object, String) line: 317    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).populateBean(String, RootBeanDefinition, BeanWrapper) line: 1353    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).doCreateBean(String, RootBeanDefinition, Object[]) line: 579    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).createBean(String, RootBeanDefinition, Object[]) line: 498    
        DefaultListableBeanFactory(AbstractBeanFactory).lambda$doGetBean$0(String, RootBeanDefinition, Object[]) line: 305    
        2099051403.getObject() line: not available    
        DefaultListableBeanFactory(DefaultSingletonBeanRegistry).getSingleton(String, ObjectFactory<?>) line: 233    
        DefaultListableBeanFactory(AbstractBeanFactory).doGetBean(String, Class<T>, Object[], boolean) line: 303    
        DefaultListableBeanFactory(AbstractBeanFactory).getBean(String) line: 193    
        DefaultListableBeanFactory.preInstantiateSingletons() line: 747    
        AnnotationConfigApplicationContext(AbstractApplicationContext).finishBeanFactoryInitialization(ConfigurableListableBeanFactory) line: 861    
        AnnotationConfigApplicationContext(AbstractApplicationContext).refresh() line: 542    
        SpringApplication.refresh(ApplicationContext) line: 750    
        SpringApplication.refreshContext(ConfigurableApplicationContext) line: 386    
        SpringApplication.run(String...) line: 327    
        SpringApplication.run(Class<?>[], String[]) line: 1245    
        SpringApplication.run(Class<?>, String...) line: 1233    
        Application.main(String[]) line: 51    
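    The post-processing step traced above follows a common pattern: after a bean is initialized, scan its methods for an annotation, wrap each match in an endpoint object, and register the endpoint so that a later message can be dispatched to it reflectively. The sketch below shows that pattern in plain Java. All names in it (DemoListener, DemoEndpoint, DemoListenerPostProcessor, DemoUserService) are hypothetical stand-ins, and it makes no attempt to reproduce the actual spring-kafka implementation, containers, or threading.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for @KafkaListener (not the spring-kafka annotation).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoListener {
    String topic();
}

// Hypothetical stand-in for an endpoint: a bean plus one of its annotated methods.
class DemoEndpoint {
    final Object bean;
    final Method method;

    DemoEndpoint(Object bean, Method method) {
        this.bean = bean;
        this.method = method;
    }

    // Plays the role of the listener adapter: call the annotated method reflectively.
    void onMessage(String payload) {
        try {
            method.invoke(bean, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("listener invocation failed", e);
        }
    }
}

// Hypothetical stand-in for the post-processor plus a very simplified dispatcher.
class DemoListenerPostProcessor {
    private final Map<String, List<DemoEndpoint>> endpointsByTopic = new HashMap<>();

    // Called once per bean after initialization; returns the bean unchanged.
    Object postProcessAfterInitialization(Object bean) {
        for (Method m : bean.getClass().getDeclaredMethods()) {
            DemoListener ann = m.getAnnotation(DemoListener.class);
            if (ann != null) {
                endpointsByTopic
                        .computeIfAbsent(ann.topic(), t -> new ArrayList<>())
                        .add(new DemoEndpoint(bean, m));
            }
        }
        return bean;
    }

    // Simulates delivery of one message; returns how many listeners were invoked.
    int dispatch(String topic, String payload) {
        List<DemoEndpoint> endpoints =
                endpointsByTopic.getOrDefault(topic, new ArrayList<>());
        for (DemoEndpoint e : endpoints) {
            e.onMessage(payload);
        }
        return endpoints.size();
    }
}

// A user-level bean with one annotated listener method.
class DemoUserService {
    final List<String> received = new ArrayList<>();

    @DemoListener(topic = "users")
    void receive(String message) {
        received.add(message);
    }
}
```

    In this sketch, passing a DemoUserService to postProcessAfterInitialization() and then calling dispatch("users", "hello") reaches the receive method, while a dispatch to a topic with no registered endpoint invokes nothing.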
     
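    After the Kafka client application has loaded Spring successfully, the ConcurrentMessageListenerContainer listener thread starts and reads the consumer offsets. If transactions are configured, it begins a transaction by calling KafkaTransactionManager.doBegin():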
    Thread [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] (Suspended (breakpoint at line 118 in KafkaTransactionManager))    
        KafkaTransactionManager<K,V>.doBegin(Object, TransactionDefinition) line: 118    
        KafkaTransactionManager<K,V>(AbstractPlatformTransactionManager).getTransaction(TransactionDefinition) line: 374    
        TransactionTemplate.execute(TransactionCallback<T>) line: 130    
        KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(Collection<TopicPartition>) line: 434    
        ConsumerCoordinator.onJoinComplete(int, String, String, ByteBuffer) line: 265    
        ConsumerCoordinator(AbstractCoordinator).joinGroupIfNeeded() line: 363    
        ConsumerCoordinator(AbstractCoordinator).ensureActiveGroup() line: 310    
        ConsumerCoordinator.poll(long, long) line: 297    
        KafkaConsumer<K,V>.pollOnce(long) line: 1078    
        KafkaConsumer<K,V>.poll(long) line: 1043    
        KafkaMessageListenerContainer$ListenerConsumer.run() line: 546    
        Executors$RunnableAdapter<T>.call() line: 511    
        ListenableFutureTask<T>(FutureTask<V>).run() line: 266    
        Thread.run() line: 745  
    The listener thread reads the consumer offsets and calls producer.sendOffsetsToTransaction():
    Thread [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] (Suspended (breakpoint at line 278 in DefaultKafkaProducerFactory$CloseSafeProducer))    
        DefaultKafkaProducerFactory$CloseSafeProducer<K,V>.sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata>, String) line: 278    
        KafkaMessageListenerContainer$ListenerConsumer$1$1.doInTransactionWithoutResult(TransactionStatus) line: 441    
        KafkaMessageListenerContainer$ListenerConsumer$1$1(TransactionCallbackWithoutResult).doInTransaction(TransactionStatus) line: 34    
        TransactionTemplate.execute(TransactionCallback<T>) line: 133    
        KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(Collection<TopicPartition>) line: 434    
        ConsumerCoordinator.onJoinComplete(int, String, String, ByteBuffer) line: 265    
        ConsumerCoordinator(AbstractCoordinator).joinGroupIfNeeded() line: 363    
        ConsumerCoordinator(AbstractCoordinator).ensureActiveGroup() line: 310    
        ConsumerCoordinator.poll(long, long) line: 297    
        KafkaConsumer<K,V>.pollOnce(long) line: 1078    
        KafkaConsumer<K,V>.poll(long) line: 1043    
        KafkaMessageListenerContainer$ListenerConsumer.run() line: 546    
        Executors$RunnableAdapter<T>.call() line: 511    
        ListenableFutureTask<T>(FutureTask<V>).run() line: 266    
        Thread.run() line: 745    
    If the records at the consumer offset have not yet been processed, a transaction is started with KafkaTransactionManager.doBegin():
    Thread [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] (Suspended (breakpoint at line 118 in KafkaTransactionManager))    
        KafkaTransactionManager<K,V>.doBegin(Object, TransactionDefinition) line: 118    
        KafkaTransactionManager<K,V>(AbstractPlatformTransactionManager).getTransaction(TransactionDefinition) line: 374    
        TransactionTemplate.execute(TransactionCallback<T>) line: 130    
        KafkaMessageListenerContainer$ListenerConsumer.innvokeRecordListenerInTx(ConsumerRecords<K,V>) line: 807    
        KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(ConsumerRecords<K,V>) line: 787    
        KafkaMessageListenerContainer$ListenerConsumer.invokeListener(ConsumerRecords<K,V>) line: 666    
        KafkaMessageListenerContainer$ListenerConsumer.run() line: 554    
        Executors$RunnableAdapter<T>.call() line: 511    
        ListenableFutureTask<T>(FutureTask<V>).run() line: 266    
        Thread.run() line: 745    
    The @KafkaListener-annotated method is then invoked:
    Thread [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] (Suspended (breakpoint at line 75 in UserServiceImpl))    
        UserServiceImpl.receivekafka(TransactionMessage) line: 75    
        UserServiceImpl$$FastClassBySpringCGLIB$$d36e166a.invoke(int, Object, Object[]) line: not available    
        MethodProxy.invoke(Object, Object[]) line: 204    
        CglibAopProxy$CglibMethodInvocation.invokeJoinpoint() line: 738    
        CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 160    
        680055099.proceedWithInvocation() line: not available    
        TransactionInterceptor(TransactionAspectSupport).invokeWithinTransaction(Method, Class<?>, InvocationCallback) line: 288    
        TransactionInterceptor.invoke(MethodInvocation) line: 96    
        CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 182    
        CglibAopProxy$DynamicAdvisedInterceptor.intercept(Object, Method, Object[], MethodProxy) line: 680    
        UserServiceImpl$$EnhancerBySpringCGLIB$$af9d8b47.receivekafka(TransactionMessage) line: not available    
        NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]    
        NativeMethodAccessorImpl.invoke(Object, Object[]) line: 62    
        DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 43    
        Method.invoke(Object, Object...) line: 498    
        InvocableHandlerMethod.doInvoke(Object...) line: 181    
        InvocableHandlerMethod.invoke(Message<?>, Object...) line: 114    
        HandlerAdapter.invoke(Message<?>, Object...) line: 48    
        RecordMessagingMessageListenerAdapter<K,V>(MessagingMessageListenerAdapter<K,V>).invokeHandler(Object, Acknowledgment, Message<?>, Consumer<?,?>) line: 236    
        RecordMessagingMessageListenerAdapter<K,V>.onMessage(ConsumerRecord<K,V>, Acknowledgment, Consumer<?,?>) line: 80    
        RecordMessagingMessageListenerAdapter<K,V>.onMessage(Object, Acknowledgment, Consumer) line: 51    
        KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(ConsumerRecord<K,V>, Producer) line: 856    
        KafkaMessageListenerContainer$ListenerConsumer.access$1700(KafkaMessageListenerContainer$ListenerConsumer, ConsumerRecord, Producer) line: 240    
        KafkaMessageListenerContainer$ListenerConsumer$4.doInTransactionWithoutResult(TransactionStatus) line: 816    
        KafkaMessageListenerContainer$ListenerConsumer$4(TransactionCallbackWithoutResult).doInTransaction(TransactionStatus) line: 34    
        TransactionTemplate.execute(TransactionCallback<T>) line: 133    
        KafkaMessageListenerContainer$ListenerConsumer.innvokeRecordListenerInTx(ConsumerRecords<K,V>) line: 807    
        KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(ConsumerRecords<K,V>) line: 787    
        KafkaMessageListenerContainer$ListenerConsumer.invokeListener(ConsumerRecords<K,V>) line: 666    
        KafkaMessageListenerContainer$ListenerConsumer.run() line: 554    
        Executors$RunnableAdapter<T>.call() line: 511    
        ListenableFutureTask<T>(FutureTask<V>).run() line: 266    
        Thread.run() line: 745
    After the @KafkaListener-annotated method completes successfully, producer.sendOffsetsToTransaction() is called:
    Thread [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] (Suspended (breakpoint at line 278 in DefaultKafkaProducerFactory$CloseSafeProducer))    
        DefaultKafkaProducerFactory$CloseSafeProducer<K,V>.sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata>, String) line: 278    
        KafkaMessageListenerContainer$ListenerConsumer.sendOffsetsToTransaction(Producer) line: 915    
        KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(ConsumerRecord<K,V>, Producer) line: 878    
        KafkaMessageListenerContainer$ListenerConsumer.access$1700(KafkaMessageListenerContainer$ListenerConsumer, ConsumerRecord, Producer) line: 240    
        KafkaMessageListenerContainer$ListenerConsumer$4.doInTransactionWithoutResult(TransactionStatus) line: 816    
        KafkaMessageListenerContainer$ListenerConsumer$4(TransactionCallbackWithoutResult).doInTransaction(TransactionStatus) line: 34    
        TransactionTemplate.execute(TransactionCallback<T>) line: 133    
        KafkaMessageListenerContainer$ListenerConsumer.innvokeRecordListenerInTx(ConsumerRecords<K,V>) line: 807    
        KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(ConsumerRecords<K,V>) line: 787    
        KafkaMessageListenerContainer$ListenerConsumer.invokeListener(ConsumerRecords<K,V>) line: 666    
        KafkaMessageListenerContainer$ListenerConsumer.run() line: 554    
        Executors$RunnableAdapter<T>.call() line: 511    
        ListenableFutureTask<T>(FutureTask<V>).run() line: 266    
        Thread.run() line: 745    
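    The order of calls seen in the last three stacks (begin a transaction, invoke the listener, send the consumed offset to the transaction, then finish the transaction) can be summarized with a small plain-Java sketch. DemoTxProducer and DemoTxListenerInvoker are hypothetical names that only record calls; they are not the Kafka Producer or spring-kafka classes. The sketch assumes the offset sent is the record offset plus one (the next offset to read) and that a listener exception leads to an abort.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a transactional producer; it only records the calls made.
class DemoTxProducer {
    final List<String> calls = new ArrayList<>();

    void beginTransaction() { calls.add("begin"); }

    void sendOffsetsToTransaction(long nextOffset) { calls.add("sendOffsets:" + nextOffset); }

    void commitTransaction() { calls.add("commit"); }

    void abortTransaction() { calls.add("abort"); }
}

class DemoTxListenerInvoker {
    // Runs one record through the listener inside a transaction.
    // Returns true if the transaction was committed, false if it was aborted.
    static boolean invokeInTx(DemoTxProducer producer, long offset, String value,
                              Consumer<String> listener) {
        producer.beginTransaction();
        try {
            listener.accept(value);
            // Assumption: the offset handed to the transaction is the next one to read.
            producer.sendOffsetsToTransaction(offset + 1);
            producer.commitTransaction();
            return true;
        } catch (RuntimeException e) {
            // Listener failed: no offsets are sent and the transaction is aborted.
            producer.abortTransaction();
            return false;
        }
    }
}
```

    With a listener that returns normally, the recorded calls are begin, sendOffsets, commit in that order; with a listener that throws, only begin and abort are recorded, so the offset is not advanced.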
  • Original article: https://www.cnblogs.com/birdstudio/p/7640095.html