zoukankan      html  css  js  c++  java
  • Kafka 生产者 自定义分区策略

    实现一个用于审计功能的分区策略:假设我们有两类消息,其中一类消息的key为audit,用于审计,放在最后一个分区中,其他消息在剩下的分区中随机分配。

    先创建一个三个分区三个副本的主题audit-test:

    bin/kafka-topics.sh --create --zookeeper localhost:2181,localhost:2812,localhost:2183 --topic audit-test 
    --partitions 3 --replication-factor 3 //查看 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic audit-test

    然后实现Kafka客户端提供的Partitioner接口:

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    
    
    /***
     *
     * 实现一个自定义分区策略:
     *
     * key含有audit的一部分消息发送到最后一个分区上,其他消息在其他分区随机分配
     *
     *
     *
     */
    public class PartitionerImpl implements Partitioner {
    
    
        private Random random;
    
        public void configure(Map<String, ?> configs) {
            //做必要的初始化工作
            random = new Random();
        }
    
        //分区策略
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    
            String keyObj = (String) key;
            List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
            int partitionCount = partitionInfoList.size();
            System.out.println("partition size: "  + partitionCount);
            int auditPartition = partitionCount - 1;
            return keyObj == null || "".equals(keyObj) || !keyObj.contains("audit") ? random.nextInt(partitionCount - 1) : auditPartition;
        }
    
        public void close() {
            //清理工作
        }
    }

    接下来设定启动类参数:

    //实现类
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.org.fubin.PartitionerImpl");
    
    String topic = "audit-test";
    Producer<String,String> producer = new KafkaProducer<String, String>(properties);
    ProducerRecord nonKeyRecord = new ProducerRecord(topic,"non-key record");
    //这类消息需要放在最后一个分区
    ProducerRecord auditRecord = new ProducerRecord(topic,"audit","audit record");
    ProducerRecord nonAuditRecord = new ProducerRecord(topic,"other","non-audit record");
    
    try {
        producer.send(nonAuditRecord).get();
        producer.send(nonAuditRecord).get();
        producer.send(auditRecord).get();
        producer.send(nonAuditRecord).get();
        producer.send(nonAuditRecord).get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

    最后验证:多推送几次消息,查看每个分区的消息数

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic audit-test
  • 相关阅读:
    弹出框
    my.conf 配置编码为utf-8
    解决git pull 命令失效,不能从远程服务器上拉取代码问题
    git config --global core.autocrlf false
    python数据库操作常用功能使用详解(创建表/插入数据/获取数据)
    MySQL缺失mysql_config文件
    物联网操作系统系列文章之-软件平台的力量
    50% 的财富 500 强企业使用 Windows Azure
    Mobile Service更新和 Notification Hub 对Android的支持
    Windows Azure 社区新闻综述(#68 版)
  • 原文地址:https://www.cnblogs.com/fubinhnust/p/11967881.html
Copyright © 2011-2022 走看看