zoukankan      html  css  js  c++  java
  • 【Kafka】Producer API

    Producer API


    Kafka官网文档给了基本格式

    地址:http://kafka.apachecn.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

    基础模板
     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("acks", "all");
     props.put("retries", 0);
     props.put("batch.size", 16384);
     props.put("linger.ms", 1);
     props.put("buffer.memory", 33554432);
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
     Producer<String, String> producer = new KafkaProducer<>(props);
     for (int i = 0; i < 100; i++)
         producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
    
     producer.close();
    

    实际操作使用
    package cn.itcast.kafka.demo1;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    public class MyProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            //指定kafka服务器地址
            props.put("bootstrap.servers", "node01:9092");
            //消息确认机制
            props.put("acks", "all");
            //重试机制
            props.put("retries", 0);
            //批量发送大小
            props.put("batch.size", 16384);
            //消息延迟
            props.put("linger.ms", 1);
            //消息缓冲区大小
            props.put("buffer.memory", 33554432);
            //定义key和value的序列化
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            //for循环往消息队列发送数据
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>("test", "这是第" + i + "条message"));
            }
    
            producer.close();
        }
    }
    

    运行代码就可以在Consumer控制台进行消费
    在这里插入图片描述
    但是我们可以发现数据是乱序的
    这在【Kafka】Kafka简单介绍 讲内部架构的Partition中就有解释:当topic只有一个Partition的时候,Kafka可以保证Consumer消费数据时是有序的,但如果是多个Partition,Kafka则无法做到Consumer有序消费数据

    想了解有关kafka数据分区策略的有关知识可以查看文章:【Kafka】数据分区策略

  • 相关阅读:
    React.render和reactDom.render的区别
    CSS中position的4种定位详解
    React.js入门必须知道的那些事
    JS处理事件小技巧
    React.js深入学习详细解析
    React.js实现原生js拖拽效果及思考
    Linux ./configure && make && make install 编译安装和卸载
    Redis set集合结构及命令详解
    Redis数据过期策略
    Redis TTL命令
  • 原文地址:https://www.cnblogs.com/zzzsw0412/p/12772445.html
Copyright © 2011-2022 走看看