zoukankan      html  css  js  c++  java
  • kafka producer实例

    1. 定义要发送的消息User POJO

    package lenmom.kafkaproducer;
    
    public class User {
        public String name;
        public int age;
        public String address;
    
        public User(String name,int age,String address){
            this.name=name;
            this.age=age;
            this.address=address;
        }
    
        @Override
        public String toString() {
            return  "user:"+name +",age:"+age+",address:"+address;
        }
    }

    2. 定义序列化User POJO序列化器

    package lenmom.kafkaproducer;
    
    import org.apache.kafka.common.serialization.Serializer;
    import org.codehaus.jackson.map.ObjectMapper;
    
    import java.util.Map;
    
    public class UserSerializer implements Serializer {
        private ObjectMapper objectMapper;
    
        public void configure(Map map, boolean b) {
            this.objectMapper = new ObjectMapper();
        }
    
        public byte[] serialize(String topic, Object object) {
            byte[] result = null;
            try {
                result = objectMapper.writeValueAsString(object).getBytes("utf-8");
            } catch (Exception e) {
                  e.printStackTrace();
            }
            return result;
        }
    
        public void close() {
    
        }
    }

    此序列化器使用了org.codehaus.jackson.jackson-mapper-asl.jar来进行序列化为jason

    3. 定义partitioner

    package lenmom.kafkaproducer;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    
    public class AgePartitioner implements Partitioner {
    
        private Random random;
    
        public int partition(String topic, Object keyObj, byte[] keyBytes, Object value,
                             byte[] valueBytes, Cluster cluster) {
            System.out.println("key:"+keyObj.toString()+",value: "+value.toString());
    
            Integer age = ((User)value).age;
            List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);
            int partitionCount = ((List) partitions).size();
            int auditPartition = partitionCount - 1;
            // 如果年龄大于20岁的,分发到kafka的最后一个分区,否则随机发送到前面几个分区中去
            return age< 20 ? random.nextInt(partitionCount - 1) : auditPartition;
        }
    
        public void close() {
    
        }
    
        public void configure(Map<String, ?> map) {
            this.random = new Random();
        }
    }

    4. 定义发送的消息统计拦截器

    package lenmom.kafkaproducer;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    public class CounterIntecepter implements ProducerInterceptor {
        int count = 0;
        int failedCount = 0;
    
        /*
         * This method will be called before the message send to the broker
         * */
        public ProducerRecord onSend(ProducerRecord producerRecord) {
            count += 1;
            System.out.println("preparing to send value: " + producerRecord.value().toString());
            return producerRecord;
        }
    
        /*
         * This method would be called when the message was handled by the borker after the broker call back.
         */
        public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                failedCount++;
            }
        }
    
        public void close() {
            System.out.println("total: " + this.count + ",failed: " + this.failedCount);
        }
    
        public void configure(Map<String, ?> map) {
    
        }
    }

    5. Producer主类

    package lenmom.kafkaproducer;
    
    import org.apache.kafka.clients.producer.*;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    public class ProducerDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.1.131:9092,192.168.1.193:9092,192.168.1.194:9092");
            props.put("acks", "all"); // 
            props.put("retries", Integer.MAX_VALUE);
            props.put("batch.size", 323840);
            props.put("linger.ms", 100);
            props.put("buffer.memory", 33554432);
            props.put("max.block.ms", 3000);
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
            props.put(ProducerConfig.RETRIES_CONFIG, 3);
            props.put("max.in.flight.requests.per.connection",1);
    
            testCustomSerializerProducer(props);
        }
    
        private static void testCustomSerializerProducer(Properties props) throws ExecutionException, InterruptedException {
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "lenmom.kafkaproducer.UserSerializer");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            
    //设置消息拦截器 List
    <String> interceptors= new ArrayList<String>(); interceptors.add("lenmom.kafkaproducer.CounterIntecepter"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);

    //设置自定义分区
    props.put("partitioner.class", "lenmom.kafkaproducer.AgePartitioner");
    final Producer<String, User> producer = new KafkaProducer<String, User>(props); for (int i = 0; i < 100; i++) { User user = new User("lenmom" + i, i, "address" + i); ProducerRecord<String, User> record= new ProducerRecord<String, User>("lenmom", user); producer.send(record, new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { producer.close(0,TimeUnit.MILLISECONDS); } else { System.out.println("send success"); } } }); } producer.close(); } }

    注意: KafkaProducer是线程安全的,也就是说可以在多个线程中共享一个KafkaProducer实例.

    6. pom.xml文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>lenmom</groupId>
        <artifactId>kafkaproducer</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-mapper-asl</artifactId>
                <version>1.9.13</version>
            </dependency>
    
        </dependencies>
    
    </project>

    运行效果:

    Producer执行效果

    启动consumer:

    $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.101:9092 --topic lenmom --from-beginning

    接收到的消息:

    7. 配置参数说明

     7.1 消息无丢失配置

    7.1.1  Producer配置

    7.1.1.1

    #使得内存缓冲区被填满时producer 处于阻塞状态并停止接收新的消息而不是抛出异常;否则producer 生产速度过快会耗尽缓冲区
    block.on.buffer.full =true

    7.1.1.2
    #设置acks 为all 很容易理解,即必须要等到所有fo llower 都响应了发送消息才能认为提交成功,这是pro ducer 端最强程度的持久化保证。
    acks = all or 1

    7.1.1.3

    #重试次数设置为Integer.MAX_VALUE,设置成MAX_VALUE 纵然有些极端,但其实想表达的是producer 要开启无限重试。
    #用户不必担心pro ducer 会重试那些肯定无法恢复的错误,当前producer 只会重试那些可恢复的异常
    #情况,所以放心地设置一个比较大的值通常能很好地保证消息不丢失。
    retries = Integer.MAX_VALUE

    7.1.1.4
    #设置该参数为1 主要是为工防止topic 同分区下的消息乱序问题。这个参数的实际效果其
    #实限制了producer 在单个broker 连接上能够发送的未响应请求的数量。因此,如果设置成l ,
    #则producer 在某个broker 发送响应之前将无法再给该broker 发送PRODUCE 请求
    max.in.flight.requests.per.connection= 1

    7.1.1.5

    使用带有回调机制的send
    不要使用KafkaPro ducer 中单参数的send 方法,因为该se nd 调用仅仅是把消息发出而不会理会消息发送的结果。如果消息发送失败,该方法不会得到任何通知,故可能造成数据的丢失。
    实际环境中一定要使用带回调机制的send 版本,即KafkaProducer.send(record, callback) 。

    7.1.1.6

    Callback 逻辑中显式立即关闭producer
    在Callback 的失败处理逻辑中显式调用KafkaProducer .close(O ) 。这样做的目的是为了处理消息的乱序问题。
    若不使用close(O),默认情况下producer 会被允许将未完成的消息发送出去,这样就有可能造成消息乱序。

    7.1.2 Broker消息无丢失设置

    7.1.2.1

    #broker关闭unclean leader 选举,即不允许非ISR 中的副本被选举为leader ,从而避免broker 端因日志水位截断而造成的消息丢失。
    unclean.leader.election .enable= false

    7.1.2.2

    #broker 设置成3 主要是参考了Hadoop 及业界通用的三备份原则,其实这里想强调的是一定要使用多个副本来保存分区的消息。
    replication.factor = 3

    7.1.2.3

    #broker 用于控制某条消息至少被写入到ISR 中的多少个副本才算成功,设置成大于1 是为了提升producer 端发送语义的持久性。
    #记住只有在producer 端acks 被设置成all 或一l 时,这个参数才有意义。在实际使用时,不要使用默认值。
    min.insync .replicas = 2

    7.1.2.4

    #btoker 若两者相等,那么只要有一个副本挂掉,分区就无法正常工作,虽然有很高的持久性但可用性被极大地降低了。推荐配置成replication.factor= min.insyn.replicas + l
    replication.factor > min.insync.replicas

    7.1.2.5

    #broker 禁用自动提交
    enable.auto.commit= false

    7.2 压缩参数设置

    producer使用压缩算法,可以提高kafka的吞吐量,目前支持的压缩算法有GZIP 、Snappy、LZ4和Facebook 公司于2016 年8 月底开源了新的压缩算法Zstandard。

    配置方法为,在给producer的配置构造函数中加入对应的配置项,如:

    props . put (” compressiont.typ”,” snappy");
    //或者
    props.put(ProducerConfig . COMPRESSION TYPE CONFIG ,”snappy”);

    使用压缩算法,对集群的CPU有要求,建议的生产集群配置为:

    linux
    CPU 24 核
    内存32GB 。
    磁盘lTB 7200 转SAS 盘两块
    带宽lGb/s
    ulimit -n 1000000 。
    Socket Buffer 至少64KB一一适用于跨机房网络传输。

  • 相关阅读:
    Python类的继承(进阶5)
    面向对象编程基础(进阶4)
    Python模块(进阶3)
    Python函数式编程(进阶2)
    python多线程
    Ternary Search Tree Java实现
    Trie和Ternary Search Tree介绍
    索引时利用K-邻近算法过滤重复歌曲
    Sql排名和分组排名
    Lucene和jackson冲突
  • 原文地址:https://www.cnblogs.com/lenmom/p/10321060.html
Copyright © 2011-2022 走看看