zoukankan      html  css  js  c++  java
  • Kafka提升——事务

    生产者事务

    Exactly Once 语义

    At Least Once 语义

    至少发送一次,当生产者ack设置为-1的时候(在发送message后,leader和follower数据全部落盘成功以后,返回ack。但是在follower全部同步完成未完成或已完成,broker发送ack之前,leader发生故障,此时生产者会开始重试message发送,此时会造成数据重复

    At Most Once 语义

    将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次(生产者只管发送,不理会broker的ack)


    ** Exactly Once 语义**
    At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的,At Most Once可以保证数据不重复,但是不能保证数据不丢失。对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义。

    幂等性

    所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。幂等性结合 At Least Once 语义,就构成了 Kafka 的 Exactly Once 语义。即:
    At Least Once + 幂等性 = Exactly Once

    要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。
    但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。

    Kafka生产者事务

    Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
    为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID 获得原来的 PID。

    例:
    当生产者需要批量发送一批数据<a,b,c,d>,当发送到c时,服务器发生故障,生产者宕机,此时,a,b不会被发送。也就是说保证了操作的原子性,要么全部成功,要么全部失败。

    假设,a,b,c,d数据全部发送成功,此时broker尚未返回ack,leader和follower落盘成功,broker宕机,生产者没有收到ack,进行消息重发,此时broker回复,因为开启了事务,在生产者发送消息时会携带事务ID,通过事务id获取到pid,从而发现数据已落盘成功,此时数据不进行保存,返回ack。

    SpringBoot整合事务操作

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
                    <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
                   <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.8.2</version>
            </dependency>
    

    生产者配置类 : KafkaTransactionalProducerConfig

    @Configuration
    public class KafkaTransactionalProducerConfig {
    
        @Value("${kafka.bootstrap-servers}")
        private String bootstrapServers;
    
    
        /**
         * 生产者配置
         * @return
         */
        private Map<String, Object> configs() {
            Map<String, Object> props = new HashMap<>();
            // 连接地址
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            // 键的序列化方式
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // 值的序列化方式
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // 重试,0为不启用重试机制
            props.put(ProducerConfig.RETRIES_CONFIG, 1);
            // 控制批处理大小,单位为字节
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
            // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
            //ack应答,事务开启默认为all,若手动设置为其他会报错
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            return props;
        }
    
        @Bean
        public ProducerFactory<String, String> transactionalProducerFactory() {
            DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(configs());
            // 开启事务
            factory.transactionCapable();
            // 用来生成Transactional.id的前缀
            factory.setTransactionIdPrefix("tran-");
            return factory;
        }
    
        /**
         * 事务管理器
         * @param transactionalProducerFactory
         * @return
         */
        @Bean
        public KafkaTransactionManager transactionManager(ProducerFactory transactionalProducerFactory) {
            KafkaTransactionManager manager = new KafkaTransactionManager(transactionalProducerFactory);
            return manager;
        }
    
        @Bean("transactionalTemplate") //配置多个类型的template的时候,主要注解primary
        public KafkaTemplate<String, String> transactionalTemplate() {
            KafkaTemplate template = new KafkaTemplate<String, String>(transactionalProducerFactory());
            return template;
        }
    
    }
    

    消费者配置类: KafkaConsumerConfig

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
    
        @Value("${kafka.bootstrap-servers}")
        private String bootstrapServers;
    
        /**
         * 单线程-单条消费
         * @return
         */
        @Bean
        public KafkaListenerContainerFactory<?> stringKafkaListenerContainerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "topic");
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
            return factory;
        }
    }
    

    消息发送 : KafkaTestController

    public class KafkaTestController {
    
    	private static Logger logger = LoggerFactory.getLogger(KafkaTestController.class);
    
    	@Autowired
    	@Qualifier("transactionalTemplate")
    	private KafkaTemplate<String, String> transactionalTemplate;
    
    	private Gson gson = new GsonBuilder().create();
    	
    	@RequestMapping("/testSendMsg4")
    	@ResponseBody
    	@Transactional
    	public String testSendMsg5(){ //事务发送
    		Message message = new Message();
    		message.setId(1);
    		message.setMsg("testSendMsg4");
    		message.setSendTime(new Date());
    		logger.info("发送消息(事务发送) ----->>>>>  message = {}", gson.toJson(message));
    		transactionalTemplate.send("hello", gson.toJson(message)).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    			@Override
    			public void onFailure(Throwable throwable) {
    				logger.error("发生错误,消息发送失败!");
    			}
    
    			@Override
    			public void onSuccess(SendResult<String, String> stringStringSendResult) {
    				logger.error("消息发送成功!");
    			}
    		});
    		int i = 1/0;
    		return "testSendMsg5";
    	}
    	
    }
    

    消息监听:KafkaReceiver

    @Component
    public class KafkaReceiver {
    	
    	private static Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
    
    	@KafkaListener(topics = "hello", containerFactory = "stringKafkaListenerContainerFactory")
    	public void receiveString(String message) {
    		logger.info("Message : %s" +message);
    	}
    }
    

    结果 :

    由此可见,在当前的发送过程中,由于发生错误,消息并没有被消费者消费掉,保证了该操作的原子性。

  • 相关阅读:
    麦子学院
    iOS开发系列--地图与定位
    iOS开发多线程篇—线程间的通信
    iOS开发多线程篇—创建线程
    iOS开发多线程篇—多线程简单介绍
    iOS开发网络篇—NSURLConnection基本使用
    iOS开发网络篇—GET请求和POST请求
    iOS开发网络篇—HTTP协议
    用CocoaPods做iOS程序的依赖管理
    新手不了解Xcode和mac系统可能犯得错误和我的建议
  • 原文地址:https://www.cnblogs.com/luckyhui28/p/12518661.html
Copyright © 2011-2022 走看看