1.消息发送流程
kafka的producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到kafka broker。
2.异步发送API
2.1 导入依赖
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
2.2 普通生产者
不带回调函数,其中像端口号等配置项都封装在了ProducerConfig这个类中,也可以使用
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG 这种方式去设置属性
2.2.1 编写代码
package com.wn.Test01;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MyProducer {
public static void main(String[] args){
//创建kafka生产者的配置信息
Properties properties = new Properties();
//kafka集群
properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
//ack应答级别
properties.put("acks","all");
//重试次数
properties.put("retries",3);
//批次大小 16K
properties.put("batch.size",16384);
//等待时间
properties.put("linger.ms",1);
//RecordAccumulator缓冲区大小 32M
properties.put("buffer.memory",33554432);
//key,value序列化类
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//创建生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
//发送数据
for (int i=0;i<10;i++){
producer.send(new ProducerRecord<String, String>("wang","wnwn--"+i));
}
//关闭资源
producer.close();
}
}
2.2.2 启动zookeeper和kafka
2.2.3 创建消费者
bin/kafka-console-consumer.sh --zookeeper 192.168.138.55:2181,192.168.138.62181,192.168.138.77:2181 --topic wang
2.2.4 执行方法,查看接收数据
2.3 带回调函数的生产者
在生产者send时,设置Callback对象,并重写里面的onCompletion方法,回调函数
2.3.1 编写测试代码
package com.wn.Test01;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
/*带回调函数的生产者*/
//在生产者send时,设置Callback对象,并重写里面的onCompletion方法,回调函数
public class CallBackProducer {
public static void main(String[] args){
//创建配置信息
Properties properties = new Properties();
//kafka服务端的主机名和端口号
properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
//ack应答级别
properties.put("acks","all");
//重试次数
properties.put("retries",3);
//一批消息处理大小
properties.put("batch.size",16384);
//请求延迟
properties.put("linger.ms",1);
//发送缓存区内存大小
properties.put("buffer.memory",33554432);
//key,value序列化类
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i=0;i<50;i++){
producer.send(new ProducerRecord<String, String>("wang", Integer.toString(i), "hello word-" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata!=null){
System.out.println(recordMetadata.partition()+"---"+recordMetadata.offset());
}
}
});
}
//关闭资源
producer.close();
}
}
2.3.2 启动zookeeper和kafka
2.3.3 创建消费者
bin/kafka-console-consumer.sh --zookeeper 192.168.138.55:2181,192.168.138.66:2181,192.168.138.77:2181 --topic wang
2.3.4 执行方法,查看结果
2.4 自定义分区的生产者
2.4.1 创建自定义分区
package com.wn.Test01;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/*自定义分区*/
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes1, Cluster cluster) {
//Integer integer = cluster.partitionCountForTopic(topic);
//return key.toString().hashCode()%integer;
return 1;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
2.4.2 创建自定义分区的生产者
package com.wn.Test01;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
/*自定义分区的生产者*/
public class PartitionProducer {
public static void main(String[] args){
//创建kafka生产者的配置信息
Properties properties = new Properties();
//kafka集群 ProducerConfig
properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
//ack应答级别
properties.put("acks","all");
//重试次数
properties.put("retries",3);
//批次大小 16K
properties.put("batch.size",16384);
//等待时间
properties.put("linger.ms",1);
//RecordAccumulator缓冲区大小 32M
properties.put("buffer.memory",33554432);
//key,value序列化类
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//添加分区器
properties.put("partitioner.class","com.wn.Test01.MyPartitioner");
//创建生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
//发送数据
for (int i=0;i<10;i++){
producer.send(new ProducerRecord<String, String>("aaa", Integer.toString(i), "word-" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata!=null){
System.out.println(recordMetadata.partition()+"---"+recordMetadata.offset());
}
}
});
}
//关闭资源
producer.close();
}
}
2.4.3 启动zookeeper+kafka
2.4.4 创建消费者
bin/kafka-console-consumer.sh --zookeeper 192.168.138.55:2181,192.168.138.66:2181,192.168.138.77:2181 --topic aaa
2.4.5 执行方法,查看结果
将所有的消息都在1号分区;
4.同步发送API
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack;
由于send方法返回的是一个Future对象,根据Future对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方法即可;
package com.wn.Test01;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/*同步发送生产者*/
public class TongProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建kafka生产者的配置信息
Properties properties = new Properties();
//kafka集群 ProducerConfig
properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
//ack应答级别
properties.put("acks","all");
//重试次数
properties.put("retries",3);
//批次大小 16K
properties.put("batch.size",16384);
//等待时间
properties.put("linger.ms",1);
//RecordAccumulator缓冲区大小 32M
properties.put("buffer.memory",33554432);
//key,value序列化类
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//创建生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
//发送数据
for (int i=0;i<5;i++){
producer.send(new ProducerRecord<String, String>("aaa","wnwn--"+i)).get();
}
//关闭资源
producer.close();
}
}