  • 2. Kafka cluster setup; sorting out topic + partition consumption logic

    Kafka is installed on these machines:

    • ke01 ke02 ke03

    config/server.properties

    # broker ID (must be unique per broker)
    broker.id=1
    # listener address and port
    listeners=PLAINTEXT://ke01:9092
    # data (log) directory
    log.dirs=/var/kafka_data
    # ZooKeeper connection string, chrooted to /kafka
    zookeeper.connect=ke02:2181,ke03:2181,ke04:2181/kafka

    Environment

    • /etc/profile (Kafka environment variables, e.g. PATH)

    Sync the installation to the other machines with scp, then adjust per machine:

    • broker.id must be different on each machine

    • change listeners to the corresponding machine's hostname/IP (example below)
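
    For example, a sketch of what changes on ke02 after copying the file (the values here are inferred from the pattern above, not taken from the original post); only these two lines differ:

    broker.id=2
    listeners=PLAINTEXT://ke02:9092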

    Start:

    • kafka-server-start.sh ./server.properties (run on all three machines)

    What you should see

    1. A kafka node now exists in the ZooKeeper tree.
    2. get /kafka/controller shows that the current controller is the broker with brokerid=1:
    {"version":1,"brokerid":1,"timestamp":"1627232194038"}
    3. ls /kafka/brokers/ids lists the registered brokers:
    [1, 2, 3]
    4. ls /kafka/brokers/topics lists the existing topics:
    []

    Create a topic

    kafka-topics.sh  --zookeeper ke02:2181,ke03:2181/kafka --create --topic ooxx --partitions 2 --replication-factor 2
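
    The same thing can be done from Java; a minimal AdminClient sketch (assuming the same brokers and the kafka-clients dependency used later in this post):

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.Collections;
    import java.util.Properties;

    public class CreateTopicDemo {
        public static void main(String[] args) throws Exception {
            Properties p = new Properties();
            p.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ke01:9092,ke02:9092,ke03:9092");
            try (AdminClient admin = AdminClient.create(p)) {
                // 2 partitions, replication factor 2, same as the CLI command above
                NewTopic topic = new NewTopic("ooxx", 2, (short) 2);
                admin.createTopics(Collections.singletonList(topic)).all().get();
            }
        }
    }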

    List all topics

    [root@ke03 ~]# kafka-topics.sh  --zookeeper ke02:2181,ke03:2181/kafka --list
    ooxx

    Describe a specific topic

    [root@ke03 ~]# kafka-topics.sh  --zookeeper ke02:2181,ke03:2181/kafka --describe --topic ooxx
    Topic:ooxx	PartitionCount:2	ReplicationFactor:2	Configs:
    	Topic: ooxx	Partition: 0	Leader: 3	Replicas: 3,2	Isr: 3,2
    	Topic: ooxx	Partition: 1	Leader: 1	Replicas: 1,3	Isr: 1,3
    Notes:
    1. The topic has two partitions and two replicas per partition.
    2. Partition 0's leader is on broker 3.
    3. Partition 1's leader is on broker 1.
    

      

    Producer/consumer walkthrough

    Scenario 1: one consumer
    1. Start a consumer: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group xiaoke
    2. Start a producer: kafka-console-producer.sh --broker-list ke02:9092 --topic ooxx
    3. Produce: a1,a2,a3
    4. The single consumer receives a1,a2,a3, i.e. all of the data
    Conclusion: the producer spreads data across partitions p-0 and p-1, and the single consumer is assigned both partitions, so it receives everything
    
    Scenario 2: two consumers in the same group
    1. Start a consumer: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group xiaoke
    2. Start a consumer: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group xiaoke
    3. Start a producer: kafka-console-producer.sh --broker-list ke02:9092 --topic ooxx
    4. Produce: b1,b2,b3,b4
    5. Consumer 1 receives: b1,b3
    6. Consumer 2 receives: b2,b4
    Conclusion: the producer spreads data across p-0 and p-1, and the two consumers split the partitions between them, i.e. within a group each partition is consumed by exactly one consumer
    
    Scenario 3: two consumers, each in its own group
    1. Start a consumer: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group g1
    2. Start a consumer: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group g2
    3. Start a producer: kafka-console-producer.sh --broker-list ke02:9092 --topic ooxx
    4. Produce: c1,c2,c3,c4
    5. Consumer 1 receives: c1,c2,c3,c4
    6. Consumer 2 receives: c1,c2,c3,c4
    Conclusion: each group gets the full data set, so reuse of the same data happens at the group level

    1.kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx 
    If a consumer connects without specifying a group, Kafka assigns it a default group, and every new connection gets a fresh one
    2. List all groups
    [root@ke03 ~]# kafka-consumer-groups.sh --bootstrap-server ke02:9092 --list
    xiaoke
    console-consumer-69705
    console-consumer-67386
    3. Describe a group
    [root@ke03 ~]# kafka-consumer-groups.sh --bootstrap-server ke03:9092 --describe --group xiaoke
    TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST             CLIENT-ID
    ooxx            0          6               6               0               consumer-1-5e729f67-d0f7-47b2-a7c4-42b4024634ee /192.168.244.183 consumer-1
    ooxx            1          5               5               0               consumer-1-5e729f67-d0f7-47b2-a7c4-42b4024634ee /192.168.244.183 consumer-1

    How to guarantee message ordering (see the Kafka architecture diagram: http://git.mashibing.com/zhouzhilei/kafka20/-/blob/master/image/kafka%E5%89%8D%E7%9E%BB--%E6%9E%B6%E6%9E%84%E7%BB%B4%E5%BA%A6--0128.jpg)

    Background:
    1. Three producers A, B, C and 2 partitions.
    2. Messages are key/value pairs; messages with the same key go to the same partition.
    3. A single partition holds messages with many different keys.
    
    Guaranteeing order:
    0. Give each message a key.
    1. The producers themselves must produce in order, i.e. A produces a and B produces b; if A produced b instead, b could not be guaranteed to arrive at the partition in order.
    2. Messages with the same key are ordered within their partition, but messages with different keys interleave, so put the data that must stay ordered under the same key (see the partitioner sketch below).
    3. The consumer then pulls the data.
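
    Why the same key always lands in the same partition: for keyed records the producer's default partitioner hashes the serialized key and takes it modulo the partition count. A minimal sketch of that idea (not the producer's exact code path):

    import org.apache.kafka.common.utils.Utils;

    import java.nio.charset.StandardCharsets;

    public class KeyPartitionDemo {
        // Same idea as the default partitioner for keyed records: toPositive(murmur2(keyBytes)) % numPartitions
        static int partitionFor(String key, int numPartitions) {
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }

        public static void main(String[] args) {
            // With 2 partitions, every record keyed "item0" maps to one fixed partition,
            // so per-key order is preserved even though different keys interleave
            System.out.println("item0 -> partition " + partitionFor("item0", 2));
            System.out.println("item1 -> partition " + partitionFor("item1", 2));
            System.out.println("item2 -> partition " + partitionFor("item2", 2));
        }
    }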

    Pull vs. push for consumers

    0. Kafka consumers use pull.
    1. Push: the consuming service may not keep up with the rate Kafka pushes at, so Kafka would have to track per-consumer state recording whether each one can still accept data.
    2. Pull: the consumer decides for itself, subscribing and pulling data on demand, and pulls fetch data in batches (see the tuning sketch below).
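
    A small sketch of how batched pulling is tuned on the consumer side (the values below are arbitrary examples, not from the original post):

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    import java.util.Properties;

    public class PullTuningDemo {
        public static Properties pullTuning() {
            Properties p = new Properties();
            p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");   // cap how many records one poll() returns
            p.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");   // broker answers a fetch once at least 1 KB is ready...
            p.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");  // ...or after 500 ms, whichever comes first
            return p;
        }
    }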

    How to maintain offsets

    1. Single-threaded: update the offset one record at a time.
    2. Multi-threaded: process a whole pulled batch (say 5 records) inside a transaction; advance the offset by 5 on success, roll back on failure (see the sketch below).
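
    A hedged sketch of that batch-plus-transaction idea (the TxSink interface below is hypothetical, standing in for whatever transactional database write you use; only the Kafka calls are real API):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BatchCommitSketch {
        /** Hypothetical transactional sink: writes the whole batch or throws, nothing partial. */
        interface TxSink { void saveAll(List<ConsumerRecord<String, String>> batch) throws Exception; }

        static void consumeOneRound(KafkaConsumer<String, String> consumer, TxSink sink) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (TopicPartition tp : records.partitions()) {
                List<ConsumerRecord<String, String>> batch = records.records(tp);
                try {
                    sink.saveAll(batch); // one database transaction for the whole batch ("+5")
                    long next = batch.get(batch.size() - 1).offset() + 1; // committed offset = next record to read
                    Map<TopicPartition, OffsetAndMetadata> commit = new HashMap<>();
                    commit.put(tp, new OffsetAndMetadata(next));
                    consumer.commitSync(commit);
                } catch (Exception e) {
                    // "rollback": the transaction is rolled back and the offset is not committed,
                    // so the same batch will be consumed again
                }
            }
        }
    }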

    Code:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.4.1</version>
            </dependency>
    package com.xiaoke;
    
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.Test;
    
    
    import java.time.Duration;
    import java.util.*;
    import java.util.concurrent.Future;
    
    public class KafkaDemo {
    
    
        @Test
        public void produce() throws Exception {
            String topic = "604-items";
            Properties p = new Properties();
            p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ke01:9092,ke02:9092,ke03:9092");
            // Kafka is an MQ that persists data as byte[]; it does not touch the payload, so producer and consumer must agree on serialization
            // Kafka itself is just an application: it uses zero-copy (the sendfile system call) to hand data to consumers quickly
            p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            p.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);
    
            // The producer really talks to brokers, even though conceptually we think of it as writing data into a topic
    
            /*
            topic: 604-items, 2 partitions
            three kinds of items, each produced with 3 sequential values
            verify: 1. ordering is preserved  2. records for the same item go to the same partition
             */
    
            while (true) {
                for (int i = 0; i < 3; i++) {
                    for (int j = 0; j < 3; j++) {
                        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item" + j, "val" + i);
                        Future<RecordMetadata> send = producer
                                .send(record);
    
                        RecordMetadata rm = send.get();
                        int partition = rm.partition();
                        long offset = rm.offset();
                        System.out.println("key: " + record.key() + " val: " + record.value() + " partition: " + partition + " offset: " + offset);
    
                    }
                }
            }
            /**
             * 1. First create the topic: kafka-topics.sh --zookeeper ke03:2181/kafka  --create --topic 604-items  --partitions 2 --replication-factor 2
             * 2. Produce data
             *
             * Result:
             key: item0 val: val0 partition: 1 offset: 0
             key: item1 val: val0 partition: 0 offset: 0
             key: item2 val: val0 partition: 1 offset: 1
             key: item0 val: val1 partition: 1 offset: 2
             key: item1 val: val1 partition: 0 offset: 1
             key: item2 val: val1 partition: 1 offset: 3
             key: item0 val: val2 partition: 1 offset: 4
             key: item1 val: val2 partition: 0 offset: 2
             key: item2 val: val2 partition: 1 offset: 5
             key: item0 val: val0 partition: 1 offset: 6
             *
             * Grouped by partition:
             * key: item1 val: val0 partition: 0 offset: 0
             * key: item1 val: val1 partition: 0 offset: 1
             * key: item1 val: val2 partition: 0 offset: 2
             *
             *
             * key: item0 val: val0 partition: 1 offset: 0
             * key: item2 val: val0 partition: 1 offset: 1
             * key: item0 val: val1 partition: 1 offset: 2
             * key: item2 val: val1 partition: 1 offset: 3
             * key: item0 val: val2 partition: 1 offset: 4
             * key: item2 val: val2 partition: 1 offset: 5
             * key: item0 val: val0 partition: 1 offset: 6
             * Conclusions:
             * 1. Records for the same item always land in the same partition
             * 2. Within that partition the records for an item are in order
             * 3. Offsets are per-partition
             *
             */
        }
    
    
        @Test
        public void consumer() {
            String topic = "604-items";
            Properties p = new Properties();
            p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ke01:9092,ke02:9092,ke03:9092");
            p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // consumer group
            p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group04");
    
            /**
             * "What to do when there is no initial offset in Kafka or if the current offset
             * does not exist any more on the server
             * (e.g. because that data has been deleted)", i.e. this only matters the first time a group starts (or after its offsets expire):
             *   earliest: automatically reset the offset to the earliest offset (consume from the beginning of the log)
             *   latest:   automatically reset the offset to the latest offset (only data produced after the consumer starts)
             *   none:     throw an exception to the consumer if no previous offset is found for the consumer's group
             *   anything else: throw an exception to the consumer
             */
            p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//
    
            /**
             * 0. true = auto-commit offsets, false = commit offsets manually
             * 1. Auto-commit happens asynchronously on a timer, which can both lose data and re-consume data:
             *    - if the consumer dies before the interval fires, nothing was committed, so a restarted
             *      consumer re-reads from the old offset and consumes duplicates
             *    - if a batch has not yet been written to the database but its offset was already auto-committed
             *      and the consumer dies, a restarted consumer skips that batch and the data is lost
             */
            p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // set to "false" when relying on the manual commitSync calls below
    
            p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "15000");// 15 seconds
    //        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,""); // how many records one poll() may return; pull is elastic and on demand
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
            // Kafka consumers rebalance partitions dynamically within a group
            consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("---onPartitionsRevoked:");
                    Iterator<TopicPartition> iter = partitions.iterator();
                    while (iter.hasNext()) {
                        System.out.println(iter.next().partition());
                    }
    
                }
    
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("---onPartitionsAssigned:");
                    Iterator<TopicPartition> iter = partitions.iterator();
    
                    while (iter.hasNext()) {
                        System.out.println(iter.next().partition());
                    }
    
                }
            });
    
            while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0L));// returns 0..n records
                if (!records.isEmpty()) {
                    System.out.println("-----------" + records.count() + "-------------");
                Set<TopicPartition> partitions = records.partitions(); // a single poll can return data from several partitions
                    for (TopicPartition partition : partitions) {
                        List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                        Iterator<ConsumerRecord<String, String>> piter = pRecords.iterator();
                        while (piter.hasNext()) {
                            ConsumerRecord<String, String> next = piter.next();
                            int par = next.partition();
                            long offset = next.offset();
                            String key = next.key();
                            String value = next.value();
                            long timestamp = next.timestamp();
                            System.out.println("key: " + key + " val: " + value + " partition: " + par + " offset: " + offset + "time:: " + timestamp);
                        // the processing here can be single- or multi-threaded
    
                        /**
                         * Option 1: manually commit the offset after each record
                         */
                        TopicPartition sp = new TopicPartition(topic, par);
                        OffsetAndMetadata om = new OffsetAndMetadata(offset + 1); // committed offset = the next record to read
                        HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
                        map.put(sp, om);
                        consumer.commitSync(map);
    
                        }
                    /**
                     * Option 2: manually commit once per partition
                     */
                    long poff = pRecords.get(pRecords.size() - 1).offset() + 1; // next offset to read after this partition's batch
                    OffsetAndMetadata pom = new OffsetAndMetadata(poff);
                    HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
                    map.put(partition, pom);
                    consumer.commitSync(map);
    
                    }
                /**
                 * Option 3: commit the offsets once per poll() batch
                 */
                    consumer.commitSync();
                }
            }
    
    
        }
    
    
        /**
         * Verifying the difference between earliest and latest:
         * 1. a. set the auto-commit interval to 15 seconds  b. start group01 with earliest  c. start group02 with latest
         *
         *  kafka-consumer-groups.sh --bootstrap-server ke03:9092 --describe --group group01, within the first 15 seconds:
         *  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
         * 604-items       0          -               26              -               consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 /192.168.244.1  consumer-group01-1
         * 604-items       1          -               53              -               consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 /192.168.244.1  consumer-group01-1
         *
         * After 15 seconds (the offsets have been auto-committed):
         * TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
         * 604-items       0          26              26              0               consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 /192.168.244.1  consumer-group01-1
         * 604-items       1          53              53              0               consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 /192.168.244.1  consumer-group01-1
         *
         *
         * kafka-consumer-groups.sh --bootstrap-server ke03:9092 --describe --group group02, within the first 15 seconds: it did not consume any of the pre-existing data
         * TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
         * 604-items       0          26              26              0               consumer-group04-1-08c34055-ba99-4a66-a6e2-a7a155ab2e63 /192.168.244.1  consumer-group02-1
         * 604-items       1          53              53              0               consumer-group04-1-08c34055-ba99-4a66-a6e2-a7a155ab2e63 /192.168.244.1  consumer-group02-1
         *
         * Conclusion: with earliest a brand-new group starts consuming from offset 0; with latest it only consumes data produced after the consumer starts, nothing from before
         */
    
    
    }