zoukankan      html  css  js  c++  java
  • kafka新旧配置文件详解及API

    创建topic:
        kafka-topics.sh --create --topic test --zookeeper s102:2181 --partitions 3 --replication-factor 2
    
    列出topic:
        kafka-topics.sh --list --zookeeper s102:2181 
        
        
    启动生产者:
        kafka-console-producer.sh --broker-list s102:9092 --topic test 
    启动消费者:
        kafka-console-consumer.sh --zookeeper s102:2181 --topic test        //老API
        kafka-console-consumer.sh --bootstrap-server s102:9092 --topic t2    //新API
    
    
    
    kafka数据在zk中的分布
    ====================================    
        /broker/topics/t1            partitions":{"1":[103,104],"0":[102,103]}
        /broker/topics/t1/partition/0/stat    "leader":102,"isr":[103,102]            //in-sync
        /broker/topics/t1/partition/1/stat    "leader":103,"isr":[104,103]
    
        分区    0    102
                103
    
            1    103
                104
    
    
        kafka真实数据,在linux中以 68byte头+数据
    
    
        s102:    tt-2    √    √
        s103:    tt-0    √    √
        s104:    tt-1    √    √
    
        分区:水平分散节点压力
            kafka的数据是以轮询方式发送到每个分区
            随意指定,依赖于硬件情况
    
        副本:提供备份
            最大不能超过broker数量
    
    
        leader:
            kafka产生数据时,先发送给leader
            非leader节点在之后同步leader数据    in-sync
    
    
    
    消费者和消费者组:
    ==============================
    
        /consumers        //消费者组
                    //默认创建控制台消费者,会创建一个新组
    
    
        创建消费者手动指定消费者组
            kafka-console-consumer.sh --topic t1 --zookeeper s102:2181 --group g2
    
        一个消费者组包括多个消费者,当数据产生时,消费者组中只有一个消费者能收到消息
    
    
    
    从指定位置重复消费数据:
    ===================================
        原理:当消费者消费数据后,zk会记录一个偏移量,每消费一条数据,偏移量+1
    
    
        1、修改偏移量    /consumer/g2/offsets/
            将值设为自定义值
        
        2、在相同消费者组中启动另一个消费者
    
    
    消费线程数:
    ====================================
        分区数 = 2    钱袋
        线程数 = 2    手
    
    
        一个消费者有多个消费线程,一个消费线程处理一个分区
        当分区 = 4 , 线程 = 2     消费者 = 1    此时消费者只能一个线程轮训监控两个分区        1X2X2    抢银行来回两次
                     消费者 = 2    此时两个消费者能够分别使用两个线程监控四个分区    2X2X1    抢银行两个人一次过
        
          分区 = 2 , 线程 = 2     消费者 = 1    此时消费者两个线程能够处理两个分区            一个人一次过
                     消费者 = 2    第二个消费者不监控任何分区                人多了,不带另一个玩
                     
        
    producer配置文件:
    ================================
    
        metadata.broker.list    //连接kafka服务器的地址列表
                    //s102:9092,s103:9092,s104:9092
                
        serializer.class    //指定message的消息格式 Encoder
                    //默认DefaultEncoder ==> byte[] 
                    //kafka.serializer.IntegerEncoder
                    //kafka.serializer.LongEncoder
                    //kafka.serializer.NullEncoder
                    //kafka.serializer.StringEncoder
    
        key.serializer.class    //指定key的串行化格式
    
    
        producer.type        //生产者类型
                    //包括sync和async
                    //sync ===> 数据的同步传输    
                            类似于hbase中的put(put)
                            多用于严格有序,速度稍慢
                    //async ==> 数据异步(批次)传输    
                            类似于hbase中的put(List<put>)
                            不严格有序,速度稍快
    
        request.required.acks    //控制生产者接收确认回执
                    //0    代表不接收确认回执,最低延迟,持久性最差
                    //1    只接收来自leader的确认回执,延迟稍低,持久性略好
                    //-1    接收所有副本确认回执,延迟最差,持久性最好
                            
        partitioner.class    //kafka分区类,可以自定义
                    
        compression.codec    //压缩编解码器
                    //none, gzip, and snappy.
                
    
        batch.num.messages    //async模式设置批次大小
                    //指定message个数
                    //默认200
    
    测试同步和异步生产:
        
        sync:    10000    //52,479ms
        async:    10000    //4,871ms
    
    
    测试确认回执:
        async:    1    //3,319ms
        async: 0    //3,039ms
        async:  -1    //4,744ms
    
    
    Consumer配置文件:
    =========================================
            
        group.id        //对消费者组的唯一标识 id
            
        consumer.id        //对消费者的唯一标识
    
        zookeeper.connect    //指定zk连接串
                    //s102:2181,s103:2181,s104:2181
    
        client.id        //客户端的唯一标识,类似于group.id
    
        zookeeper.session.timeout.ms    //zk会话超时
    
        zookeeper.connection.timeout.ms //zk连接超时
    
        zookeeper.sync.time.ms        //zk的follower和leader的同步时间
    
        auto.commit.enable        //启用自动提交consumer偏移量
                        //消费数据,偏移量会自动修改
                        //默认为true
                        //注意:偏移量在消费者组下
    
        auto.commit.interval.ms        //设置自动提交间隔
    
        auto.offset.reset        //重置偏移量
                        //自定义消费位置
                        //largest: 重置消费位置为最大偏移量
                        //smallest: 重置消费位置为最小偏移量
                        //anything else: 抛出异常
    
        consumer.timeout.ms        //消费者超时设置
    
    
    偏移量:
        smallest:数据的最小偏移量    不一定是0    和zk无关,数据的起始
        largest: zk中数据偏移量
    
        
        日志保留时间:server.properties
    
        # 日志的删除时间,经过7天后自动删除
        log.retention.hours=168    
        # 超过此字节,数据会删除
        # log.retention.bytes=1073741824
        # 超过此字节,创建一个新数据段
        log.segment.bytes=1073741824
    
    
    kafka中的分区:partition
    =================================
        1、编写自定义分区:
            public class MyPartition implements Partitioner {
                public MyPartition(VerifiableProperties props) {
                }
    
                public int partition(Object key, int numPartitions) {
    
                Integer k = (Integer)key;
    
                return k % numPartitions;
                }
            }
        
        2、在生产者中指定分区
            public class MyProducer {
                public static void main(String[] args) throws Exception {
                long start = System.currentTimeMillis();
                Properties props = new Properties();
                props.put("metadata.broker.list", "s102:9092, s103:9092, s104:9092");
                props.put("serializer.class", "kafka.serializer.StringEncoder");
                //**********指定key序列化类型**********
                props.put("key.serializer.class", "kafka.serializer.IntegerEncoder");
                //**********指定分区类**********
                props.put("partitioner.class","kafka.MyPartition//完整类名");
                props.put("producer.type","async");
                props.put("request.required.acks", "0");
                ProducerConfig config = new ProducerConfig(props);
                Producer<Integer, String> producer = new Producer<Integer, String>(config);
                for (int i = 0; i < 10000; i++) {
                    String msg = "tom" + i;
                    //**********将key写入**********
                    KeyedMessage<Integer, String> data = new KeyedMessage<Integer,String>("t3",i, msg);
                    producer.send(data);
                    Thread.sleep(2000);
                }
                producer.close();
                System.out.println(System.currentTimeMillis()- start);
                }
            }
        3、消费者观察数据与所在的分区对应关系
    
    
    
    
    kafka新API:Producer配置文件
    ============================================
        kafka中发送方法:send()    是默认异步
    
        key.serializer    
        value.serializer
        acks            //0    没有回执
                    //1    leader回执 
                    //all    所有回执 
        retries            //发送数据失败重新发送的次数
        bootstrap.servers    //相当于老API中的metadata.broker.list
                    //broker列表,kafka地址
        buffer.memory        //数据缓冲区内存大小,到达则会被发送,字节数long
        compression.type    //
        batch.size        //数据发送到同一分区,尝试将指定大小的数据量变为一个批次
                    //小批次减少吞吐量    单位int
        client.id        //客户端id    string
        linger.ms        //记录在缓冲区中停留时间 long
        partitioner.class    //分区类
    
    
    将kafka中异步发送改成同步
    ============================================
            Future future = producer.send(record, callback);
    
            Callback对象:send方法中的回调机制。在数据发送后的信息汇报
                      信息包括元数据(metadata),异常(exception)
                  使用场景:当数据发送失败,能够从中获取元数据和异常状态
                          避免数据丢失,对数据再次处理
    
                    new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        
                    }
                    } 
    
            Future对象:send方法的返回值,通过future.get()获取到元数据(metadata)
                    通过调用future.get(),可以将异步变为同步
                原理:下一条记录必须要等前一个记录返回值调用之后才能够进行发送
    
        
            性能比较:
                async:    1000条数据:1239ms
            sync:    1000条数据:3360ms
    
    
    kafka数据在分区中有序,跨分区无序,相当于Hadoop中的部分排序
    
    
    producer新API的创建
    ============================================
        public class NewProducer {
            public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "s102:9092");
            props.put("acks", "0");
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            long start = System.currentTimeMillis();
            //初始化kafka生产者对象
            Producer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
            for (int i = 0; i < 1000; i++) {
                //初始化kafka记录,包括topic,key,value
                ProducerRecord record = new ProducerRecord("t3",i,"tom"+ i);
                Future future = producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.println(metadata.toString());
                    exception.printStackTrace();
                }
                });
                future.get();
            }
            producer.close();
            System.out.println(System.currentTimeMillis() - start);
            }
        }
    
    Consumer新API的创建
    ============================================
        public class NewConsumer {
            public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "s102:9092");
            props.put("group.id", "g3");
            props.put("enable.auto.commit", "true");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(props);
    
            //新ConsumerAPI可以指定多主题订阅
            List<String> topics = new ArrayList<String>();
            topics.add("t3");
            topics.add("t2");
            consumer.subscribe(topics);
    
            while (true) {
                //通过poll方式获取数据,返回的是一个批次的数据
                ConsumerRecords<Integer, String> records = consumer.poll(1000);
                for (ConsumerRecord<Integer, String> record : records)
                System.out.println(
                    "par: " + record.partition() +
                    ",offset: "+ record.offset() +
                    ", key: " + record.key() +
                    ", value: " +record.value());
            }
            }
        }
    
    Consumer新API的新特性:
    ============================================
        kafka-console-consumer.sh --bootstrap-server s102:9092 --topic t2    //启动控制台消费者
    
    
        1、指定分区消费,不能和 consumer.subscribe(topics)一起用
            List<TopicPartition> tps = new ArrayList<TopicPartition>();
            TopicPartition tp = new TopicPartition("t3",0);
            tps.add(tp);
            consumer.assign(tps);
    
    
    
        2、指定偏移量消费:off:3534    key:30    val:tom30
            前提:需要先指定分区分区进行消费
            //新ConsumerAPI可以指定分区进行消费
            List<TopicPartition> tps = new ArrayList<TopicPartition>();
            TopicPartition tp = new TopicPartition("t3",0);
            tps.add(tp);
            consumer.assign(tps);
    
            1、    //新ConsumerAPI可以指定偏移量消费
                //程度在于控制偏移量,类似于修改zk偏移量数据
                //consumer.commitSync(offsets)之后,所有消费者都从此处开始消费
                Map<TopicPartition, OffsetAndMetadata > offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
                OffsetAndMetadata om = new OffsetAndMetadata(3534);
                offsets.put(tp,om);
                consumer.commitSync(offsets);
        
            
            2、    //将poll的指针移动到指定的偏移量,对consumer影响较小
                consumer.seek(tp,3500)
    
    kafka新API:Consumer配置文件
    ============================================
        key.deserializer    //
        value.deserializer    //
        bootstrap.servers    //
    
        fetch.min.bytes        //consumer抓取请求数    int
    
        group.id        //自定义消费者组id
    
        heartbeat.interval.ms    //适用于consumer组,确保会话有效    int
    
        max.partition.fetch.bytes    //consumer抓取单个分区请求数    int
    
        session.timeout.ms    //超时视为会话失效
    
        auto.offset.reset    //重置偏移量
                    //自定义消费位置
                    //latest: 重置消费位置为最大偏移量
                    //earliest: 重置消费位置为最小偏移量
                    //none: 
                    //other:抛出异常
    
        enable.auto.commit    //自动提交偏移量    boolean
    
        exclude.internal.topics    //决定内部topic是否暴露数据给消费者
    
        fetch.max.bytes        //一个批次获取的最大数据字节数 int
    
        isolation.level        //read_committed:只能读到提交的数据
                    //read_uncommitted:能读到未提交的数据
                    //producer的事务性
                    props.put("transactional.id", "my-transactional-id");
                    producer.initTransactions();
                    producer.beginTransaction();
                    producer.commitTransaction();
                    producer.abortTransaction();
    
    
        max.poll.interval.ms    
        max.poll.records    //设置一次poll中最大记录数
  • 相关阅读:
    简单状态机
    c语言状态机
    存储公司
    正确跑步
    好好做自己能做的
    I2C学习
    es6 generator函数
    es6 for of 循环
    es6 proxy代理
    es6 Symbol类型
  • 原文地址:https://www.cnblogs.com/zyde/p/8946941.html
Copyright © 2011-2022 走看看