zoukankan      html  css  js  c++  java
  • Kafka生产者

    开发者可以使用Kafka内置的客户端API开发应用程序

    生产者

    一个应用程序往kafka写入消息:记录用户的活动、记录度量指标、保存日志消息、记录智能家电的信息、与其他应用程序进行异步通信等

    发送消息流程

    1. 创建一个ProducerRecord对象开始,ProducerRecord对象需要包含目标topic、partition、key、value。在发送Producer对象时,生产者要先把键和值对象序列化成字节数组。
    2. 数据被传送给分区器,分区器会根据分区或键哈希来选择一个分区,将记录添加到记录批次,这个批次的消息会被发送到同一主题和分区。
    3. 服务器收到消息,会返回响应。写入成功,返回一个RecordMetaData对象,包含主题和分区信息,以及记录偏移量,应该返回到zk。写入失败,返回错误,生产者收到错误后会重新发送消息。

    创建Producer代码

    private Properties kafkaProps = new Properties(); 
    kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
    kafkaProps.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer"); 
    kafkaProps.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<String, String>(kafkaProps);
    

    发送消息由3种方式

    • 发送并忘记。消息发送给服务器后,并不关心是否正常到达
    • 同步发送。使用send()发送,返回一个Future对象,调用get()进行等待,可以直到消息是否发送成功
    • 异步发送。使用send()发送,并指定一个回调函数,服务器在返回响应式调用该函数

    序列化器

    创建一个生产者对象必须指定序列化器。

    如果发送到kafka的对象不是简单的字符串或整型,可以使用序列化框架来创建消息记录,如Avro、Thrift、Protobuf,或者使用自定义序列化器。

    自定义序列化
    //简单一个客户类
    public class Customer {
        private int customerID;
        private String customerName;
        public Customer(int ID, String name) {
            this.customerID = ID;
            this.customerName = name;
        }
        public int getID() {
        return customerID;
        }
        public String getName() {
        return customerName;
        }
    }
    
    //创建序列化器
    import org.apache.kafka.common.errors.SerializationException;
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    public class CustomerSerializer implements Serializer<Customer> {
    @Override
        public void configure(Map configs, boolean isKey) {
        // 不做任何配置
        }
        @Override
        /**
        Customer对象被序列化成:
        表示customerID的4字节整数
        表示customerName长度的4字节整数(如果customerName为空,则长度为0)
        表示customerName的N个字节
        */
        public byte[] serialize(String topic, Customer data) {
        try {
        byte[] serializedName;
        int stringSize;
        if (data == null)
            return null;
        else {
            if (data.getName() != null) {
                serializedName = data.getName().getBytes("UTF-8");
                stringSize = serializedName.length;
            } else {
                serializedName = new byte[0];
                stringSize = 0;
            }
        }
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
        buffer.putInt(data.getID());
        buffer.putInt(stringSize);
        buffer.put(serializedName);
        return buffer.array();
        } catch (Exception e) {
        throw new SerializationException("Error when serializing Customer to
    byte[] " + e);
        }
    }
        @Override
        public void close() {
        // 不需要关闭任何东西
        }
    }
    
    使用Avro序列化

    数据被序列化成二进制文件或JSON文件,Avro在读写文件时需要用到schema,schema一般会被内嵌在数据文件里。特点,当写信息使用新的schema,负责读信息可以继续使用,无需改动。

    分区

    如果键位NULL,分区器使用Round Robin将消息均衡分布到各个分区,键不为空,则对键散列,映射到相应分区,同一个键总是在同一个分区。

    自定义分区策略
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.record.InvalidRecordException;
    import org.apache.kafka.common.utils.Utils;
    public class BananaPartitioner implements Partitioner {
        public void configure(Map<String, ?> configs) {} 
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if ((keyBytes == null) || (!(key instanceOf String))) ➋
        throw new InvalidRecordException("We expect all messages to have customer name as key")
    
        if (((String) key).equals("Banana"))
        return numPartitions; // Banana总是被分配到最后一个分区
        // 其他记录被散列到其他分区
        return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
        }
        public void close() {}
    }
    
  • 相关阅读:
    通过运用CSS框架学到了什么
    浏览器判断一览表
    默认样式表之HTML4
    MooTools 1.4 源码分析 -overloadSetter
    MooTools 1.4 源码分析
    MooTools 1.4 源码分析
    javascript一些底层方法总结及用法
    viewport移动端适配,读文笔记
    vue获取v-model数据类型boolean改变成string
    contextify::ContextifyScript::New(const v8::FunctionCallbackInfo<v8::Value>&):
  • 原文地址:https://www.cnblogs.com/chenshaowei/p/12513709.html
Copyright © 2011-2022 走看看