zoukankan      html  css  js  c++  java
  • Kafka 生产者 自定义分区策略

    实现一个用于审计功能的分区策略:假设我们有两类消息,其中一类消息的key为audit,用于审计,放在最后一个分区中,其他消息在剩下的分区中随机分配。

    先创建一个三个分区三个副本的主题audit-test:

    bin/kafka-topics.sh --create --zookeeper localhost:2181,localhost:2812,localhost:2183 --topic audit-test 
    --partitions 3 --replication-factor 3 //查看 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic audit-test

    然后实现Kafka客户端提供的Partitioner接口:

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    
    
    /***
     *
     * 实现一个自定义分区策略:
     *
     * key含有audit的一部分消息发送到最后一个分区上,其他消息在其他分区随机分配
     *
     *
     *
     */
    public class PartitionerImpl implements Partitioner {
    
    
        private Random random;
    
        public void configure(Map<String, ?> configs) {
            //做必要的初始化工作
            random = new Random();
        }
    
        //分区策略
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    
            String keyObj = (String) key;
            List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
            int partitionCount = partitionInfoList.size();
            System.out.println("partition size: "  + partitionCount);
            int auditPartition = partitionCount - 1;
            return keyObj == null || "".equals(keyObj) || !keyObj.contains("audit") ? random.nextInt(partitionCount - 1) : auditPartition;
        }
    
        public void close() {
            //清理工作
        }
    }

    接下来设定启动类参数:

    //实现类
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.org.fubin.PartitionerImpl");
    
    String topic = "audit-test";
    Producer<String,String> producer = new KafkaProducer<String, String>(properties);
    ProducerRecord nonKeyRecord = new ProducerRecord(topic,"non-key record");
    //这类消息需要放在最后一个分区
    ProducerRecord auditRecord = new ProducerRecord(topic,"audit","audit record");
    ProducerRecord nonAuditRecord = new ProducerRecord(topic,"other","non-audit record");
    
    try {
        producer.send(nonAuditRecord).get();
        producer.send(nonAuditRecord).get();
        producer.send(auditRecord).get();
        producer.send(nonAuditRecord).get();
        producer.send(nonAuditRecord).get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

    最后验证:多推送几次消息,查看每个分区的消息数

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic audit-test
  • 相关阅读:
    htmlspecialchar()
    LINUX权限bash: ./startup.sh: Permission denied
    str_replace()
    centos安装教程
    给准备做软件测试的新手们的一点个人心得
    TFS安装与管理
    TFS使用指南
    实现对n个数字随机排序,并循环输出100次
    SSM启动Tomcat报错ERROR [localhoststartStop1] Context initialization failed
    同济大学软件学院万院长谈择业
  • 原文地址:https://www.cnblogs.com/fubinhnust/p/11967881.html
Copyright © 2011-2022 走看看