zoukankan      html  css  js  c++  java
  • Kafka 生产者API

    一 生产者基本API

    创建主题

    bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --topic first --replication-factor 2 --partitions 2

    生产者代码

     1 public class MyProducer {
     2 
     3     public static void main(String[] args) {
     4 
     5         //所有配置的键在ProducerConfig中都有
     6         Properties props = new Properties();
     7 
     8         //kafka 集群,broker-list
     9         props.put("bootstrap.servers", "hadoop102:9092");
    10         //应答级别。all等同于-1
    11         props.put("acks", "all");
    12         //重试次数
    13         props.put("retries", 1);
    14         //批次大小和等待时间。16k发送一次,或者1毫秒发送一次(写到缓冲区)
    15         props.put("batch.size", 16384);
    16         props.put("linger.ms", 1);
    17         //RecordAccumulator 缓冲区大小
    18         props.put("buffer.memory", 33554432);
    19         //KV序列化的类
    20         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    21         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    22 
    23         //生产者
    24         KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
    25 
    26         //发送数据
    27         for (int i = 0; i < 5; i++) {
    28             ProducerRecord<String,String> producerRecord = new ProducerRecord<>("first","atguigu" + i);
    29             kafkaProducer.send(producerRecord);
    30         }
    31 
    32         //关闭资源。上面发送的5条消息,既没有16k,也不到1毫秒,可能不会发送。关闭才发送送。
    33         kafkaProducer.close();
    34 
    35     }
    36 }

    控制台消费查看

    [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    atguigu1
    atguigu3
    atguigu0
    atguigu2
    atguigu4

    同步发送。调用send方法返回的Future对象的get方法,阻塞线程。

     1 for (int i = 0; i < 5; i++) {
     2     ProducerRecord<String,String> producerRecord = new ProducerRecord<>("first","atguigu" + i);
     3     Future<RecordMetadata> metadataFuture = kafkaProducer.send(producerRecord);
     4 
     5     try {
     6         RecordMetadata recordMetadata = metadataFuture.get();
     7     } catch (InterruptedException e) {
     8         e.printStackTrace();
     9     } catch (ExecutionException e) {
    10         e.printStackTrace();
    11     }
    12 }

    二 带回调函数的API

    生产者代码

     1 public class CallbackProducer {
     2 
     3     public static void main(String[] args) {
     4 
     5         Properties props = new Properties();
     6 
     7         //只配置这三项,其他用默认配置
     8         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
     9         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    10         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    11 
    12         KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
    13 
    14         for (int i = 0; i < 10; i++) {
    15             ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first", "atguigu" + i);
    16 
    17             Callback callback = (recordMetadata, e) -> {
    18                 if (e == null) {
    19                     System.out.println("发送成功!分区:" + recordMetadata.partition() + ",偏移量:" + recordMetadata.offset());
    20                 }else {
    21                     e.printStackTrace();
    22                 }
    23             };
    24 
    25             kafkaProducer.send(producerRecord, callback);
    26         }
    27 
    28         kafkaProducer.close();
    29     }
    30 }

    控制台打印如下,可以看到第一个批次发往分区0,第二个批次发往分区1,且每个分区都单独有自己的偏移量。由于示例1中发送了5条数据,占用了分区0的偏移位置0和1,分区1的0,1,2。

    发送成功!分区:0,偏移量:2
    发送成功!分区:0,偏移量:3
    发送成功!分区:0,偏移量:4
    发送成功!分区:0,偏移量:5
    发送成功!分区:0,偏移量:6
    发送成功!分区:1,偏移量:3
    发送成功!分区:1,偏移量:4
    发送成功!分区:1,偏移量:5
    发送成功!分区:1,偏移量:6
    发送成功!分区:1,偏移量:7
  • 相关阅读:
    字符串,format格式化及列表的相关进阶操作---day07
    利用wiile双层循环打印各种星星---day06
    双层循环练习,pass_break_continue,和for循环---day06
    类型判断,代码块,流程控制及循环---day05
    频繁项集算法
    Unity 物体移动的理解
    Game1---游戏设计
    精读Hadamard Response论文
    java 创建线程
    Unity游戏开发面试基础知识
  • 原文地址:https://www.cnblogs.com/noyouth/p/12863211.html
Copyright © 2011-2022 走看看