zoukankan      html  css  js  c++  java
  • Spring Kafka(四)使用Kafka事务的两种方式

    为什么要使用Kafka事务

    在日常开发中,数据库的事务几乎是必须用到的,事务回滚不一定在于数据增删改异常,可能系统出现特定逻辑判断的时候也需要进行数据回滚,Kafka亦是如此,

    我们并不希望消息监听器接收到一些错误的或者不需要的消息。

    SpringBoot使用数据库事务非常简单,只需要在方法上加上@Transactional注解即可,那Kafka如果需要使用事务也可以如此,不过还需修改其他配置才能生效。

    Kafka使用事务的两种方式

    配置Kafka事务管理器并使用@Transactional注解

    使用KafkaTemplate的executeInTransaction方法

    使用@Transactional注解方式

    使用注解方式开启事务还是比较方便的,不过首先需要我们配置KafkaTransactionManager,这个类就是Kafka提供给我们的事务管理类,

    我们需要使用生产者工厂来创建这个事务管理类。

    需要注意的是,我们需要在producerFactory中开启事务功能,并设置TransactionIdPrefix,TransactionIdPrefix是用来生成Transactional.id的前缀。

    @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(senderProps());
            factory.transactionCapable();
            factory.setTransactionIdPrefix("tran-");
            return factory;
        }
    
        @Bean
        public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
            KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory);
            return manager;
        }
    View Code

    配置Kafka事务还是非常简单的,接下来我们测试一下事务是否能正常使用,在DemoTest类中创建该方法

        @Test
        @Transactional
        public void testTransactionalAnnotation() throws InterruptedException {
            kafkaTemplate.send("topic.quick.tran", "test transactional annotation");
            throw new RuntimeException("fail");
        }
    View Code

    运行测试方法后我们可以看到控制台中输出了如下日志,这就代表我们使用事务成功,

    或者你可以打开Kafka Tool 2工具查看一下测试前后的数据是否有变化

    org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:317) [kafka-clients-1.0.2.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214) [kafka-clients-1.0.2.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-1.0.2.jar:na]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
    View Code

    使用KafkaTemplate.executeInTransaction开启事务

    这种方式开启事务是不需要配置事务管理器的,也可以称为本地事务。直接编写测试方法

       @Test
        public void testExecuteInTransaction() throws InterruptedException {
            kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
                @Override
                public Object doInOperations(KafkaOperations kafkaOperations) {
                    kafkaOperations.send("topic.quick.tran", "test executeInTransaction");
                    throw new RuntimeException("fail");
                    //return true;
                }
            });
        }
    View Code

    运行测试方法后控制台同样打印出了事务终止的异常,代表可以正常使用事务

    org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:317) [kafka-clients-1.0.2.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214) [kafka-clients-1.0.2.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-1.0.2.jar:na]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
    View Code
  • 相关阅读:
    POJ 1795 DNA Laboratory
    CodeForces 303B Rectangle Puzzle II
    HDU 2197 本源串
    HDU 5965 扫雷
    POJ 3099 Go Go Gorelians
    CodeForces 762D Maximum path
    CodeForces 731C Socks
    HDU 1231 最大连续子序列
    HDU 5650 so easy
    大话接口隐私与安全 转载
  • 原文地址:https://www.cnblogs.com/yanliang12138/p/12554756.html
Copyright © 2011-2022 走看看