public static void main(String[] args) {
Properties properties = new Properties();
//kafka集群,下面的配置都可以Prodcuer
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.102:9092");
//acks方式
properties.put("acks", "all");
//重试次数
properties.put("reties", 1);
//批次大小,每次发送数据的大小
properties.put("batch.size", 16384);
//等待时间,如果数据迟迟没有达到batch.size的大小,等待的linger.ms之后就发送数据
properties.put("linger.ms", 1);
//RecordAccumulator缓冲区大小
properties.put("buffer.memory", 33554432);
//序列化
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer(properties);
for (int i = 0; i < 10; i++) {
//"first"是topic,后面是value
producer.send(new ProducerRecord("first", Integer.toString(i)));
}
//关闭资源,一定要关闭
producer.close();
}
public class ConfigKafkaProducer {
public static void main(String[] args) {
//TODO 生产者三个属性必须指定(broker地址清单、key和value的序列化器)
Properties properties = new Properties();
properties.put("bootstrap.servers","127.0.0.1:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//TODO 更多发送配置(重要的)
properties.put("acks","1"); //ack 0,1,all
// 一个批次可以使用的内存大小 缺省16384(16k)
properties.put("batch.size",16384);
// 指定了生产者在发送批次前等待更多消息加入批次的时间, 缺省0 50ms
properties.put("linger.ms",0L);
// 控制生产者发送请求最大大小,默认1M (这个参数和Kafka主机的message.max.bytes 参数有关系)
properties.put("max.request.size",1 * 1024 * 1024);
//TODO 更多发送配置(非重要的)
properties.put("buffer.memory",32 * 1024 * 1024L);//生产者内存缓冲区大小
properties.put("retries",0); //重发消息次数
//客户端将等待请求的响应的最大时间 默认30秒
properties.put("request.timeout.ms",30 * 1000);
//最大阻塞时间,超过则抛出异常 缺省60000ms
properties.put("max.block.ms",60*1000);
// 于压缩数据的压缩类型。默认是无压缩 ,none、gzip、snappy
properties.put("compression.type","none");
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
try {
ProducerRecord<String,String> record;
try {
//TODO 发送4条消息
for(int i=0;i<4;i++){
record = new ProducerRecord<String,String>(
BusiConst.HELLO_TOPIC, String.valueOf(i),"hankin");
producer.send(record);
System.out.println(i+",message is sent");
}
} catch (Exception e) {
e.printStackTrace();
}
} finally {
producer.close();
}
}
}
https://blog.csdn.net/m0_37661458/article/details/102640246