  • Kafka Producer: Client Development

    A normal produce flow involves the following steps:

    1. Configure the producer parameters
    2. Create a producer instance
    3. Build the message to send
    4. Send the message
    5. Close the producer instance

    Example code:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    /**
     * @author: masheng
     * @description: Kafka producer client example
     * @date: 2020/07/27 17:32
     */
    public class KafkaProducerAnalyze {
    
        private static final String TOPIC = "topic_test";
        private static final String BROKER_LIST = "localhost:9092";
    
        /*
         * Description: initialize the producer configuration
         * @author: masheng
         * @time: 2020/7/27
         * @param
         * @return: java.util.Properties
         */
        public static Properties initConfig() {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
            //acks=all: success is returned only after all in-sync replicas have written the message; highest latency. Possible values: all, 0, 1
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            return properties;
        }
    
        public static void main(String[] args) {
            //1. Configure the producer parameters
            Properties properties = initConfig();
            //2. Initialize the producer instance
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            //3. Build the message to send
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Hello,Kafka!");
            try {
                //4. Send the message
                producer.send(record);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //5. Close the producer instance
                producer.close();
            }
        }
    }
    

    1.ProducerRecord

    The message object. It contains several fields, defined as follows:

    public class ProducerRecord<K, V> {
        private final String topic; //topic
        private final Integer partition; //partition number
        private final Headers headers; //message headers, for application-specific metadata
        private final K key; //key
        private final V value; //value, i.e. the message body
        //message timestamp; either CreateTime (when the message was created)
        //or LogAppendTime (when it was appended to the log file)
        private final Long timestamp; 
    }
    
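    ProducerRecord offers several public constructors that populate these fields. A quick sketch of the common ones (the topic, key, and partition values below are placeholders):

    //topic + value: the partitioner chooses the partition
    ProducerRecord<String, String> r1 = new ProducerRecord<>("topic_test", "value-only");
    //topic + key + value: identical keys hash to the same partition
    ProducerRecord<String, String> r2 = new ProducerRecord<>("topic_test", "key-1", "with-key");
    //topic + explicit partition + key + value: pin the record to partition 0
    ProducerRecord<String, String> r3 = new ProducerRecord<>("topic_test", 0, "key-1", "pinned");
    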

    2.Parameter configuration

    3 required parameters:

    • bootstrap.servers
    • key.serializer
    • value.serializer

    Note: the broker accepts messages only as byte arrays (byte[]), so messages must be serialized accordingly before being sent to the broker; these two parameters must be set to the serializer's fully qualified class name

    Tip: the org.apache.kafka.clients.producer.ProducerConfig class can be used to set the parameters and guard against human error
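    For comparison, the same required settings written with raw string keys; a typo in one of these strings only surfaces at runtime, which is exactly what the ProducerConfig constants guard against:

    properties.put("bootstrap.servers", BROKER_LIST);
    properties.put("key.serializer", StringSerializer.class.getName());
    properties.put("value.serializer", StringSerializer.class.getName());
    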

    3.Sending messages

    The send method:

        public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
            return send(record, null);
        }
    
        public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
            // intercept the record, which can be potentially modified; this method does not throw exceptions
            ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
            return doSend(interceptedRecord, callback);
        }
    
    

    The send method has two overloads; calling producer.send(record).get() blocks until the broker responds, which gives synchronous sending

    The call also yields a RecordMetadata object from which metadata such as the topic, partition, and offset can be read; use this form when that information is needed, as in the sketch below
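    A minimal synchronous-send sketch, reusing the producer and record from the example above (ExecutionException comes from java.util.concurrent):

    try {
        //get() blocks until the broker has responded
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("topic:" + metadata.topic()
                + ";partition:" + metadata.partition()
                + ";offset:" + metadata.offset());
    } catch (ExecutionException | InterruptedException e) {
        e.printStackTrace();
    }
    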

    Asynchronous sending: pass a callback function to send(); Kafka invokes it when the response comes back, giving asynchronous delivery confirmation. Example:

    //handle the exception in the callback
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                System.out.println("no exception!");
            }
            if (metadata != null) {
                System.out.print("offset:" + metadata.offset() + ";partition:" + metadata.partition());
            }
        }
    });
    

    4.Exceptions

    KafkaProducer generally raises two kinds of exceptions: retriable ones (e.g. NetworkException, LeaderNotAvailableException) and non-retriable ones (e.g. RecordTooLargeException)

    Retriable exceptions are controlled by the retries parameter: if the condition recovers within the configured number of retries, no exception is surfaced to the caller. The default value is 0
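    For example, retries can be enabled when building the configuration (the values below are illustrative, not recommendations):

    properties.put(ProducerConfig.RETRIES_CONFIG, 10);
    //pause between consecutive retry attempts, in milliseconds
    properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
    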

    5.Serialization

    Built-in serializers exist for ByteArray, ByteBuffer, Bytes, Double, Integer, Long, String, and other types. They all implement the org.apache.kafka.common.serialization.Serializer interface, which has three methods:

    //configure this serializer
    void configure(Map<String, ?> configs, boolean isKey);
    //perform the serialization
    byte[] serialize(String topic, T data);
    //close this serializer
    void close();
    

    The StringSerializer class:

    public class StringSerializer implements Serializer<String> {
        private String encoding = "UTF8";
    
        //determine the character encoding from the configs
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
            Object encodingValue = configs.get(propertyName);
            if (encodingValue == null)
                encodingValue = configs.get("serializer.encoding");
            if (encodingValue instanceof String)
                encoding = (String) encodingValue;
        }
    
        //convert the String to byte[]
        @Override
        public byte[] serialize(String topic, String data) {
            try {
                if (data == null)
                    return null;
                else
                    return data.getBytes(encoding);
            } catch (UnsupportedEncodingException e) {
                throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
            }
        }
    
        //no-op
        @Override
        public void close() {
            // nothing to do
        }
    }
    

    If the built-in serializers cannot meet our needs, we can implement our own, as follows:

    //the object to be sent as the message value
    public class Person {
        private String name;
        private String tel;
    
        public Person() {
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getTel() {
            return tel;
        }
    
        public void setTel(String tel) {
            this.tel = tel;
        }
    }
    
    public class PersonSerializer implements Serializer<Person> {
    
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
    
        }
    
        @Override
        public byte[] serialize(String topic, Person data) {
            if (data == null) {
                return null;
            }
            byte[] name, tel;
            try {
                if (data.getName() != null) {
                    name = data.getName().getBytes("UTF-8");
                } else {
                    name = new byte[0];
                }
                if (data.getTel() != null) {
                    tel = data.getTel().getBytes("UTF-8");
                } else {
                    tel = new byte[0];
                }
                //prefix each field with its 4-byte length so a matching
                //deserializer can split the two fields apart again
                ByteBuffer buffer = ByteBuffer.allocate(4 + name.length + 4 + tel.length);
                buffer.putInt(name.length);
                buffer.put(name);
                buffer.putInt(tel.length);
                buffer.put(tel);
                return buffer.array();
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            return new byte[0];
        }
    
        @Override
        public void close() {
    
        }
    }
    
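    A sketch of how the custom serializer could be wired in, reusing initConfig() and TOPIC from the first example and overriding the value serializer (the Person values are made up):

    Properties properties = initConfig();
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PersonSerializer.class.getName());
    KafkaProducer<String, Person> producer = new KafkaProducer<>(properties);
    
    Person person = new Person();
    person.setName("alice");
    person.setTel("12345678901");
    producer.send(new ProducerRecord<>(TOPIC, person));
    producer.close();
    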

    6.Partitioner

    On its way to the broker, a message passes through the interceptors, the serializer, and the partitioner before it is actually dispatched

    Kafka's default partitioner is org.apache.kafka.clients.producer.internals.DefaultPartitioner, which implements the Partitioner interface. The interface defines 2 methods:

    //compute the partition number
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    //reclaim resources when the partitioner is closed
    public void close();
    

    The DefaultPartitioner class:

    public class DefaultPartitioner implements Partitioner {
    
        private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    
        public void configure(Map<String, ?> configs) {}
    
        /**
         * Compute the partition for the given record.
         *
         * @param topic The topic name
         * @param key The key to partition on (or null if no key)
         * @param keyBytes serialized key to partition on (or null if no key)
         * @param value The value to partition on or null
         * @param valueBytes serialized value to partition on or null
         * @param cluster The current cluster metadata
         */
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            //if the key is null, dispatch round-robin across the topic's available partitions
            if (keyBytes == null) {
                int nextValue = nextValue(topic);
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if (availablePartitions.size() > 0) {
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    return availablePartitions.get(part).partition();
                } else {
                    // no partitions are available, give a non-available partition
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else {
                // if the key is not null, hash it and compute the partition number from the hash
                // hash the keyBytes to choose a partition
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
    
        private int nextValue(String topic) {
            AtomicInteger counter = topicCounterMap.get(topic);
            if (null == counter) {
                counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
                AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
                if (currentCounter != null) {
                    counter = currentCounter;
                }
            }
            return counter.getAndIncrement();
        }
    
        public void close() {}
    
    }
    

    To customize partitioning, simply implement the Partitioner interface, as in the sketch below
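    For instance, here is a minimal, hypothetical partitioner that pins messages whose key starts with "audit" to partition 0 and otherwise falls back to a key hash; the routing rule is invented purely for illustration:

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;
    
    import java.util.Map;
    import java.util.concurrent.ThreadLocalRandom;
    
    public class AuditPartitioner implements Partitioner {
    
        @Override
        public void configure(Map<String, ?> configs) {
        }
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            if (key != null && key.toString().startsWith("audit")) {
                //pin audit messages to partition 0
                return 0;
            }
            if (keyBytes == null) {
                //no key: pick a random partition
                return ThreadLocalRandom.current().nextInt(numPartitions);
            }
            //same hashing scheme as DefaultPartitioner
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    
        @Override
        public void close() {
        }
    }
    

    The partitioner is then registered through the partitioner.class parameter:

    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, AuditPartitioner.class.getName());
    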

    7.Interceptors

    A producer interceptor can do preparatory work before messages are sent, such as filtering out certain messages, and can also run custom logic ahead of the send callback, for example gathering statistics

    It requires implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface, which contains 3 methods:

    //called before the message is serialized and the partition computed; customize the message here
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    //called just before the message is acknowledged or when the send fails, ahead of the user's Callback;
    //it runs on the producer's I/O thread, so keep it lightweight or it will slow down message sending
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
    //clean up resources when the interceptor is closed
    public void close();
    

    Example:

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * @author: masheng
     * @description: producer interceptor example
     * @date: 2020/07/27 20:41
     */
    public class ProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String, String> {
    
        private volatile long sendSuccess = 0;
        private volatile long sendFailure = 0;
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            String modifyValue = "pre-" + record.value();
            return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                    record.key(), modifyValue, record.headers());
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                sendSuccess++;
            } else {
                sendFailure++;
            }
        }
    
        @Override
        public void close() {
            double successRatio = (double) sendSuccess / (sendFailure + sendSuccess);
            //format the number first, then append the percent sign (formatting the
            //concatenated string with %f would throw IllegalFormatConversionException)
            System.out.println("send success ratio = " + String.format("%.2f", successRatio * 100) + "%");
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
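    For the interceptor to take effect it must be registered through the interceptor.classes parameter; several interceptors can be chained by listing multiple class names separated by commas:

    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class.getName());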