zoukankan      html  css  js  c++  java
  • kafka原生producer API

    转自https://blog.csdn.net/tianlan996/article/details/80495208

    1. 类

    public class KafkaProducer<K,V>
    extends java.lang.Object
    implements Producer<K,V>

    2. producer是线程安全的(这点不同于consumer),多线程共享producer可以提高效率。

    3. 使用示例:

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("acks", "all");
     props.put("retries", 0);---重试次数
     props.put("batch.size", 16384);
     props.put("linger.ms", 1);
     props.put("buffer.memory", 33554432);
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     
     Producer<String, String> producer = new KafkaProducer<>(props);
     for (int i = 0; i < 100; i++)
         producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
     
     producer.close();

    producer包含一个缓存空间来存放未发送的记录,一个后台i/O进程负责缓存中记录的请求和传送。使用后,如果不关闭producer,那么将存在资源泄漏。

    send()方法是异步的。

    batch.size:producer维护了每个分区未发送记录的缓存,该缓存的大小由batch.size设定。

    linger.ms:一般情况下,记录会被立即发送出去,而不会等待缓存的填充。用户可以通过配置linger.ms来让producer等待一段时间再发送消息。

    buffer.memory:缓存的大小。消息填满缓存后,后续的消息就会阻塞。阻塞超过max.block.ms设定的时间,就会抛出TimeoutException。

    key.serializer and value.serializer:如何将key和value组合成对象,可以自定义类。使用 StringSerializer默认组合成字符串。

    3. idempotent producer

    enable.idempotence true
    retries Integer.MAX_VALUE(不设置,默认即为此值)
    acks all
    通过上述配置开启idempotent,可以保证exactly语意。prouducer java api 不变。只有在同一个session中才能保证produder的idempotent。
    4. transactional producer
    transactional.id <one value>
    replication.factor 3
    min.insync.replicas 2
    通过上述配置开启transactional。transactional.id被设置后,idempotent也会自动开启。
    consumer端需要配置为 只消费committed的消息。

    在分区应用中,每个producer的transactional.id须唯一。

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("transactional.id", "my-transactional-id");// 一旦被设置,发送代码就需要使用事务
     Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
     
     producer.initTransactions();
     
     try {
         producer.beginTransaction();
         for (int i = 0; i < 100; i++) //100条消息组成一个单独的事务
             producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
         producer.commitTransaction();
     } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
         // We can't recover from these exceptions, so our only option is to close the producer and exit.
         producer.close();
     } catch (KafkaException e) {
         // For all other exceptions, just abort the transaction and try again.
         producer.abortTransaction();
     }
     producer.close();
  • 相关阅读:
    css文字两端对齐,而且居中
    vue项目做微信分享总结
    js获取url参数
    vue微信支付遇到的坑
    Win7的环境变量下的系统变量path不小心修改了,怎么恢复
    解决ios上滑动不流畅及滚动条隐藏无效问题
    数组更新检测
    列表渲染
    条件渲染
    vue调试工具的安装
  • 原文地址:https://www.cnblogs.com/wangleBlogs/p/9712763.html
Copyright © 2011-2022 走看看