  • 2. Kafka cluster setup: sorting out topic + partition consumption logic

    Kafka installation machines:

    • ke01 ke02 ke03

    config/server.properties

    # broker id (must be unique per broker)
    broker.id=1
    # listener address and port
    listeners=PLAINTEXT://ke01:9092
    # data (log) directory
    log.dirs=/var/kafka_data
    # ZooKeeper connection string, with a /kafka chroot
    zookeeper.connect=ke02:2181,ke03:2181,ke04:2181/kafka

    Environment

    • /etc/profile

    Sync with scp, then adjust on each machine (a sketch for ke02 follows this list):

    • broker.id must be different on every machine

    • listeners must point at the corresponding machine's own host/IP
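
    A minimal sketch of what ke02's config/server.properties would look like after those edits (assuming the same data directory and ZooKeeper chroot as ke01):

    # unique broker id for this machine
    broker.id=2
    # listener points at this machine
    listeners=PLAINTEXT://ke02:9092
    # unchanged: data directory and ZooKeeper connection
    log.dirs=/var/kafka_data
    zookeeper.connect=ke02:2181,ke03:2181,ke04:2181/kafka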

    Start:

    • kafka-server-start.sh ./server.properties (run on all three machines)

    Observations

    1. ZooKeeper now contains a /kafka directory.
    2. get /kafka/controller shows the current controller is the broker with brokerid=1:
    {"version":1,"brokerid":1,"timestamp":"1627232194038"}
    3. ls /kafka/brokers/ids lists the registered brokers:
    [1, 2, 3]
    4. ls /kafka/brokers/topics lists the topics (none yet):
    []

    Create a topic

    kafka-topics.sh  --zookeeper ke02:2181,ke03:2181/kafka --create --topic ooxx --partitions 2 --replication-factor 2

    List all topics

    [root@ke03 ~]# kafka-topics.sh  --zookeeper ke02:2181,ke03:2181/kafka --list
    ooxx

    Describe a specific topic

    [root@ke03 ~]# kafka-topics.sh  --zookeeper ke02:2181,ke03:2181/kafka --describe --topic ooxx
    Topic:ooxx	PartitionCount:2	ReplicationFactor:2	Configs:
    	Topic: ooxx	Partition: 0	Leader: 3	Replicas: 3,2	Isr: 3,2
    	Topic: ooxx	Partition: 1	Leader: 1	Replicas: 1,3	Isr: 1,3
    Notes:
    1. The topic has two partitions, each with two replicas.
    2. Partition 0's leader is broker 3 (replicas on brokers 3 and 2).
    3. Partition 1's leader is broker 1 (replicas on brokers 1 and 3).
    

      

    Producer / consumer exercises

    Scenario 1: start one consumer
    1. Start a consumer: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group xiaoke
    2. Start a producer: kafka-console-producer.sh --broker-list ke02:9092 --topic ooxx
    3. Write data: a1,a2,a3
    4. The single consumer receives a1,a2,a3, i.e. all of the data.
    Conclusion: the producer spreads records across partitions p-0 and p-1, and a lone consumer receives the data of every partition.
    
    Scenario 2: start two consumers in the same group
    1. Start consumer 1: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group xiaoke
    2. Start consumer 2: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group xiaoke
    3. Start a producer: kafka-console-producer.sh --broker-list ke02:9092 --topic ooxx
    4. Write data: b1,b2,b3,b4
    5. Consumer 1 receives: b1,b3
    6. Consumer 2 receives: b2,b4
    Conclusion: the producer spreads records across p-0 and p-1, and the two consumers split the partitions between them, i.e. each partition is consumed by exactly one consumer in the group.
    
    Scenario 3: start two consumers, one in each group
    1. Start consumer 1: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group g1
    2. Start consumer 2: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group g2
    3. Start a producer: kafka-console-producer.sh --broker-list ke02:9092 --topic ooxx
    4. Write data: c1,c2,c3,c4
    5. Consumer 1 receives: c1,c2,c3,c4
    6. Consumer 2 receives: c1,c2,c3,c4
    Conclusion: re-consumption of data happens at the group level; every group gets its own full copy of the stream and keeps its own offsets.

    1.kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx 
    When a consumer connects without specifying a group, Kafka assigns a default group, and each new connection gets a fresh one.
    2. List all groups:
    [root@ke03 ~]# kafka-consumer-groups.sh --bootstrap-server ke02:9092 --list
    xiaoke
    console-consumer-69705
    console-consumer-67386
    3. Describe a group:
    [root@ke03 ~]# kafka-consumer-groups.sh --bootstrap-server ke03:9092 --describe --group xiaoke
    TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST             CLIENT-ID
    ooxx            0          6               6               0               consumer-1-5e729f67-d0f7-47b2-a7c4-42b4024634ee /192.168.244.183 consumer-1
    ooxx            1          5               5               0               consumer-1-5e729f67-d0f7-47b2-a7c4-42b4024634ee /192.168.244.183 consumer-1

    How is data ordering guaranteed? (see the Kafka architecture diagram: http://git.mashibing.com/zhouzhilei/kafka20/-/blob/master/image/kafka%E5%89%8D%E7%9E%BB--%E6%9E%B6%E6%9E%84%E7%BB%B4%E5%BA%A6--0128.jpg)

    Basics:
    1. Three producers A, B, C and 2 partitions.
    2. Messages are K,V pairs; messages with the same key land in the same partition.
    3. A single partition holds messages of many different keys.

    Guaranteeing order (a sketch follows this list):
    0. Every message must be given a key.
    1. The producer has to produce in order first: if A produces a and B produces b, A must not also produce b, otherwise b cannot be guaranteed to arrive at the partition in order.
    2. Records with the same key are ordered within their partition, but records of different keys interleave, so give all records that must stay ordered the same key.
    3. The consumer then pulls the data.
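
    A minimal sketch of why same-key records stay together: for keyed records the default partitioner hashes the serialized key and takes it modulo the partition count (the real client uses murmur2; the toy hash below is only a stand-in), so every record keyed "item0" keeps landing in the same partition, in send order.

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class KeyToPartitionSketch {
        // stand-in for the client's murmur2-based routing: same key -> same partition, every time
        static int partitionFor(String key, int numPartitions) {
            int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
            return (hash & 0x7fffffff) % numPartitions;
        }

        public static void main(String[] args) {
            // with a 2-partition topic such as ooxx or 604-items, each key maps to one fixed partition
            for (String key : new String[]{"item0", "item1", "item2", "item0"}) {
                System.out.println(key + " -> partition " + partitionFor(key, 2));
            }
        }
    }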

    Pull vs. push for consumers

    0. Kafka uses pull.
    1. Push: the consuming server's capacity may not keep up with Kafka's push rate, so Kafka would have to track per-consumer state recording whether each server can still accept more data.
    2. Pull: autonomous and on demand; the consumer subscribes and pulls data from the server itself, and a pull fetches a batch (see the sketch below).
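
    A minimal sketch of such a batched pull loop (assuming the 604-items topic created further below and a made-up pull-demo group); max.poll.records caps how many records a single poll() may return:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class PullSketch {
        public static void main(String[] args) {
            Properties p = new Properties();
            p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ke01:9092,ke02:9092,ke03:9092");
            p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "pull-demo");
            // the consumer decides how much it can handle: at most 100 records per poll()
            p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p)) {
                consumer.subscribe(Collections.singletonList("604-items"));
                while (true) {
                    // pulling happens on the consumer's own schedule; the broker never pushes
                    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));
                    System.out.println("pulled " + batch.count() + " records");
                }
            }
        }
    }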

    How offsets are maintained

    1. Single-threaded: process records one at a time and advance the offset record by record.
    2. Multi-threaded: process a whole pulled batch (say 5 records) inside a transaction; on success advance the offset by 5, on failure roll back (a sketch follows).
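
    A minimal sketch of the batch-plus-transaction idea, assuming a hypothetical JDBC sink (the jdbc:postgresql://db-host/demo URL and the items table are made up for illustration): the whole pulled batch is written inside one database transaction, the group offset is committed only after that transaction succeeds, and on failure the consumer seeks back so the same batch is pulled again.

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class BatchOffsetSketch {
        public static void main(String[] args) throws Exception {
            Properties p = new Properties();
            p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ke01:9092,ke02:9092,ke03:9092");
            p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "batch-tx-demo");
            p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offsets are committed by hand

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
                 Connection db = DriverManager.getConnection("jdbc:postgresql://db-host/demo", "user", "pass")) {
                db.setAutoCommit(false);
                consumer.subscribe(Collections.singletonList("604-items"));
                while (true) {
                    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));
                    if (batch.isEmpty()) continue;
                    try (PreparedStatement ps = db.prepareStatement("insert into items(k, v) values (?, ?)")) {
                        for (ConsumerRecord<String, String> r : batch) {
                            ps.setString(1, r.key());
                            ps.setString(2, r.value());
                            ps.addBatch();
                        }
                        ps.executeBatch();
                        db.commit();           // the whole batch succeeds together ...
                        consumer.commitSync(); // ... and only then is the group offset advanced past it
                    } catch (Exception e) {
                        db.rollback();         // batch failed: undo the writes, keep the old offset
                        for (TopicPartition tp : batch.partitions()) {
                            // rewind to the last committed position so the failed batch is pulled again
                            OffsetAndMetadata committed = consumer.committed(tp);
                            consumer.seek(tp, committed == null ? 0L : committed.offset());
                        }
                    }
                }
            }
        }
    }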

    Code:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.4.1</version>
            </dependency>
    package com.xiaoke;
    
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.Test;
    
    
    import java.time.Duration;
    import java.util.*;
    import java.util.concurrent.Future;
    
    public class KafkaDemo {
    
    
        @Test
        public void produce() throws Exception {
            String topic = "604-items";
            Properties p = new Properties();
            p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ke01:9092,ke02:9092,ke03:9092");
            // Kafka is an MQ that persists data as byte[]; it never interprets the payload, so producer and consumer must agree on how to encode/decode it
            // Kafka is an ordinary app: it uses zero-copy (the sendfile system call) for fast data consumption
            p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            p.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);
    
            // the producer really talks to brokers, even though from the caller's point of view we are writing to a topic
    
            /*
            topic: 604-items, 2 partitions
            three kinds of items, each producing a linear sequence of 3 values

            verify: 1. ordering is preserved  2. the same item always goes to the same partition
             */
    
            while (true) {
                for (int i = 0; i < 3; i++) {
                    for (int j = 0; j < 3; j++) {
                        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item" + j, "val" + i);
                        Future<RecordMetadata> send = producer.send(record);
    
                        RecordMetadata rm = send.get();
                        int partition = rm.partition();
                        long offset = rm.offset();
                        System.out.println("key: " + record.key() + " val: " + record.value() + " partition: " + partition + " offset: " + offset);
    
                    }
                }
            }
            /**
             * 1. First create the topic: kafka-topics.sh --zookeeper ke03:2181/kafka  --create --topic 604-items  --partitions 2 --replication-factor 2
             * 2. Produce data
             *
             * Result:
             key: item0 val: val0 partition: 1 offset: 0
             key: item1 val: val0 partition: 0 offset: 0
             key: item2 val: val0 partition: 1 offset: 1
             key: item0 val: val1 partition: 1 offset: 2
             key: item1 val: val1 partition: 0 offset: 1
             key: item2 val: val1 partition: 1 offset: 3
             key: item0 val: val2 partition: 1 offset: 4
             key: item1 val: val2 partition: 0 offset: 2
             key: item2 val: val2 partition: 1 offset: 5
             key: item0 val: val0 partition: 1 offset: 6
             *
             * Grouped by partition:
             * key: item1 val: val0 partition: 0 offset: 0
             * key: item1 val: val1 partition: 0 offset: 1
             * key: item1 val: val2 partition: 0 offset: 2
             *
             *
             * key: item0 val: val0 partition: 1 offset: 0
             * key: item2 val: val0 partition: 1 offset: 1
             * key: item0 val: val1 partition: 1 offset: 2
             * key: item2 val: val1 partition: 1 offset: 3
             * key: item0 val: val2 partition: 1 offset: 4
             * key: item2 val: val2 partition: 1 offset: 5
             * key: item0 val: val0 partition: 1 offset: 6
             * Conclusions:
             * 1. Records for the same item stay in one partition.
             * 2. Within that partition the item's records are in order.
             * 3. Offsets are per-partition.
             *
             */
        }
    
    
        @Test
        public void consumer() {
            String topic = "604-items";
            Properties p = new Properties();
            p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ke01:9092,ke02:9092,ke03:9092");
            p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // consumer group
            p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group04");
    
            /**
             * "What to do when there is no initial offset in Kafka or if the current offset
             * does not exist any more on the server (e.g. because that data has been deleted)."
             * These settings only matter when the group has no committed offset yet (e.g. on first startup):
             * <ul>
             *     <li>earliest: automatically reset the offset to the earliest offset (start from the oldest retained data)</li>
             *     <li>latest: automatically reset the offset to the latest offset (only consume data produced after startup)</li>
             *     <li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li>
             *     <li>anything else: throw exception to the consumer.</li>
             * </ul>
             */
            p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
            /**
             * true: offsets are committed automatically; false: you commit them manually.
             * Auto-commit happens asynchronously, so it can both duplicate and lose data:
             *   1. The consumer dies before the commit interval fires: the offset was never committed, so a
             *      restarted consumer re-reads the same records (duplicate consumption).
             *   2. A batch's offset gets auto-committed before the batch has been written to the database and
             *      the consumer dies: a restarted consumer skips those records (data loss).
             */
            p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    
            p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "15000");// 15 seconds
    //        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,""); // how many records may one poll fetch? pulling is elastic and on demand
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
            // consumers in a group are rebalanced across partitions dynamically
            consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("---onPartitionsRevoked:");
                    Iterator<TopicPartition> iter = partitions.iterator();
                    while (iter.hasNext()) {
                        System.out.println(iter.next().partition());
                    }
    
                }
    
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("---onPartitionsAssigned:");
                    Iterator<TopicPartition> iter = partitions.iterator();
    
                    while (iter.hasNext()) {
                        System.out.println(iter.next().partition());
                    }
    
                }
            });
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0L));// one poll returns 0..n records
                if (!records.isEmpty()) {
                    System.out.println("-----------" + records.count() + "-------------");
                    Set<TopicPartition> partitions = records.partitions(); // a single poll can return data from several partitions
                    for (TopicPartition partition : partitions) {
                        List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                        Iterator<ConsumerRecord<String, String>> piter = pRecords.iterator();
                        while (piter.hasNext()) {
                            ConsumerRecord<String, String> next = piter.next();
                            int par = next.partition();
                            long offset = next.offset();
                            String key = next.key();
                            String value = next.value();
                            long timestamp = next.timestamp();
                            System.out.println("key: " + key + " val: " + value + " partition: " + par + " offset: " + offset + "time:: " + timestamp);
                            // the record can be handled single-threaded or multi-threaded

                            /**
                             * Option 1: manually commit the offset per record
                             */
                            TopicPartition sp = new TopicPartition(topic, par);
                            // the committed offset marks the next record to consume, hence offset + 1
                            OffsetAndMetadata om = new OffsetAndMetadata(offset + 1);
                            HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
                            map.put(sp, om);
                            consumer.commitSync(map);
    
                        }
                        /**
                         * Option 2: manually commit the offset at partition granularity
                         */
                        long poff = pRecords.get(pRecords.size() - 1).offset();// offset of the last record in this partition's slice of the batch
                        // commit poff + 1: the committed offset marks the next record to consume
                        OffsetAndMetadata pom = new OffsetAndMetadata(poff + 1);
                        HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
                        map.put(partition, pom);
                        consumer.commitSync(map);
    
                    }
                    /**
                     * Option 3: commit offsets once per poll() batch
                     */
                    consumer.commitSync();
                }
            }
    
    
        }
    
    
        /**
         * Verify the difference between earliest and latest:
         * 1. a. set the auto-commit interval to 15 seconds  b. start group01 with earliest  c. start group02 with latest
         *
         *  kafka-consumer-groups.sh --bootstrap-server ke03:9092 --describe --group group01, result within the first 15 seconds:
         *  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
         * 604-items       0          -               26              -               consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 /192.168.244.1  consumer-group01-1
         * 604-items       1          -               53              -               consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 /192.168.244.1  consumer-group01-1
         *
         * Result after 15 seconds:
         * TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
         * 604-items       0          26              26              0               consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 /192.168.244.1  consumer-group01-1
         * 604-items       1          53              53              0               consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 /192.168.244.1  consumer-group01-1
         *
         *
         * kafka-consumer-groups.sh --bootstrap-server ke03:9092 --describe --group group02, result within the first 15 seconds: it has not consumed the earlier data
         * TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
         * 604-items       0          26              26              0               consumer-group04-1-08c34055-ba99-4a66-a6e2-a7a155ab2e63 /192.168.244.1  consumer-group02-1
         * 604-items       1          53              53              0               consumer-group04-1-08c34055-ba99-4a66-a6e2-a7a155ab2e63 /192.168.244.1  consumer-group02-1
         *
         * Conclusion: with earliest, a group's first run starts consuming from offset 0; with latest, the first run only consumes data produced after the consumer starts and never the data from before.
         */
    
    
    }