zoukankan      html  css  js  c++  java
  • kafka——producer

    一、工作流程

    Java版本的工作流程,ProducerRecord是消息对象。

     简单描述,

    1、主线程建立ProducerRecord对象,包含topic、partition、key、value、timestamp等信息

    2、将ProducerRecord中的消息体(key-value),序列化后结合kafkaProducer缓存的元数据(topic分区信息数等),共同提交给partitioner

    3、partitione计算出目标分区并放入缓存中对应目标分区的batch(批量思想,一批一批的发送,而不是一条一条的发送)中,这里主线程任务已经完成

    4、sender线程异步发送batch信息,并接受响应,然后回调给主线程。

    二、序列化

    kafka默认提供多种序列化器。

    ByteArraySerializer:本质什么也没做,已经是字节数组了。

    ByteBufferSerializer:序列化ByteBuffer

    BytesSerializer:序列化kafka自定义的Bytes类

    DoubleSerializer:序列化Double类型

    IntegerSerializer:序列化Integer类型

    LongSerializer:序列化Long类型

    StringSerializer:序列化String类型

    三、分区策略

    partitioner职责是确认到底要向topic的那个分区发送消息。

    kafak提供默认的分区器partitioner,分区策略:

    ① 根据消息key的hash值确定目标分区。大多数情况下消息是没有key的,我们可以控制多条消息相同key,达到这些消息被分到同一个分区,完成一些业务需求。

    ② 若无key,则采用轮询的方式确定目标分区

    ③ 用户指定,producer的API提供了用户自定指定目标分区的功能

    四、Java代码实现与参数说明

    Java代码实现:配置参数key在ProducerConfig.Java中可以找到。

     1 public class ProducerTest {
     2 
     3     private static final String HOST = "192.168.1.4";
     4 
     5     public static void main(String[] args) {
     6         Properties props = new Properties();
     7         props.put("bootstrap.servers",HOST + ":9091"+","+HOST + ":9092"+","+HOST + ":9093");//指定broker,配置部分就可以了,producer会替换成实际brokers列表
     8         props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");//key序列化类,无默认值,必须配置
     9         props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");//value序列化类,无默认值,必须配置,可与key不同
    10         props.put("acks","-1");//控制参数的持久性策略,须传入字符串参数
    11         props.put("retries","3");//重试次数,默认为0
    12         props.put("batch.size","16384");//一个批次大小,默认16KB,producer调优重要参数
    13         props.put("linger.ms","10");//发送消息延时,默认为0,消息立即发送,不需要batch满,producer调优重要参数
    14         props.put("buffer.memory","33554432");//消息缓冲区大小默认,32MB,producer调优重要参数
    15         props.put("max.block.ms","3000");//等待元数据超时时间,(例如发送topic,客户端未缓存)
    16         props.put("max.request.size","10485760");//请求消息的最大值,默认10MB,由于消息头开销需要比实际最大值大
    17         props.put("request.timeout.ms","3000");//请求超时时间
    18         try(Producer producer = new KafkaProducer(props);) {
    19             producer.send(new ProducerRecord("topic-test","hello world"));
    20         }
    21     }
    22 }

    除了三个必须的参数bootstrap.servers、key.serializer、value.serializer外另外必须清楚acks配置的作用

    acks:参数用于控制producer生产消息的持久化级别。

    acks = -1或all :消息发送时,producer需要等待leader broker的响应,而leader broker需要将消息写入本地日志,同时还会等ISR中副本都成功写入日志后,才会响应,吞吐量最差,不能容忍消息丢失

    acks = 0 :发送消息时,producer不需要等待leader broker的响应。吞吐量最高,但完全不担心消息是否发送成功,允许消息丢失(例如服务器日志应用)

    acks = 1 :发送消息时,producer需要等待leader broker的响应,而leader broker将消息写入本地日志,就会响应,无需等待ISR中副本都成功写入日志。吞吐量适中

    其中acks = 0 , acks = 1可能引起消息丢失。

    五、producer拦截器

    拦截器interceptor是的用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,例如修改消息,添加通用字段

    接口是

    //org.apache.kafka.clients.producer.ProducerInterceptor
    public interface ProducerInterceptor<K, V> extends Configurable {
        //封装仅send方法中,运行在用户主线程,确保消息在序列化前调用此方法
        ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);
        //回调逻辑触发前,运行在producer I/O线程(sender线程)中,不能放入过重的逻辑,会拖累消息发送效率
        void onAcknowledgement(RecordMetadata var1, Exception var2);
        //关闭拦截器,主要用于资源清理
        void close();
    }

    具体使用时将自定义的拦截器放入配置props的interceptor.classes.config属性中。

    六、同步异步发送

    新版producer的写入操作默认都是异步的。异步发送可能导致消息乱序,利用send方法返回Future对象可转化为同步发送。

               //异步发送+回调
                producer.send(new ProducerRecord("topic-test", "test1"), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e == null){
                            System.out.println("message send success");
                        }else {
                            System.out.println("message send fail");
                        }
                    }
                });
                producer.send(new ProducerRecord("topic-test", "test2"), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e == null){
                            System.out.println("message send success");
                        }else {
                            System.out.println("message send fail");
                        }
                    }
                });
                //利用future的get()方法实现同步发送
                producer.send(new ProducerRecord("topic-test", "test1")).get();
                producer.send(new ProducerRecord("topic-test", "test2")).get();

    七、多线程处理

    producer多线程中两种使用方法,社区推荐多线程单KafkaProducer实例,效率高

    多线程单KafkaProducer实例:所有线程共享一个KafkaProducer实例,实现简单性能好,但①共享一个内存缓冲区,需要配置更大的缓存空间;② 一旦producer实例破坏,所有用户线程无法工作

    多线程多KafkaProducer实例:每个线程维护自己的专属kafkaProducer实例,配置粒度小,单个producer奔溃不会影响其他线程;但①占用更多的内存

    八、无消息丢失配置

    ① 新版producer默认异步发送可能导致消息乱序,可采用上面同步方式发送消息,但同步方式性能很差,实际场景不推荐使用

    ②producer端“无消息丢失配置”

    /************  product config  *****************/
    block.on.buffer.full=true //缓冲区满,停止接受新消息
    acks = -1 //ISR日志全部持久化
    retries = Integer.MAX_VALUE //无限重试
    max.in.flight.requests.per.connection=1 //防止同partition下消息乱序。单个broker仅响应一个请求
    //使用带回调机制的send方法.producer.send(record,callback)
    //callback逻辑中显示地立即关闭producer,使用close()
    /************  broker config  *****************/
    unclean.leader.election.enable=false//不允许ISR外副本竞选leader
    replication.factor=3//业界通用的三备份原则
    min.insync.replicas=2//写入ISR多少个副本才算成功,acks=-1才有效,实际使用,不要使用默认值
    replication.factor>min.insync.replicas//两者相等会导致,一个副本挂掉,分区将无法提供服务
    enable.auto.commit=false

    九、旧版本producer

    新旧producer主要区别:

    ①旧版:kafka.producer.Producer;新版:org.apache.kafka.clients.producer.KafkaProducer

    ②旧版默认同步发送;新版默认异步发送

    ③旧版:kafka-core.jar;新版;kafka-clients.jar

    ④新旧参数列表几乎完全不同

    ⑤旧版本直接与ZooKeeper通信来发送数据,新版本彻底摆脱ZooKeeper的依赖

    十、spring中的producer

    public class SpringProducer {
    
        private static KafkaTemplate<String,String> kafkaTemplate = null;
        private static final String HOST = "mcip";
    
        static{
            Properties pro = new Properties();
            pro.put("bootstrap.servers",HOST + ":9091"+","+HOST + ":9092"+","+HOST + ":9093");
            pro.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            pro.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
            pro.put("acks","-1");
            pro.put("retries","3");
            pro.put("batch.size","323840");
            pro.put("linger.ms","10");
            pro.put("buffer.memory","33554432");
            pro.put("max.block.ms","3000");
            ProducerFactory producerFactory = new DefaultKafkaProducerFactory(pro);
            kafkaTemplate = new KafkaTemplate<String,String>(producerFactory,true);
        }
    
        public static ListenableFuture<SendResult<String, String>> send(String topic, String message){
            return kafkaTemplate.send(topic,message);
        }
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ListenableFuture<SendResult<String, String>> future = send("tttttopic","replica test4");
            System.out.println(future.get().getProducerRecord().value());
        }
    
    }

    参考《kafka实战》

  • 相关阅读:
    Ionic Js十:加载动作
    Ionic Js九:列表操作
    Ionic Js八:头部和底部
    Ionic Js七:手势事件
    库文件
    驱动知识
    自启动总结
    学习笔记
    写脚本切换用户
    未找到arm-linux-gcc解决办法
  • 原文地址:https://www.cnblogs.com/wqff-biubiu/p/12334212.html
Copyright © 2011-2022 走看看