zoukankan      html  css  js  c++  java
  • Kafka Java API+自定义分区

    kafka的API

    第一步:导入kafka的开发jar包

     


    <!-- Kafka client dependency block. The 0.11.0.1 coordinate below is kept
         commented out for reference only; the active dependency is 1.0.0. -->
    <dependencies>

    <!--  

      <dependency>

           <groupId>org.apache.kafka</groupId>

           <artifactId>kafka-clients</artifactId>

           <version>0.11.0.1</version>

       </dependency>

    -->

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->

    <dependency>

        <groupId>org.apache.kafka</groupId>

        <artifactId>kafka-clients</artifactId>

        <version>1.0.0</version>

    </dependency>

      </dependencies>


     

     

    Kafka生产者


    @Test

       public void kafkaProducer() throws Exception {

          //1、准备配置文件

           Properties props = new Properties();

           props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");

           props.put("acks", "all");

           props.put("retries", 0);

           props.put("batch.size", 16384);

           props.put("linger.ms", 1);

           props.put("buffer.memory", 33554432);

           props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

           props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

           //2、创建KafkaProducer

           KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);

           for (int i=0;i<100;i++){

               //3、发送数据

               kafkaProducer.send(new ProducerRecord<String, String>("yun01","num"+i,"value"+i));

           }

     

          kafkaProducer.close();

       }


     

     

    Kafka消费者


    @Test

       public void kafkaConsum() throws Exception {

            // 1、准备配置文件

           Properties props = new Properties();

           props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");

           props.put("group.id", "test");

           props.put("enable.auto.commit", "true");

           props.put("auto.commit.interval.ms", "1000");

           props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

           props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

     

           // 2、创建KafkaConsumer

           KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);

           // 3、订阅数据,这里的topic可以是多个

           kafkaConsumer.subscribe(Arrays.asList("yun01"));

           // 4、获取数据

           while (true) {

               ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

               for (ConsumerRecord<String, String> record : records) {

                   System.out.printf("topic = %s,offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value());

               }

     

           }

       }

     


     

     

     

    kafka的自定义分区

    第一种方式:直接指定分区


    kafkaProducer.send(new ProducerRecord<String, String>("testpart",1,"0","value"+i));

     

    第二种方式:自定义分区

    public class KafkaCustomPartitioner implements Partitioner {

       @Override

       public void configure(Map<String, ?> configs) {

       }

     

       @Override

       public int partition(String topic, Object arg1, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) {

          List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

           int partitionNum = partitions.size();

          Random random = new Random();

          int partition = random.nextInt(partitionNum);

           return partition;

       }

     

       @Override

       public void close() {

         

       }

     

    }


     

     

     

    主代码中添加配置

     


    @Test

       public void kafkaProducer() throws Exception {

          //1、准备配置文件

           Properties props = new Properties();

           props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");

           props.put("acks", "all");

           props.put("retries", 0);

           props.put("batch.size", 16384);

           props.put("linger.ms", 1);

           props.put("buffer.memory", 33554432);

           props.put("partitioner.class", "com.gec.kafkaclient.MyCustomerPartitons");

           props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

           props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

           //2、创建KafkaProducer

           KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);

           for (int i=0;i<100;i++){

               //3、发送数据

               kafkaProducer.send(new ProducerRecord<String, String>("testpart","0","value"+i));

           }

     

          kafkaProducer.close();

       }


     

     

     

     

  • 相关阅读:
    创建基于MailKit和MimeKit的.NET基础邮件服务
    MailKit---获取邮件
    C# 与JAVA 的RSA 加密解密交互,互通,C#使用BouncyCastle来实现私钥加密,公钥解密的方法
    .net 开源 FTP 组件 edtFTPnet
    Consul1-window安装consul
    通信传输利器Netty(Net is DotNetty)介绍
    工作中,如何衡量一个人的 JavaScript 编码水平?
    10个有趣又能编译为JavaScript的语言,你用过哪些?
    一定要你明白Java中的volatile
    面试总被问到HTTP缓存机制及原理?看完你就彻底明白了
  • 原文地址:https://www.cnblogs.com/Transkai/p/10877536.html
Copyright © 2011-2022 走看看