zoukankan      html  css  js  c++  java
  • Kafka消费实现精确一次(转载)

    转载自:

    https://blog.csdn.net/qq_18581221/article/details/89766073

    https://www.cnblogs.com/yn-huang/p/11684688.html

    简介

    在使用kafka时,大多数场景对于数据少量的不一致(重复或者丢失)并不关注,比如日志,因为不会影响最终的使用或者分析,但是在某些应用场景(比如业务数据),需要对任何一条消息都要做到精确一次的消费,才能保证系统的正确性,kafka并不提供准确一致的消费API,需要我们在实际使用时借用外部的一些手段来保证消费的精确性,下面我们介绍如何实现

    kafka消费机制

    这篇文章KafkaConsumer使用介绍、参数配置介绍了如何kafka具有两种提交offset(消费偏移量)方式,我们在Kafka简介以及安装和使用可知每个分区具备一offset记录消费位置,如果消费者一直处于正常的运行转态,那么offset将没有什么用处,因为正常消费时,consumer记录了本次消费的offset和下一次将要进行poll数据的offset起始位置,但是如果消费者发生崩溃或者有新的消费者加入消费者组,就会触发再均衡Rebalance,Rebalance之后,每个消费者将会分配到新的分区,而消费者对于新的分区应该从哪里进行起始消费,这时候提交的offset信息就起作用了,提交的offset信息包括消费者组所有分区的消费进度,这时候消费者可以根据消费进度继续消费,提交offset提交自动提交是最不具确定性的,所以要使用手动提交来控制offset

    消费时出现几种异常情况

    自动提交

    • 重复消费:当数据已经被处理,然后自动提交offset时消费者出现故障或者有新消费者加入组导致再均衡,这时候offset提交失败,导致这批已经处理的数据的信息没有记录,后续会重复消费一次
    • 丢失数据:如果业务处理时间较长一点,这时候数据处理业务还未完成,offset信息已经提交了,但是在后续处理数据过程中程序发生了崩溃,导致这批数据未正常消费,这时候offset已经提交,消费者后续将不在消费这批数据,导致这批数据将会丢失

    手动提交

    • 重复消费(最少一次消费语义实现):消费数据处理业务完成后进行offset提交,可以保证数据最少一次消费,因为在提交offset的过程中可能出现提交失败的情况,导致数据重复消费
    /**
     * 手动提交offset
     * 实现至少一次的消费语义 at least once
     * 当手动提交位移失败,会重复消费数据
     */
    @Test
    public void testCommitOffset() {
        String topic = "first-topic";
        String group = "g1";
    
        Properties props = new Properties();
        props.put("bootstrap.servers", "node00:9092,node03:9092");   //required
        props.put("group.id", group);   //required
        props.put("enable.auto.commit", "false"); // 关闭自动提交
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "latest");     //从最早的消息开始读取
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //required
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //required
    
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));       //订阅topic
        final int minBatchSize = 10;
        // 缓存
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>(minBatchSize);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                records.forEach(buffer::add);
    
                // 缓存满了才对数据进行处理
                if (buffer.size() >= minBatchSize) {
    
                    // 业务逻辑--插入数据库
                    // insertIntoDb(buffer);
                    // 等数据插入数据库之后,再异步提交位移
    
                    // 通过异步的方式提交位移
                    consumer.commitAsync(((offsets, exception) -> {
                        if (exception == null) {
                            offsets.forEach((topicPartition, metadata) -> {
                                System.out.println(topicPartition + " -> offset=" + metadata.offset());
                            });
                        } else {
                            exception.printStackTrace();
                            // 如果出错了,同步提交位移
                            consumer.commitSync(offsets);
                        }
                    }));
    
                   
                    // 如果提交位移失败了,那么重启consumer后会重复消费之前的数据,再次插入到数据库中
                    // 清空缓冲区
                    buffer.clear();
                }
            }
        } finally {
            consumer.close();
        }
    }
    • 丢失数据(最多一次消费语义实现):在消费数据业务处理前进行offset提交,可以保证最多一次消费,在后续数据业务处理程序出现故障,将导致数据丢失

    代码实现

    /**
     * 实现最多一次语义
     * 在消费前提交位移,当后续业务出现异常时,可能丢失数据
     */
    @Test
    public void testAtMostOnce() {
        Properties props = new Properties();
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> kafkaConsumer = KafkaFactory.buildConsumer(props);
        kafkaConsumer.subscribe(Arrays.asList("first-topic"));
        try {
    
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(500);
                // 处理业务之前就提交位移
                kafkaConsumer.commitAsync();
                // 下面是业务逻辑
                records.forEach(record -> {
                    System.out.println(record.value() + ", offset=" + record.offset());
                });
            }
        } catch (Exception e) {
    
        } finally {
            kafkaConsumer.close();
        }
    
    }

    精确一次消费实现

    从kafka的消费机制,我们可以得到是否能够精确的消费关键在消费进度信息的准确性,如果能够保证消费进度的准确性,也就保证了消费数据的准确性

    • 数据有状态:可以根据数据信息进行确认数据是否重复消费,这时候可以使用手动提交的最少一次消费语义实现,即使消费的数据有重复,可以通过状态进行数据去重,以达到幂等的效果
    • 存储数据容器具备幂等性:在数据存入的容器具备天然的幂等(比如ElasticSearch的put操作具备幂等性,相同的数据多次执行Put操作和一次执行Put操作的结果是一致的),这样的场景也可以使用手动提交的最少一次消费语义实现,由存储数据端来进行数据去重
    • 数据无状态,并且存储容器不具备幂等:这种场景需要自行控制offset的准确性,今天文章主要说明这种场景下的处理方式,这里数据不具备状态,存储使用关系型数据库,比如MySQL

    这里简单说明一下实现思路
    1) 利用consumer api的seek方法可以指定offset进行消费,在启动消费者时查询数据库中记录的offset信息,如果是第一次启动,那么数据库中将没有offset信息,需要进行消费的元数据插入,然后从offset=0开始消费

    2) 关系型数据库具备事务的特性,当数据入库时,同时也将offset信息更新,借用关系型数据库事务的特性保证数据入库和修改offset记录这两个操作是在同一个事务中进行

    3) 使用ConsumerRebalanceListener来完成在分配分区时和Relalance时作出相应的处理逻辑

    4) 要弄清楚的是,我们在消费的时候,关闭了自动提交,我们也没有通过consumer.commitAsync()手动提交我们的位移信息,而是在每次启动一个新的consumer的时候,触发rebalance时,读取数据库中的位移信息,从该位移中开始读取partition的信息(初始化的时候为0),在没有出现异常的情况下,我们的consumer会不断从producer读取信息,这个位移是最新的那个消息位移,而且会同时把这个位移更新到数据库中,但是,当出现了rebalance时,那么consumer就会从数据库中读取开始的位移。

    表设计

    create table kafka_info(
        topic_group_partition varchar(32) primary key, //主题+组名+分区号 这里冗余设计方便通过这个主键进行更新提升效率 
        topic_group varchar(30), //主题和组名
        partition_num tinyint,//分区号
        offsets bigint default 0 //offset信息
    );

    代码

    /**
     * @Description: 实现Kafka的精确一次消费
     * @author: HuangYn
     * @date: 2019/10/15 21:10
     */
    public class ExactlyOnceConsume {
    
        private final KafkaConsumer<String, String> consumer;
        private Map<TopicPartition, Long> tpOffsetMap;
        private List<ConsumerRecord> list;
        private JDBCHelper jdbcHelper = JDBCHelper.getInstance();
        private String groupId;
        private String topic;
    
        public ExactlyOnceConsume(Properties props, String topic, String groupId) {
            this.consumer = KafkaFactory.buildConsumer(props);
            this.list = new ArrayList<>(100);
            this.tpOffsetMap = new HashMap<>();
            this.groupId = groupId;
            this.topic = topic;
            this.consumer.subscribe(Arrays.asList(this.topic), new HandleRebalance());
        }
    
        public void receiveMsg() {
            try {
    
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    if (!records.isEmpty()) {
                        // 处理每个partition的记录
                        records.partitions().forEach(tp -> {
                            List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
                            // 记录加到缓存中
                            tpRecords.forEach(record -> {
                                System.out.println("partition=" + record.partition() +
                                        ", offset= " + record.offset() +
                                        ", value=" + record.value());
                                list.add(record);
                            });
                            // 将partition对应的offset加到map中, 获取partition中最后一个元素的offset,
                            // +1 就是下一次读取的位移,就是本次需要提交的位移
                            tpOffsetMap.put(tp, tpRecords.get(tpRecords.size() - 1).offset() + 1);
                        });
                    }
                    // 缓存中有数据
                    if (!list.isEmpty()) {
                        // 将数据插入数据库,并且将位移信息也插入数据库
                        // 因此,每次读取到数据,都要更新本consumer在数据库中的位移信息
                        boolean success = insertIntoDB(list, tpOffsetMap);
                        if (success) {
                            list.clear();
                            tpOffsetMap.clear();
                        }
                    }
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    
        private boolean insertIntoDB(List<ConsumerRecord> list,
                                     Map<TopicPartition, Long> tpOffsetMap) {
    
            // 这里应该是在同一个事务中进行的
            // 为了方便就省略了
    
            try {
                // TODO 将数据入库,这里省略了
    
                // 将partition位移更新
                String sql = "UPDATE kafka_info SET offsets = ? WHERE topic_group_partition = ?";
                List<Object[]> params = new ArrayList<>(tpOffsetMap.size());
                tpOffsetMap.forEach((tp, offset) -> {
                    Object[] param = new Object[]{offset, topic + "_" + groupId + "_" + tp.partition()};
                    params.add(param);
                });
                jdbcHelper.batchExecute(sql, params);
                return true;
            } catch (Exception e) {
                // 回滚事务
            }
        }
    
        /**
         * rebalance触发的处理器
         */
        private class HandleRebalance implements ConsumerRebalanceListener {
    
            // rebalance之前触发
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                //发生Rebalance时,只需要将list中数据和记录offset信息清空即可
                //这里为什么要清除数据,应为在Rebalance的时候有可能还有一批缓存数据在内存中没有进行入库,
                //并且offset信息也没有更新,如果不清除,那么下一次还会重新poll一次这些数据,将会导致数据重复
                System.out.println("==== onPartitionsRevoked ===== ");
                list.clear();
                tpOffsetMap.clear();
            }
    
            // rebalance后调用,consumer抓取数据之前触发
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("== onPartitionsAssigned ==");
    
                List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
                // 从数据库读取当前partition的信息
                Map<TopicPartition, Long> partitionOffsetMapFromDB = getPartitionOffsetMapFromDB(partitionInfos.size());
    
                // 在分配分区时指定消费位置
                for (TopicPartition partition : partitions) {
                    // 指定consumer在每个partition上的消费开始位置
                    // 如果在数据库中有对应partition的信息则使用,否则将默认从offset=0开始消费
                    if (partitionOffsetMapFromDB.get(partition) != null) {
                        consumer.seek(partition, partitionOffsetMapFromDB.get(partition));
                    } else {
                        consumer.seek(partition, 0L);
                    }
                }
            }
        }
    
        /**
         * 从数据库读取offset信息
         *
         * @param size
         * @return
         */
        private Map<TopicPartition, Long> getPartitionOffsetMapFromDB(int size) {
            Map<TopicPartition, Long> partitionOffsetMapFromDB = new HashMap<>();
            String sql = "SELECT partition_num, offsets FROM kafka_info WHERE topic_group = ?";
            jdbcHelper.executeQuery(sql, new Object[]{topic + "_" + groupId}, resultSet -> {
    
                int partition_num = -1;
                long offsets = -1;
                while (resultSet.next()) {
                    partition_num = resultSet.getInt("partition_num");
                    offsets = resultSet.getLong("offsets");
                    System.out.println("partition_num=" + partition_num + ", offset=" + offsets);
                    partitionOffsetMapFromDB.put(new TopicPartition(topic, partition_num), offsets);
                }
    
                System.out.println("partitionOffsetMapFromDB.size = " + partitionOffsetMapFromDB.size());
    
                //判断数据库是否存在所有的分区的信息,如果没有,则需要进行初始化
                if (partitionOffsetMapFromDB.size() < size) {
                    String insert = "INSERT INTO kafka_info (topic_group_partition,topic_group,partition_num) VALUES(?,?,?)";
                    List<Object[]> params = new ArrayList<>();
                    for (int p_num = 0; p_num < size; p_num++) {
                        Object[] param = new Object[]{
                                topic + "_" + groupId + "_" + p_num,
                                topic + "_" + groupId,
                                p_num
                        };
                        params.add(param);
                    }
                    jdbcHelper.batchExecute(insert, params);
                }
    
            });
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return partitionOffsetMapFromDB;
        }
    
    }

    另外一位大神的实现:

    package com.huawei.kafka.consumer;
    
    import com.alibaba.fastjson.JSON;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.time.Duration;
    import java.util.*;
    
    /**
     * @author: xuqiangnj@163.com
     * @date: 2019/5/3 14:36
     * @description:精确一次消费实现
     */
    public class AccurateConsumer {
    
        private static final Properties props = new Properties();
    
        private static final String GROUP_ID = "Test";
    
        static {
            props.put("bootstrap.servers", "192.168.142.139:9092");
            props.put("group.id", GROUP_ID);
            props.put("enable.auto.commit", false);//注意这里设置为手动提交方式
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        }
    
        final KafkaConsumer<String, String> consumer;
    
        //用于记录每次消费时每个partition的最新offset
        private Map<TopicPartition, Long> partitionOffsetMap;
    
        //用于缓存接受消息,然后进行批量入库
        private List<Message> list;
    
        private volatile boolean isRunning = true;
    
        private final String topicName;
    
        private final String topicNameAndGroupId;
    
        public AccurateConsumer(String topicName) {
            this.topicName = topicName;
            topicNameAndGroupId = topicName + "_" + GROUP_ID;
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topicName), new HandleRebalance());
            list = new ArrayList<>(100);
            partitionOffsetMap = new HashMap<>();
        }
    
        //这里使用异步提交和同步提交的组合方式
        public void receiveMsg() {
            try {
                while (isRunning) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                    if (!consumerRecords.isEmpty()) {
                        for (TopicPartition topicPartition : consumerRecords.partitions()) {
                            List<ConsumerRecord<String, String>> records = consumerRecords.records(topicPartition);
                            for (ConsumerRecord<String, String> record : records) {
                                //使用fastjson将记录中的值转换为Message对象,并添加到list中
                                list.addAll(JSON.parseArray(record.value(), Message.class));
                            }
                            //将partition对应的offset信息添加到map中,入库时将offset-partition信息一起进行入库
                            partitionOffsetMap.put(topicPartition, records.get(records.size() - 1)
                                    .offset() + 1);//记住这里一定要加1,因为下次消费的位置就是从+1的位置开始
                        }
                    }
                    //如果list中存在有数据,则进行入库操作
                    if (list.size() > 0) {
                        boolean isSuccess = insertIntoDB(list, partitionOffsetMap);
                        if (isSuccess) {
                            //将缓存数据清空,并将offset信息清空
                            list.clear();
                            partitionOffsetMap.clear();
                        }
                    }
                }
            } catch (Exception e) {
                //处理异常
            } finally {
                //offset信息由我们自己保存,提交offset其实没有什么必要
                //consumer.commitSync();
                close();
            }
    
        }
    
        private boolean insertIntoDB(List<Message> list, Map<TopicPartition, Long> partitionOffsetMap) {
            Connection connection = getConnection();//获取数据库连接 自行实现
            boolean flag = false;
            try {
                //设置手动提交,让插入数据和更新offset信息在一个事务中完成
                connection.setAutoCommit(false);
                insertMessage(list);//将数据进行入库 自行实现
                updateOffset(partitionOffsetMap);//更新offset信息 自行实现
                connection.commit();
                flag = true;
            } catch (SQLException e) {
                try {
                    //出现异常则回滚事务
                    connection.rollback();
                } catch (SQLException e1) {
                    //处理异常
                }
            }
            return flag;
        }
    
        //获取数据库连接 自行实现
        private Connection getConnection() {
            return null;
        }
    
        public void close() {
            isRunning = false;
            if (consumer != null) {
                consumer.close();
            }
        }
    
        private class HandleRebalance implements ConsumerRebalanceListener {
    
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                //发生Rebalance时,只需要将list中数据和记录offset信息清空即可
                //这里为什么要清除数据,应为在Rebalance的时候有可能还有一批缓存数据在内存中没有进行入库,
                //并且offset信息也没有更新,如果不清除,那么下一次还会重新poll一次这些数据,将会导致数据重复
                list.clear();
                partitionOffsetMap.clear();
            }
    
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                //获取对应Topic的分区数
                List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
                Map<TopicPartition, Long> partitionOffsetMapFromDB = getPartitionOffsetMapFromDB
                        (partitionInfos.size());
    
                //在分配分区时指定消费位置
                for (TopicPartition partition : partitions) {
                    //如果在数据库中有对应partition的信息则使用,否则将默认从offset=0开始消费
                    if (partitionOffsetMapFromDB.get(partition) != null) {
                        consumer.seek(partition, partitionOffsetMapFromDB.get(partition));
                    } else {
                        consumer.seek(partition, 0L);
                    }
                }
            }
        }
        /**
         * 从数据库中查询分区和offset信息
         * @param size 分区数量
         * @return 分区号和offset信息
         */
        private Map<TopicPartition, Long> getPartitionOffsetMapFromDB(int size) {
            Map<TopicPartition, Long> partitionOffsetMapFromDB = new HashMap<>();
            //从数据库中查询出对应信息
            Connection connection = getConnection();//获取数据库连接 自行实现
            PreparedStatement preparedStatement = null;
            ResultSet resultSet = null;
            String querySql = "SELECT partition_num,offsets from kafka_info WHERE topic_group = ?";
            try {
                preparedStatement = connection.prepareStatement(querySql);
                preparedStatement.setString(1, topicNameAndGroupId);
                resultSet = preparedStatement.executeQuery();
                while (resultSet.next()) {
                    partitionOffsetMapFromDB.put(new TopicPartition(topicName, resultSet.getInt(1)),
                            resultSet.getLong(2));
                }
                //判断数据库是否存在所有的分区的信息,如果没有,则需要进行初始化
                if (partitionOffsetMapFromDB.size() < size) {
                    connection.setAutoCommit(false);
                    StringBuilder sqlBuilder = new StringBuilder();
                    //partition分区号是从0开始,如果有10个分区,那么分区号就是0-9
                    /*这里拼接插入数据 格式 INSERT INTO kafka_info(topic_group_partition,topic_group,partition_num) VALUES
                    (topicNameAndGroupId_0,topicNameAndGroupId,0),(topicNameAndGroupId_1, topicNameAndGroupId,1)....*/
                    for (int i = 0; i < size; i++) {
                        sqlBuilder.append("(").append
                                (topicNameAndGroupId).append("_").append(i).append(",").append
                                (topicNameAndGroupId).append(",").append(i).append("),");
                    }
                    //将最后一个逗号去掉加上分号结束
                    sqlBuilder.deleteCharAt(sqlBuilder.length() - 1).append(";");
                    preparedStatement = connection.prepareStatement("INSERT INTO kafa_info" +
                            "(topic_group_partition,topic_group,partition_num) VALUES " + sqlBuilder.toString());
                    preparedStatement.execute();
                    connection.commit();
                }
            } catch (SQLException e) {
                //处理异常 回滚事务 这里应该结束程序 排查错误
                try {
                    connection.rollback();
                } catch (SQLException e1) {
                    //打印日志 排查错误信息
                }
    
            } finally {
                try {
                    if (resultSet != null) {
                        resultSet.close();
                    }
                    if (preparedStatement != null) {
                        preparedStatement.close();
                    }
                    if (connection != null) {
                        connection.close();
                    }
                } catch (SQLException e) {
                    //处理异常 打印日志即可 关闭资源失败
                }
            }
            return partitionOffsetMapFromDB;
        }
    }

     数据库中记录

  • 相关阅读:
    单元测试乱弹(一):悲剧的测试工具
    Android 4.2中sqlite操作问题(二):某些情况下显式启用事务能提高操作效率
    some code of debug mode
    asp .NET弹出窗口 汇总
    if (!IsPostBack)
    Asp.Net中清空所有textbox的几种方法
    ASP.NET验证控件详解
    正则表达式限制文本框只能输入数字
    颜色
    gridview 根据条件更改链接的可用和颜色
  • 原文地址:https://www.cnblogs.com/wangbin2188/p/15291159.html
Copyright © 2011-2022 走看看