zoukankan      html  css  js  c++  java
  • Spring-framework应用程序启动loadtime源码分析笔记(三)——@KafkaListener

    org.springframework.context.annotation.ConfigurationClassParser.getConfigurationClasses()读所有@Configuration类,传入org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions()进行@Configuration对象定义加载,加载@Configuration类定义@Bean方法、@Import等资源,如使用@EnableKafka加载KafkaBootstrapConfiguration然后加载KafkaListenerAnnotationBeanPostProcessor如下列stack所示:
    Thread [main] (Suspended (breakpoint at line 188 in ConfigurationClass))    
        owns: Object  (id=120)    
        ConfigurationClass.addBeanMethod(BeanMethod) line: 188    
        ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClass, ConfigurationClassParser$SourceClass) line: 316    
        ConfigurationClassParser.processConfigurationClass(ConfigurationClass) line: 244    
        ConfigurationClassParser.processImports(ConfigurationClass, SourceClass, Collection<SourceClass>, boolean) line: 606    
        ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClass, ConfigurationClassParser$SourceClass) line: 299    
        ConfigurationClassParser.processConfigurationClass(ConfigurationClass) line: 244    
        ConfigurationClassParser.parse(String, String) line: 189    
        ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClass, ConfigurationClassParser$SourceClass) line: 292    
        ConfigurationClassParser.processConfigurationClass(ConfigurationClass) line: 244    
        ConfigurationClassParser.parse(AnnotationMetadata, String) line: 197    
        ConfigurationClassParser.parse(Set<BeanDefinitionHolder>) line: 165    
        ConfigurationClassPostProcessor.processConfigBeanDefinitions(BeanDefinitionRegistry) line: 308    
        ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(BeanDefinitionRegistry) line: 229    
        PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(Collection<BeanDefinitionRegistryPostProcessor>, BeanDefinitionRegistry) line: 271    
        PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory, List<BeanFactoryPostProcessor>) line: 94    
        AnnotationConfigApplicationContext(AbstractApplicationContext).invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory) line: 686    
        AnnotationConfigApplicationContext(AbstractApplicationContext).refresh() line: 524    
        SpringApplication.refresh(ApplicationContext) line: 750    
        SpringApplication.refreshContext(ConfigurableApplicationContext) line: 386    
        SpringApplication.run(String...) line: 327    
        SpringApplication.run(Class<?>[], String[]) line: 1245    
        SpringApplication.run(Class<?>, String...) line: 1233    
        Application.main(String[]) line: 51    
    KafkaListenerAnnotationBeanPostProcessor对象定义注册在DefaultListableBeanFactory.registerBeanDefinition(),使用名称为KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,如下列stack:
    Thread [main] (Suspended (breakpoint at line 779 in DefaultListableBeanFactory))    
        owns: Object  (id=130)    
        DefaultListableBeanFactory.registerBeanDefinition(String, BeanDefinition) line: 779    
        ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForBeanMethod(BeanMethod) line: 262    
        ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClass, ConfigurationClassBeanDefinitionReader$TrackedConditionEvaluator) line: 141 
        ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(Set<ConfigurationClass>) line: 117    
        ConfigurationClassPostProcessor.processConfigBeanDefinitions(BeanDefinitionRegistry) line: 320    
        ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(BeanDefinitionRegistry) line: 229    
        PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(Collection<BeanDefinitionRegistryPostProcessor>, BeanDefinitionRegistry) line: 271    
        PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory, List<BeanFactoryPostProcessor>) line: 94    
        AnnotationConfigApplicationContext(AbstractApplicationContext).invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory) line: 686    
        AnnotationConfigApplicationContext(AbstractApplicationContext).refresh() line: 524    
        SpringApplication.refresh(ApplicationContext) line: 750    
        SpringApplication.refreshContext(ConfigurableApplicationContext) line: 386    
        SpringApplication.run(String...) line: 327    
        SpringApplication.run(Class<?>[], String[]) line: 1245    
        SpringApplication.run(Class<?>, String...) line: 1233    
        Application.main(String[]) line: 51    
    KafkaListenerAnnotationBeanPostProcessor对象定义注册后,在PostProcessorRegistrationDelegate.registerBeanPostProcessors()实例化,将KafkaListenerAnnotationBeanPostProcessor对象添加到DefaultListableBeanFactory.beanPostProcessors保存,用于后续注释了@KafkaListener的对象实例化后处理事件的调用。KafkaListenerAnnotationBeanPostProcessor在AnnotationConfigApplicationContext(AbstractApplicationContext).registerBeanPostProcessors(beanFactory)注册,如下图stack:
     
    Thread [main] (Suspended (breakpoint at line 848 in AbstractBeanFactory))    
        owns: Object  (id=361)    
        DefaultListableBeanFactory(AbstractBeanFactory).addBeanPostProcessor(BeanPostProcessor) line: 848    
        PostProcessorRegistrationDelegate.registerBeanPostProcessors(ConfigurableListableBeanFactory, List<BeanPostProcessor>) line: 293    
        PostProcessorRegistrationDelegate.registerBeanPostProcessors(ConfigurableListableBeanFactory, AbstractApplicationContext) line: 231    
        AnnotationConfigApplicationContext(AbstractApplicationContext).registerBeanPostProcessors(ConfigurableListableBeanFactory) line: 702    
        AnnotationConfigApplicationContext(AbstractApplicationContext).refresh() line: 527    
        SpringApplication.refresh(ApplicationContext) line: 750    
        SpringApplication.refreshContext(ConfigurableApplicationContext) line: 386    
        SpringApplication.run(String...) line: 327    
        SpringApplication.run(Class<?>[], String[]) line: 1245    
        SpringApplication.run(Class<?>, String...) line: 1233    
        Application.main(String[]) line: 51    
    AnnotationConfigApplicationContext.finishBeanFactoryInitialization()用户级对象实例化后处理事件的调用,如触发KafkaListenerAnnotationBeanPostProcessor<K,V>.processListener调用,逻辑是注释@KafkaListener方法在ConcurrentMessageListenerContainer注册成监听事件,有接收到kafka消息时补调用。@KafkaListener方法封闭一个MethodKafkaListenerEndpoint对象并注册执行
    KafkaListenerAnnotationBeanPostProcessor.registerEndpoint(endpoint, factory),注册逻辑MethodKafkaListenerEndpoint.createMessageListener()生成MessagingMessageListenerAdapter对象,factory对象是KafkaListenerContainerFactory可以生成MessageListenerContainer对象,MessagingMessageListenerAdapter对象通过MessageListenerContainer.setupMessageListener()注册。Container监听线程发现有消息时执行KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(),执行MessagingMessageListenerAdapter封装@KafkaListener注释方法。UML类图https://www.lucidchart.com/documents/view/d5e33d39-b5a7-4857-a2c6-60b4c7fd0139
    Thread [main] (Suspended (breakpoint at line 64 in MethodKafkaListenerEndpoint))    
        owns: ConcurrentHashMap<K,V>  (id=79)    
        owns: Object  (id=80)    
        MethodKafkaListenerEndpoint<K,V>.setBean(Object) line: 64    
        KafkaListenerAnnotationBeanPostProcessor<K,V>.processListener(MethodKafkaListenerEndpoint<?,?>, KafkaListener, Object, Object, String) line: 377    
        KafkaListenerAnnotationBeanPostProcessor<K,V>.processKafkaListener(KafkaListener, Method, Object, String) line: 340    
        KafkaListenerAnnotationBeanPostProcessor<K,V>.postProcessAfterInitialization(Object, String) line: 270    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).applyBeanPostProcessorsAfterInitialization(Object, String) line: 435    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).initializeBean(String, Object, RootBeanDefinition) line: 1721    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).doCreateBean(String, RootBeanDefinition, Object[]) line: 581    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).createBean(String, RootBeanDefinition, Object[]) line: 498    
        DefaultListableBeanFactory(AbstractBeanFactory).lambda$doGetBean$0(String, RootBeanDefinition, Object[]) line: 305    
        2099051403.getObject() line: not available    
        DefaultListableBeanFactory(DefaultSingletonBeanRegistry).getSingleton(String, ObjectFactory<?>) line: 233    
        DefaultListableBeanFactory(AbstractBeanFactory).doGetBean(String, Class<T>, Object[], boolean) line: 303    
        DefaultListableBeanFactory(AbstractBeanFactory).getBean(String, Class<T>) line: 198    
        CommonAnnotationBeanPostProcessor.autowireResource(BeanFactory, CommonAnnotationBeanPostProcessor$LookupElement, String) line: 512    
        CommonAnnotationBeanPostProcessor.getResource(CommonAnnotationBeanPostProcessor$LookupElement, String) line: 483    
        CommonAnnotationBeanPostProcessor$ResourceElement.getResourceToInject(Object, String) line: 617    
        CommonAnnotationBeanPostProcessor$ResourceElement(InjectionMetadata$InjectedElement).inject(Object, String, PropertyValues) line: 172    
        InjectionMetadata.inject(Object, String, PropertyValues) line: 89    
        CommonAnnotationBeanPostProcessor.postProcessPropertyValues(PropertyValues, PropertyDescriptor[], Object, String) line: 317    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).populateBean(String, RootBeanDefinition, BeanWrapper) line: 1353    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).doCreateBean(String, RootBeanDefinition, Object[]) line: 579    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).createBean(String, RootBeanDefinition, Object[]) line: 498    
        DefaultListableBeanFactory(AbstractBeanFactory).lambda$doGetBean$0(String, RootBeanDefinition, Object[]) line: 305    
        2099051403.getObject() line: not available    
        DefaultListableBeanFactory(DefaultSingletonBeanRegistry).getSingleton(String, ObjectFactory<?>) line: 233    
        DefaultListableBeanFactory(AbstractBeanFactory).doGetBean(String, Class<T>, Object[], boolean) line: 303    
        DefaultListableBeanFactory(AbstractBeanFactory).getBean(String, Class<T>) line: 198    
        CommonAnnotationBeanPostProcessor.autowireResource(BeanFactory, CommonAnnotationBeanPostProcessor$LookupElement, String) line: 512    
        CommonAnnotationBeanPostProcessor.getResource(CommonAnnotationBeanPostProcessor$LookupElement, String) line: 483    
        CommonAnnotationBeanPostProcessor$ResourceElement.getResourceToInject(Object, String) line: 617    
        CommonAnnotationBeanPostProcessor$ResourceElement(InjectionMetadata$InjectedElement).inject(Object, String, PropertyValues) line: 172    
        InjectionMetadata.inject(Object, String, PropertyValues) line: 89    
        CommonAnnotationBeanPostProcessor.postProcessPropertyValues(PropertyValues, PropertyDescriptor[], Object, String) line: 317    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).populateBean(String, RootBeanDefinition, BeanWrapper) line: 1353    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).doCreateBean(String, RootBeanDefinition, Object[]) line: 579    
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).createBean(String, RootBeanDefinition, Object[]) line: 498    
        DefaultListableBeanFactory(AbstractBeanFactory).lambda$doGetBean$0(String, RootBeanDefinition, Object[]) line: 305    
        2099051403.getObject() line: not available    
        DefaultListableBeanFactory(DefaultSingletonBeanRegistry).getSingleton(String, ObjectFactory<?>) line: 233    
        DefaultListableBeanFactory(AbstractBeanFactory).doGetBean(String, Class<T>, Object[], boolean) line: 303    
        DefaultListableBeanFactory(AbstractBeanFactory).getBean(String) line: 193    
        DefaultListableBeanFactory.preInstantiateSingletons() line: 747    
        AnnotationConfigApplicationContext(AbstractApplicationContext).finishBeanFactoryInitialization(ConfigurableListableBeanFactory) line: 861    
        AnnotationConfigApplicationContext(AbstractApplicationContext).refresh() line: 542    
        SpringApplication.refresh(ApplicationContext) line: 750    
        SpringApplication.refreshContext(ConfigurableApplicationContext) line: 386    
        SpringApplication.run(String...) line: 327    
        SpringApplication.run(Class<?>[], String[]) line: 1245    
        SpringApplication.run(Class<?>, String...) line: 1233    
        Application.main(String[]) line: 51    
     
    Kafka客户端应用程序加载spring成功后,启动ConcurrentMessageListenerContainer监听线程读取客户端offset如果是设置了事务,就开始事务执行KafkaTransactionManager.doBegin() stack
    Thread [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] (Suspended (breakpoint at line 118 in KafkaTransactionManager))    
        KafkaTransactionManager<K,V>.doBegin(Object, TransactionDefinition) line: 118    
        KafkaTransactionManager<K,V>(AbstractPlatformTransactionManager).getTransaction(TransactionDefinition) line: 374    
        TransactionTemplate.execute(TransactionCallback<T>) line: 130    
        KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(Collection<TopicPartition>) line: 434    
        ConsumerCoordinator.onJoinComplete(int, String, String, ByteBuffer) line: 265    
        ConsumerCoordinator(AbstractCoordinator).joinGroupIfNeeded() line: 363    
        ConsumerCoordinator(AbstractCoordinator).ensureActiveGroup() line: 310    
        ConsumerCoordinator.poll(long, long) line: 297    
        KafkaConsumer<K,V>.pollOnce(long) line: 1078    
        KafkaConsumer<K,V>.poll(long) line: 1043    
        KafkaMessageListenerContainer$ListenerConsumer.run() line: 546    
        Executors$RunnableAdapter<T>.call() line: 511    
        ListenableFutureTask<T>(FutureTask<V>).run() line: 266    
        Thread.run() line: 745  
    监听线程读取客户端offset,执行producer.sendOffsetsToTransaction() stack
    Thread [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] (Suspended (breakpoint at line 278 in DefaultKafkaProducerFactory$CloseSafeProducer))    
        DefaultKafkaProducerFactory$CloseSafeProducer<K,V>.sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata>, String) line: 278    
        KafkaMessageListenerContainer$ListenerConsumer$1$1.doInTransactionWithoutResult(TransactionStatus) line: 441    
        KafkaMessageListenerContainer$ListenerConsumer$1$1(TransactionCallbackWithoutResult).doInTransaction(TransactionStatus) line: 34    
        TransactionTemplate.execute(TransactionCallback<T>) line: 133    
        KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(Collection<TopicPartition>) line: 434    
        ConsumerCoordinator.onJoinComplete(int, String, String, ByteBuffer) line: 265    
        ConsumerCoordinator(AbstractCoordinator).joinGroupIfNeeded() line: 363    
        ConsumerCoordinator(AbstractCoordinator).ensureActiveGroup() line: 310    
        ConsumerCoordinator.poll(long, long) line: 297    
        KafkaConsumer<K,V>.pollOnce(long) line: 1078    
        KafkaConsumer<K,V>.poll(long) line: 1043    
        KafkaMessageListenerContainer$ListenerConsumer.run() line: 546    
        Executors$RunnableAdapter<T>.call() line: 511    
        ListenableFutureTask<T>(FutureTask<V>).run() line: 266    
        Thread.run() line: 745    
    如果客户端offset未处理则开始事务,KafkaTransactionManager.doBegin() 
    Thread [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] (Suspended (breakpoint at line 118 in KafkaTransactionManager))    
        KafkaTransactionManager<K,V>.doBegin(Object, TransactionDefinition) line: 118    
        KafkaTransactionManager<K,V>(AbstractPlatformTransactionManager).getTransaction(TransactionDefinition) line: 374    
        TransactionTemplate.execute(TransactionCallback<T>) line: 130    
        KafkaMessageListenerContainer$ListenerConsumer.innvokeRecordListenerInTx(ConsumerRecords<K,V>) line: 807    
        KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(ConsumerRecords<K,V>) line: 787    
        KafkaMessageListenerContainer$ListenerConsumer.invokeListener(ConsumerRecords<K,V>) line: 666    
        KafkaMessageListenerContainer$ListenerConsumer.run() line: 554    
        Executors$RunnableAdapter<T>.call() line: 511    
        ListenableFutureTask<T>(FutureTask<V>).run() line: 266    
        Thread.run() line: 745    
    调用@KafkaListener注释方法
    Thread [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] (Suspended (breakpoint at line 75 in UserServiceImpl))    
        UserServiceImpl.receivekafka(TransactionMessage) line: 75    
        UserServiceImpl$$FastClassBySpringCGLIB$$d36e166a.invoke(int, Object, Object[]) line: not available    
        MethodProxy.invoke(Object, Object[]) line: 204    
        CglibAopProxy$CglibMethodInvocation.invokeJoinpoint() line: 738    
        CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 160    
        680055099.proceedWithInvocation() line: not available    
        TransactionInterceptor(TransactionAspectSupport).invokeWithinTransaction(Method, Class<?>, InvocationCallback) line: 288    
        TransactionInterceptor.invoke(MethodInvocation) line: 96    
        CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 182    
        CglibAopProxy$DynamicAdvisedInterceptor.intercept(Object, Method, Object[], MethodProxy) line: 680    
        UserServiceImpl$$EnhancerBySpringCGLIB$$af9d8b47.receivekafka(TransactionMessage) line: not available    
        NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]    
        NativeMethodAccessorImpl.invoke(Object, Object[]) line: 62    
        DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 43    
        Method.invoke(Object, Object...) line: 498    
        InvocableHandlerMethod.doInvoke(Object...) line: 181    
        InvocableHandlerMethod.invoke(Message<?>, Object...) line: 114    
        HandlerAdapter.invoke(Message<?>, Object...) line: 48    
        RecordMessagingMessageListenerAdapter<K,V>(MessagingMessageListenerAdapter<K,V>).invokeHandler(Object, Acknowledgment, Message<?>, Consumer<?,?>) line: 236    
        RecordMessagingMessageListenerAdapter<K,V>.onMessage(ConsumerRecord<K,V>, Acknowledgment, Consumer<?,?>) line: 80    
        RecordMessagingMessageListenerAdapter<K,V>.onMessage(Object, Acknowledgment, Consumer) line: 51    
        KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(ConsumerRecord<K,V>, Producer) line: 856    
        KafkaMessageListenerContainer$ListenerConsumer.access$1700(KafkaMessageListenerContainer$ListenerConsumer, ConsumerRecord, Producer) line: 240    
        KafkaMessageListenerContainer$ListenerConsumer$4.doInTransactionWithoutResult(TransactionStatus) line: 816    
        KafkaMessageListenerContainer$ListenerConsumer$4(TransactionCallbackWithoutResult).doInTransaction(TransactionStatus) line: 34    
        TransactionTemplate.execute(TransactionCallback<T>) line: 133    
        KafkaMessageListenerContainer$ListenerConsumer.innvokeRecordListenerInTx(ConsumerRecords<K,V>) line: 807    
        KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(ConsumerRecords<K,V>) line: 787    
        KafkaMessageListenerContainer$ListenerConsumer.invokeListener(ConsumerRecords<K,V>) line: 666    
        KafkaMessageListenerContainer$ListenerConsumer.run() line: 554    
        Executors$RunnableAdapter<T>.call() line: 511    
        ListenableFutureTask<T>(FutureTask<V>).run() line: 266    
        Thread.run() line: 745
    @KafkaListener注释方法执行成功后执行producer.sendOffsetsToTransaction() stack
    Thread [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] (Suspended (breakpoint at line 278 in DefaultKafkaProducerFactory$CloseSafeProducer))    
        DefaultKafkaProducerFactory$CloseSafeProducer<K,V>.sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata>, String) line: 278    
        KafkaMessageListenerContainer$ListenerConsumer.sendOffsetsToTransaction(Producer) line: 915    
        KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(ConsumerRecord<K,V>, Producer) line: 878    
        KafkaMessageListenerContainer$ListenerConsumer.access$1700(KafkaMessageListenerContainer$ListenerConsumer, ConsumerRecord, Producer) line: 240    
        KafkaMessageListenerContainer$ListenerConsumer$4.doInTransactionWithoutResult(TransactionStatus) line: 816    
        KafkaMessageListenerContainer$ListenerConsumer$4(TransactionCallbackWithoutResult).doInTransaction(TransactionStatus) line: 34    
        TransactionTemplate.execute(TransactionCallback<T>) line: 133    
        KafkaMessageListenerContainer$ListenerConsumer.innvokeRecordListenerInTx(ConsumerRecords<K,V>) line: 807    
        KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(ConsumerRecords<K,V>) line: 787    
        KafkaMessageListenerContainer$ListenerConsumer.invokeListener(ConsumerRecords<K,V>) line: 666    
        KafkaMessageListenerContainer$ListenerConsumer.run() line: 554    
        Executors$RunnableAdapter<T>.call() line: 511    
        ListenableFutureTask<T>(FutureTask<V>).run() line: 266    
        Thread.run() line: 745    
  • 相关阅读:
    py2exe
    Python库
    Python正则表达式指南
    [Python]日志模块logging的应用
    [Python]python __init__.py
    如何安装配置ulipad
    [Python]如何快速知道要使用哪些python模块和有哪些功能
    CodeIgniter
    Python 绝对简明手册
    理解Python命名机制
  • 原文地址:https://www.cnblogs.com/birdstudio/p/7640095.html
Copyright © 2011-2022 走看看