1.引入依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
2.producer代码
import com.sakura.bean.User;
import kafka.utils.Json;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class KafkaProducerDemo {
//创建一个kafka生产者
private final KafkaProducer<String, String> producer;
//创建一个topic
private final String topic;
//对kafka进行初始化设置
public KafkaProducerDemo(String topic) {
Properties properties = new Properties();
//broker的地址
properties.put("bootstrap.servers", "192.168.204.139:9092");
//clientId
properties.put("client.id", "producer-demo");
//序列化配置
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
this.producer = new KafkaProducer<String, String>(properties);
this.topic = topic;
}
public void sendMsg(Object message, boolean syncSend) {
if (syncSend) {
producer.send(new ProducerRecord<String, String>(topic, Json.encodeAsString(message)));
} else {
producer.send(new ProducerRecord<String, String>(topic, Json.encodeAsString(message)), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.err.println("unable to write to Kafka in KafkaProducerDemo[" + topic + "]exception:" + e);
}
}
});
}
}
public void close() {
producer.close();
}
public static void main(String[] args) throws InterruptedException {
String topic = "firstTopic";
KafkaProducerDemo kafkaProducerDemo = new KafkaProducerDemo(topic);
//send message
for (int i = 0; i < 6; i++) {
User user = new User(i, "user" + i, (byte) i);
kafkaProducerDemo.sendMsg(user, false);
}
kafkaProducerDemo.close();
Thread.sleep(Integer.MAX_VALUE);
}
}
3.consumer代码
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
public class KafkaConsumerDemo {
//创建消费者
private final KafkaConsumer<String, String> consumer;
public KafkaConsumerDemo(String topic) {
Properties properties = new Properties();
//zookeeper
properties.put("zookeeper.connect", "192.168.204.140:2181");
//broker
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.139:9092");
//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group-id");
//是否自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//如果没有设置offset或者设置的offset不存在时(例如数据被删除)采取的策略:
//earliest:使用最早的offset
//latest:使用最新的offset
//none:使用前一个offset,如果没有就向consumer抛异常
//anything else:直接向consumer抛出异常
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//超时时间设置.单位毫秒
//当使用了consumerGroup时,consumer会向broker发送心跳检测,如果在设置的超时时间内broker未接收到该心跳检测,
//则认为consumer不可用,将其从consumerGroup中移除并重新做负载。
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
//序列化设置
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(properties);
//指定消费的topic
//订阅主题,消费消息
consumer.subscribe(Collections.singletonList(topic));
}
//消费消息
public void receiveMsg(KafkaConsumer<String, String> consumer) {
while (true) {
//从指定的topic上拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
//循环topic下的每一个partition
for (TopicPartition partition :
records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
String topic = partition.topic();
int size = partitionRecords.size();
System.out.println("topic:" + topic + ",分区:" + partition.partition() + ",消息总数:" + size);
//循环partition上的消息
for (ConsumerRecord<String, String> stringStringConsumerRecord : partitionRecords) {
System.out.println("value:" + stringStringConsumerRecord.value());
long offSet = stringStringConsumerRecord.offset() + 1;
//下面这种方式会自动设置topic和offset
// consumer.commitSync();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offSet)));
System.out.println("同步成功,topic:" + topic + ",offSet:" + offSet);
}
}
}
}
public static void main(String[] args) {
String topic = "firstTopic";
KafkaConsumerDemo kafkaConsumerDemo = new KafkaConsumerDemo(topic);
kafkaConsumerDemo.receiveMsg(kafkaConsumerDemo.consumer);
}
}