- ke01 ke02 ke03
#选主 broker.id=1 #监听端口 listeners=PLAINTEXT://ke01:9092 #日志地址 log.dirs=/var/kafka_data #zk连接信息 zookeeper.connect=ke02:2181,ke03:2181,ke04:2181/kafka
listeners 修改为对应机器ip
kafka-server-start.sh ./server.properties(三台都运行)
1.zk目录中有kafka目录 2.get /kafka/controller 可以看出当前主机是brokerid=1的 {"version":1,"brokerid":1,"timestamp":"1627232194038"} 3.ls /kafka/brokers/ids 查看有哪些brokers [1, 2, 3] 4.ls /kafka/brokers/topics 查看有哪些topic []
kafka-topics.sh --zookeeper ke02:2181,ke03:2181/kafka --create --topic ooxx --partitions 2 --replication-factor 2
[root@ke03 ~]# kafka-topics.sh --zookeeper ke02:2181,ke03:2181/kafka --list ooxx
[root@ke03 ~]# kafka-topics.sh --zookeeper ke02:2181,ke03:2181/kafka --describe --topic ooxx Topic:ooxx PartitionCount:2 ReplicationFactor:2 Configs: Topic: ooxx Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2 Topic: ooxx Partition: 1 Leader: 1 Replicas: 1,3 Isr: 1,3 说明: 1.有两个分区,两个副本 2. 0号分区在brokers=3上 3. 1号分区在brokers=1上
场景一: 启动一个消费者 1.启动消费者: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group xiaoke 2.启动生产者:kafka-console-producer.sh --broker-list ke02:9092 --topic ooxx 3.写入数据:a1,a2,a3 4. 一个消费者:a1,a2,a3 消费了全部数据 结论:生产者生产数据往p-0,p-1分区发数据, 一个consumer拿到了所有分区内的数据 场景二:启动二个消费者(在同一个组内) 1.启动消费者: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group xiaoke 2.启动消费者: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group xiaoke 3.启动生产者:kafka-console-producer.sh --broker-list ke02:9092 --topic ooxx 4.写入数据:b1,b2,b3,b4 5.消费者1:b1,b3 6.消费者2:b2,b4 结论: 生产者生产数据往p-0,p-1分区发数据,两个consumer轮询平分了所有分区内的数据,既:1个分区对应一个consumer 场景三: 启动二个消费者,每个组内一个消费者 1.启动消费者: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group g1 2.启动消费者: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group g2 3.启动生产者:kafka-console-producer.sh --broker-list ke02:9092 --topic ooxx 4.写入数据:c1,c2,c3,c4 5.消费者1:c1,c2,c3,c4 6.消费者2:c1,c2,c3,c4 结论:数据的重复利用是站在Group上的
1.kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx 当消费者连接没有给出组时,kafka默认分配组,每次连接都会分配一个新的 2.查看所有组 [root@ke03 ~]# kafka-consumer-groups.sh --bootstrap-server ke02:9092 --list xiaoke console-consumer-69705 console-consumer-67386 3.查看组详细 [root@ke03 ~]# kafka-consumer-groups.sh --bootstrap-server ke03:9092 --describe --group xiaoke TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID ooxx 0 6 6 0 consumer-1-5e729f67-d0f7-47b2-a7c4-42b4024634ee / consumer-1 ooxx 1 5 5 0 consumer-1-5e729f67-d0f7-47b2-a7c4-42b4024634ee / consumer-1
基础: 1.三个生产者 A,B,C,2个分区 2.消息是K,V的,同类的k消息是在同一个分区内 3.多种消息K是存在同一分区内 保证有序
0.需要给消息设置key 1.生产者要先保证有序,既A生产a,B生产b,不能A生产b,这样的话不能保证数据b有序的到达分区内 2.同一个key在同一分区内是有序的,存在不同key之间交叉。既把需要排序的数据设置同一个key 3.consumer拉取数据
0.kafka使用的是拉取 1.推送:可能server消费能力跟不上kafka的推送能力,所以kafka需要维护一个状态,用来保存server发来自己能否还能消费的状态 2.拉取:自主,按需,去订阅去拉取server的数据,拉取是批量拉取
1.单线程,一条一条更新offset 2.多线程,一次处理拉过来的一批数据(5) + 事务,如果成功就+5,如果失败就回滚
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency>
package com.xiaoke; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Test; import java.time.Duration; import java.util.*; import java.util.concurrent.Future; public class KafkaDemo { @Test public void produce() throws Exception { String topic = "604-items"; Properties p = new Properties(); p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ke01:9092,ke02:9092,ke03:9092"); //kafka 持久化数据的MQ 数据-> byte[],不会对数据进行干预,双方要约定编解码 //kafka是一个app::使用零拷贝 sendfile 系统调用实现快速数据消费 p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); p.setProperty(ProducerConfig.ACKS_CONFIG, "-1"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p); //现在的producer就是一个提供者,面向的其实是broker,虽然在使用的时候我们期望把数据打入topic /* topic:xjt_items 2partition 三种商品,每种商品有线性的3个ID 验证:1.保证顺序 2.相同的商品最好去到一个分区里 */ while (true) { for (int i = 0; i < 3; i++) { for (int j = 0; j < 3; j++) { ProducerRecord<String, String> record = new ProducerRecord(topic, "item" + j, "val" + i); Future<RecordMetadata> send = producer .send(record); RecordMetadata rm = send.get(); int partition = rm.partition(); long offset = rm.offset(); System.out.println("key: " + record.key() + " val: " + record.value() + " partition: " + partition + " offset: " + offset); } } } /** * 1.县创建topic: kafka-topics.sh --zookeeper ke03:2181/kafka --create --topic 604-items --partitions 2 --replication-factor 2 * 2.生产数据 * * 结果: key: item0 val: val0 partition: 1 offset: 0 key: item1 val: val0 partition: 0 offset: 0 key: item2 val: val0 partition: 1 offset: 1 key: item0 val: val1 partition: 1 offset: 2 key: item1 val: val1 partition: 0 offset: 1 key: item2 val: val1 partition: 1 offset: 3 key: item0 val: val2 partition: 1 offset: 4 key: item1 val: val2 partition: 0 offset: 2 key: item2 val: val2 partition: 1 offset: 5 key: item0 val: val0 partition: 1 offset: 6 * * 整理: * key: item1 val: val0 partition: 0 offset: 0 * key: item1 val: val1 partition: 0 offset: 1 * key: item1 val: val2 partition: 0 offset: 2 * * * key: item0 val: val0 partition: 1 offset: 0 * key: item2 val: val0 partition: 1 offset: 1 * key: item0 val: val1 partition: 1 offset: 2 * key: item2 val: val1 partition: 1 offset: 3 * key: item0 val: val2 partition: 1 offset: 4 * key: item2 val: val2 partition: 1 offset: 5 * key: item0 val: val0 partition: 1 offset: 6 * 结论: * 1.相同的产品在同一个分区内 * 2.相同的产品在同一个分区内是有序的 * 3.offset是分区粒度的 * */ } @Test public void consumer() { String topic = "604-items"; Properties p = new Properties(); p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ke01:9092,ke02:9092,ke03:9092"); p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //设置消费组 p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group04"); /** * "What to do when there is no initial offset in Kafka or if the current offset * does not exist any more on the server 第一次启动设置offset可以用一下参数 * (e.g. because that data has been deleted): * <ul> * <li>earliest: automatically reset the offset to the earliest offset 取当前的 * <li>latest: automatically reset the offset to the latest offset</li> 取之前的 * <li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li> 没有offset就报错 * <li>anything else: throw exception to the consumer.</li> * </ul>"; */ p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// /** * 0.true 自动提交offset false手动提交offset * 1.自动提交时异步提交,丢数据&&重复数据 * 2.一旦你自动提交,但是是异步的 * //1,还没到时间,挂了,没提交,重起一个consuemr,参照offset的时候,会重复消费 * //2,一个批次的数据还没写数据库成功,但是这个批次的offset背异步提交了,挂了,重起一个consuemr,参照offset的时候,会丢失数据 */ p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "15000");//5秒 // p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,""); // POLL 拉取数据,弹性,按需,拉取多少? KafkaConsumer<String, String> consumer = new KafkaConsumer(p); //kafka 的consumer会动态负载均衡 consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println("---onPartitionsRevoked:"); Iterator<TopicPartition> iter = partitions.iterator(); while (iter.hasNext()) { System.out.println(iter.next().partition()); } } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { System.out.println("---onPartitionsAssigned:"); Iterator<TopicPartition> iter = partitions.iterator(); while (iter.hasNext()) { System.out.println(iter.next().partition()); } } }); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0L));// 0~n if (!records.isEmpty()) { System.out.println("-----------" + records.count() + "-------------"); Set<TopicPartition> partitions = records.partitions(); //每次poll的时候是取多个分区的数据 for (TopicPartition partition : partitions) { List<ConsumerRecord<String, String>> pRecords = records.records(partition); Iterator<ConsumerRecord<String, String>> piter = pRecords.iterator(); while (piter.hasNext()) { ConsumerRecord<String, String> next = piter.next(); int par = next.partition(); long offset = next.offset(); String key = next.key(); String value = next.value(); long timestamp = next.timestamp(); System.out.println("key: " + key + " val: " + value + " partition: " + par + " offset: " + offset + "time:: " + timestamp); //单线程,多线程,都可以 /** * 方式一:单条记录手动提交offset */ TopicPartition sp = new TopicPartition("msb-items", par); OffsetAndMetadata om = new OffsetAndMetadata(offset); HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>(); map.put(sp, om); consumer.commitSync(map); } /** * 方式二:,分区粒度手动提交offset */ long poff = pRecords.get(pRecords.size() - 1).offset();//获取分区内最后一条消息的offset OffsetAndMetadata pom = new OffsetAndMetadata(poff); HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>(); map.put(partition, pom); consumer.commitSync(map); } /** * 方式三:按poll的批次提交offset,第3点 */ consumer.commitSync(); } } } /** * 验证earliest和latest区别: * 1. a.设置自动条件为15秒 b.group01 earliest启动 c.group02 latest启动 * * kafka-consumer-groups.sh --bootstrap-server ke03:9092 --describe --group group01 15秒内结果: * TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID * 604-items 0 - 26 - consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 / consumer-group01-1 * 604-items 1 - 53 - consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 / consumer-group01-1 * * 15秒后结果: * TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID * 604-items 0 26 26 0 consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 / consumer-group01-1 * 604-items 1 53 53 0 consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 / consumer-group01-1 * * *kafka-consumer-groups.sh --bootstrap-server ke03:9092 --describe --group group02 15秒内结果: 并没有消费之前的数据 * TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID * 604-items 0 26 26 0 consumer-group04-1-08c34055-ba99-4a66-a6e2-a7a155ab2e63 / consumer-group02-1 * 604-items 1 53 53 0 consumer-group04-1-08c34055-ba99-4a66-a6e2-a7a155ab2e63 / consumer-group02-1 * * 结论:earliest第一次启动会从0开始消费数据 latest第一次启动会从当前consumer启动后消费数据,不会消费启动之前的数据 */ }