目录:
kafka启动脚本以及关闭脚本
1. 同一个生产者同一个Topic,两个相同的消费者相同的Group
2. 同一个生产者同一个Topic,两个消费者不同Group
3. 两个生产者同一个Topic,生产不同的消息,一个消费者
运行的前提是有kafka,并启动kafka,这里我写了个kafka启动脚本:
#!/bin/sh #创建启动脚本 #启动zookeeper /user/kafka_2.11-2.0.0/bin/zookeeper-server-start.sh /user/kafka_2.11-2.0.0/config/zookeeper.properties & sleep 3 #等3秒后执行 #启动kafka /user/kafka_2.11-2.0.0/bin/kafka-server-start.sh /user/kafka_2.11-2.0.0/config/server.properties &
kafka关闭脚本:
#!/bin/sh #创建关闭脚本 #关闭kafka /user/kafka_2.11-2.0.0/bin/kafka-server-stop.sh /user/kafka_2.11-2.0.0/config/server.properties & sleep 3 #等3秒后执行 #关闭zookeeper /user/kafka_2.11-2.0.0/bin/zookeeper-server-stop.sh /user/kafka_2.11-2.0.0/config/zookeeper.properties &
1. 同一个生产者同一个Topic,两个相同的消费者相同的Group
新建一个生产者TestKafkaProducer,需要引入kafka的lib中的jar包,主要包括两个类,如下所示:
kafka生产者:
package com.zc.kafka.producer.main; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; /** * Kafka生产者 * 先启动生产者,发送消息到broker,这里简单发送了10条从0-9的消息,再启动消费者,控制台输出如下: */ public class SimpleKafkaProducer { private static long i = 0; public void send(String str) { // TODO Auto-generated method stub Properties props = new Properties(); //broker地址 props.put("bootstrap.servers", "localhost:9092"); //请求时候需要验证 props.put("acks", "all"); //请求失败时候需要重试 props.put("retries", 0); //内存缓存区大小 props.put("buffer.memory", 33554432); //指定消息key序列化方式 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //指定消息本身的序列化方式 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); //for (int i = 0; i < 10; i++) { //i < 10 // 生产一条消息的时间有点长 //producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i))); //System.out.println(i); //} // 这里的“test“是topic producer.send(new ProducerRecord<>("test", String.valueOf(i), str)); i++; System.out.println("Message sent successfully"); producer.close(); } }
生产数据:
package com.zc.kafka.producer.test; import com.zc.kafka.producer.main.SimpleKafkaProducer; public class TestSimpleKafkaProducer { public static void main(String[] args) { // TODO Auto-generated method stub long i=0; SimpleKafkaProducer skp = new SimpleKafkaProducer(); while(true) { skp.send("Hello: "+ String.valueOf(i)); i++; try { Thread.sleep(10000); //ms } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
新建两个消费者,引入kafka中lib中的jar包,分别是TestKafkaConsumer和TestKafkaConsumer2,他们有一个相同的类,如下所示:
消息消费者:
package com.zc.kafka.consumer.main; import java.util.Collections; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; /** * kafka消费者 */ public class SimpleKafkaConsumer { @SuppressWarnings({ "deprecation", "resource" }) public static void main(String[] args) { // TODO Auto-generated method stub Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); //每个消费者分配独立的组号,这里的“test”是group props.put("group.id", "test"); //如果value合法,则自动提交偏移量 props.put("enable.auto.commit", "true"); //设置多久一次更新被消费消息的偏移量 props.put("auto.commit.interval.ms", "1000"); //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息 props.put("session.timeout.ms", "30000"); // //props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); //核心函数1:订阅topic System.out.println("Subscribed to topic " + "test"); //int i = 0; while (true) { //System.out.println(i++); //核心函数2:long poll,一次拉取回来多个消息 /* 读取数据,读取超时时间为100ms */ ConsumerRecords<String, String> records = consumer.poll(100); //System.out.println(records.count()); for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s ", record.offset(), record.key(), record.value()); } } }
启动生产者,并启动两个消费者。(我的生产者和两个消费者都在同一主机上)
结果是:
第一个启动的消费者消费消息,第二个消费者没有消费消息;我关闭掉第一个消费者,第二个消费者就会消费消息; (因为Group相同)
同时只会有一个消费者在消费消息,并且消费消息没有重叠。
消费者1:
Subscribed to topic test offset = 4451, key = 25, value = Hello: 25 offset = 4452, key = 26, value = Hello: 26 offset = 4453, key = 27, value = Hello: 27 offset = 4454, key = 28, value = Hello: 28 offset = 4455, key = 29, value = Hello: 29 offset = 4456, key = 30, value = Hello: 30 offset = 4457, key = 31, value = Hello: 31 offset = 4458, key = 32, value = Hello: 32
消费者2:
Subscribed to topic test offset = 4459, key = 33, value = Hello: 33 offset = 4460, key = 34, value = Hello: 34 offset = 4461, key = 35, value = Hello: 35 offset = 4462, key = 36, value = Hello: 36
2. 同一个生产者同一个Topic,两个消费者不同Group
这里只是修改了TestKafkaConsumer2的源码,修改了组,具体如下所示:
package com.zc.kafka.consumer.main; import java.util.Collections; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; /** * kafka消费者 */ public class SimpleKafkaConsumer { @SuppressWarnings({ "deprecation", "resource" }) public static void main(String[] args) { // TODO Auto-generated method stub Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); //每个消费者分配独立的组号 props.put("group.id", "Consumer2"); //修改了组 //如果value合法,则自动提交偏移量 props.put("enable.auto.commit", "true"); //设置多久一次更新被消费消息的偏移量 props.put("auto.commit.interval.ms", "1000"); //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息 props.put("session.timeout.ms", "30000"); // //props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); //核心函数1:订阅topic System.out.println("Subscribed to topic " + "test"); //int i = 0; while (true) { //System.out.println(i++); //核心函数2:long poll,一次拉取回来多个消息 /* 读取数据,读取超时时间为100ms */ ConsumerRecords<String, String> records = consumer.poll(100); //System.out.println(records.count()); for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s ", record.offset(), record.key(), record.value()); } } }
启动生产者,并启动两个消费者。(我的生产者和两个消费者都在同一主机上)
结果是:
第一个启动的消费者消费消息,第二个消费者也再消费消息;(因为Group不相同)
同时两个消费者都在消费消息,并且消费消息重叠。
消费者1:
Subscribed to topic test offset = 4463, key = 0, value = Hello: 0 offset = 4464, key = 1, value = Hello: 1 offset = 4465, key = 2, value = Hello: 2 offset = 4466, key = 3, value = Hello: 3 offset = 4467, key = 4, value = Hello: 4 offset = 4468, key = 5, value = Hello: 5 offset = 4469, key = 6, value = Hello: 6 offset = 4470, key = 7, value = Hello: 7 offset = 4471, key = 8, value = Hello: 8 offset = 4472, key = 9, value = Hello: 9
消费者2:
Subscribed to topic test offset = 4466, key = 3, value = Hello: 3 offset = 4467, key = 4, value = Hello: 4 offset = 4468, key = 5, value = Hello: 5 offset = 4469, key = 6, value = Hello: 6 offset = 4470, key = 7, value = Hello: 7 offset = 4471, key = 8, value = Hello: 8 offset = 4472, key = 9, value = Hello: 9
3. 两个生产者同一个Topic,生产不同的消息,一个消费者
生产的数据和第一个生产者不同:
package com.zc.kafka.producer.test; import com.zc.kafka.producer.main.SimpleKafkaProducer; public class TestSimpleKafkaProducer2 { public static void main(String[] args) { // TODO Auto-generated method stub long i=0; SimpleKafkaProducer skp = new SimpleKafkaProducer(); while(true) { skp.send("Kafka: "+ String.valueOf(i)); //生产的数据不同 i++; try { Thread.sleep(10000); //ms } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
启动两个生产者,并启动消费者。(我的生产者和消费者都在同一主机上)
结果是:
消费者同时收到了两个生产者的消息; (因为Topic相同)
消费者:
Subscribed to topic test offset = 4473, key = 0, value = Hello: 0 offset = 4474, key = 0, value = Kafka: 0 offset = 4475, key = 1, value = Hello: 1 offset = 4476, key = 1, value = Kafka: 1 offset = 4477, key = 2, value = Hello: 2 offset = 4478, key = 2, value = Kafka: 2 offset = 4479, key = 3, value = Hello: 3 offset = 4480, key = 3, value = Kafka: 3 offset = 4481, key = 4, value = Hello: 4 offset = 4482, key = 4, value = Kafka: 4