创建项目
配置pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>
生产者
package com.itheima.producer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
/**
* kafka客户端之:生产者
*/
public class MyKafkaProducer {
public static void main(String[] args) throws Exception{
// 1.配置信息
Properties props = new Properties();
// 定义kafka服务器地址列表,不需要指定所有的broker
props.put("bootstrap.servers", "server1:9092,server2:9092,server3:9092");
// 生产者需要leader确认请求完成之前接收的应答数
props.put("acks", "-1");
// 客户端失败重试次数
props.put("retries", 1);
// 生产者打包消息的批量大小,以字节为单位.此处是16k
props.put("batch.size", 16384);
// 生产者延迟1ms发送消息
props.put("linger.ms", 1);
// 生产者缓存内存的大小,以字节为单位.此处是32m
props.put("buffer.memory", 33554432);
// key 序列化类
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化类
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2.创建生产者
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);
// 3.生产数据
/**
* 发送消息的三种方式:
* 1.同步阻塞发送
* 适用场景:业务不需要高吞吐量、更关心消息发送的顺序、不允许消息发送失败
* 2.异步发送(发送并忘记)
* 适用场景:业务只关心吞吐量、不关心消息发送的顺序、可以允许消息发送失败
* 3.异步发送(回调函数)
* 适用场景:业务需要知道消息发送成功、不关心消息发送的顺序
*/
// 1.同步阻塞发送
// 创建消息
/* System.out.println("-------------------同步发送消息......start-----------------------");
ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima_topic",0,"key-sync","同步发送消息");
Future<RecordMetadata> send = producer.send(record);
RecordMetadata recordMetadata = send.get();
System.out.println(recordMetadata);//itheima_topic-0@2
System.out.println("-------------------同步发送消息......end-----------------------");*/
// 2.异步发送(发送并忘记)
// 创建消息
/*System.out.println("-------------------异步发送(发送并忘记)......start-----------------------");
ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima_topic",0,"key-async1","异步发送消息,发送并忘记");
// 发送并忘记
producer.send(record);
System.out.println("-------------------异步发送(发送并忘记)......end-----------------------");
// 刷新
producer.flush();*/
// 3.异步发送(回调函数)
// 创建消息
System.out.println("-------------------异步发送(回调函数)......start-----------------------");
ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima_topic",0,"key-async2","异步发送消息,(回调函数)");
// 发送,回调函数处理
producer.send(record, new Callback() {
// 处理回调业务逻辑
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("异步发送消息成功:"+recordMetadata);//itheima_topic-0@4
System.out.println("异常对象:"+e);//null
}
});
System.out.println("-------------------异步发送(回调函数)......end-----------------------");
// 刷新
producer.flush();
}
}
消费者
package com.itheima.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;
/**
* kafka客户端之:消费者
*/
public class MyKafkaConsumer {
public static void main(String[] args) throws Exception{
// 1.配置信息
Properties props = new Properties();
// 定义kafka服务器地址列表,不需要指定所有的broker
props.put("bootstrap.servers", "server1:9092,server2:9092,server3:9092");
// 消费者组id
props.put("group.id", "itheima");
// 是否自动确认offset
props.put("enable.auto.commit", "true");
//自动确认offset时间间隔
props.put("auto.commit.interval.ms", "1000");
// key 序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value序列化类
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 2.创建消费者
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
// 3.消费消息
// 指定分区消费
TopicPartition partition = new TopicPartition("itheima_topic",0);
// 获取已经提交的偏移量
long offset = 0L;
OffsetAndMetadata offsetAndMetadata = consumer.committed(partition);
if(offsetAndMetadata !=null){
offset = offsetAndMetadata.offset();
}
System.out.println("当前消费的偏移量:"+offset);
// 指定偏移量消费
consumer.assign(Arrays.asList(partition));
consumer.seek(partition,offset);
// 循环拉取数据
while (true){
// 拉取数据
ConsumerRecords<String, String> records = consumer.poll(1000);
// 打印数据
for (ConsumerRecord<String, String> record : records) {
System.out.println("消费的数据为:" + record.value());
}
}
}
}