  • Kafka Java API + Custom Partitioning

    The Kafka API

    Step 1: import the Kafka client dependency

     


    <dependencies>
        <!--
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.1</version>
        </dependency>
        -->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>


     

     

    Kafka Producer


    @Test
    public void kafkaProducer() throws Exception {
        // 1. Prepare the producer configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            // 3. Send data to the topic "yun01"
            kafkaProducer.send(new ProducerRecord<String, String>("yun01", "num" + i, "value" + i));
        }

        kafkaProducer.close();
    }


     

     

    Kafka Consumer


    @Test
    public void kafkaConsum() throws Exception {
        // 1. Prepare the consumer configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2. Create the KafkaConsumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        // 3. Subscribe; more than one topic can be passed here
        kafkaConsumer.subscribe(Arrays.asList("yun01"));
        // 4. Poll for data
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.offset(), record.key(), record.value());
            }
        }
    }

     


     

     

     

    Custom partitioning in Kafka

    Method 1: specify the partition directly on the ProducerRecord


    // The second argument (1) is the partition number to write to
    kafkaProducer.send(new ProducerRecord<String, String>("testpart", 1, "0", "value" + i));
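    For context, here is a minimal sketch of Method 1 as a whole (not from the original post): it reuses the props configuration from the producer example above and assumes the topic testpart already exists with at least two partitions.

        // Send every record explicitly to partition 1 of "testpart".
        // When a partition is given, the ProducerRecord(topic, partition, key, value)
        // constructor bypasses the configured partitioner.
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("testpart", 1, "0", "value" + i));
        }
        producer.close();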

     

    Method 2: implement a custom Partitioner

    package com.gec.kafkaclient;

    import java.util.List;
    import java.util.Map;
    import java.util.Random;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;

    /**
     * Custom partitioner: assigns every record to a random partition of the topic.
     */
    public class KafkaCustomPartitioner implements Partitioner {

        @Override
        public void configure(Map<String, ?> configs) {
        }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // Look up how many partitions the topic has, then pick one at random
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int partitionNum = partitions.size();
            Random random = new Random();
            return random.nextInt(partitionNum);
        }

        @Override
        public void close() {
        }
    }
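    Note that this partitioner ignores the record key and picks a random partition for every record, so records that share a key are not guaranteed to land on the same partition (Kafka's default partitioner hashes the key instead). If key-based routing is wanted, the partition() method could be swapped for a hash-based variant along these lines (a sketch, not part of the original post):

        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            int partitionNum = cluster.partitionsForTopic(topic).size();
            if (keyBytes == null) {
                // No key: fall back to a random partition, as in the original example
                return new Random().nextInt(partitionNum);
            }
            // Mask off the sign bit so the result is non-negative, then map the key hash
            // to a partition; the same key always lands on the same partition
            return (java.util.Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % partitionNum;
        }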


     

     

     

    Add the partitioner configuration to the producer code

     


    @Test
    public void kafkaProducer() throws Exception {
        // 1. Prepare the producer configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        // Fully-qualified class name of the custom partitioner defined above
        props.put("partitioner.class", "com.gec.kafkaclient.KafkaCustomPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            // 3. Send data; the custom partitioner decides the target partition
            kafkaProducer.send(new ProducerRecord<String, String>("testpart", "0", "value" + i));
        }

        kafkaProducer.close();
    }
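    To verify how records are being spread across partitions, the consumer from the earlier example can print each record's partition as well (a small sketch; the only changes against the original consumer are the subscribed topic, testpart, and the printf line):

        kafkaConsumer.subscribe(Arrays.asList("testpart"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // record.partition() shows which partition the custom partitioner chose
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }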


     

     

     

     

  • Original article: https://www.cnblogs.com/Transkai/p/10877536.html