1. 定义要发送的消息User POJO
package lenmom.kafkaproducer; public class User { public String name; public int age; public String address; public User(String name,int age,String address){ this.name=name; this.age=age; this.address=address; } @Override public String toString() { return "user:"+name +",age:"+age+",address:"+address; } }
2. 定义序列化User POJO序列化器
package lenmom.kafkaproducer; import org.apache.kafka.common.serialization.Serializer; import org.codehaus.jackson.map.ObjectMapper; import java.util.Map; public class UserSerializer implements Serializer { private ObjectMapper objectMapper; public void configure(Map map, boolean b) { this.objectMapper = new ObjectMapper(); } public byte[] serialize(String topic, Object object) { byte[] result = null; try { result = objectMapper.writeValueAsString(object).getBytes("utf-8"); } catch (Exception e) { e.printStackTrace(); } return result; } public void close() { } }
此序列化器使用了org.codehaus.jackson.jackson-mapper-asl.jar来进行序列化为jason
3. 定义partitioner
package lenmom.kafkaproducer; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import java.util.List; import java.util.Map; import java.util.Random; public class AgePartitioner implements Partitioner { private Random random; public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { System.out.println("key:"+keyObj.toString()+",value: "+value.toString()); Integer age = ((User)value).age; List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic); int partitionCount = ((List) partitions).size(); int auditPartition = partitionCount - 1; // 如果年龄大于20岁的,分发到kafka的最后一个分区,否则随机发送到前面几个分区中去 return age< 20 ? random.nextInt(partitionCount - 1) : auditPartition; } public void close() { } public void configure(Map<String, ?> map) { this.random = new Random(); } }
4. 定义发送的消息统计拦截器
package lenmom.kafkaproducer; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; public class CounterIntecepter implements ProducerInterceptor { int count = 0; int failedCount = 0; /* * This method will be called before the message send to the broker * */ public ProducerRecord onSend(ProducerRecord producerRecord) { count += 1; System.out.println("preparing to send value: " + producerRecord.value().toString()); return producerRecord; } /* * This method would be called when the message was handled by the borker after the broker call back. */ public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { if (e != null) { failedCount++; } } public void close() { System.out.println("total: " + this.count + ",failed: " + this.failedCount); } public void configure(Map<String, ?> map) { } }
5. Producer主类
package lenmom.kafkaproducer;
import org.apache.kafka.clients.producer.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class ProducerDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.131:9092,192.168.1.193:9092,192.168.1.194:9092");
props.put("acks", "all"); //
props.put("retries", Integer.MAX_VALUE);
props.put("batch.size", 323840);
props.put("linger.ms", 100);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put("max.in.flight.requests.per.connection",1);
testCustomSerializerProducer(props);
}
private static void testCustomSerializerProducer(Properties props) throws ExecutionException, InterruptedException {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "lenmom.kafkaproducer.UserSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//设置消息拦截器
List<String> interceptors= new ArrayList<String>();
interceptors.add("lenmom.kafkaproducer.CounterIntecepter");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);
//设置自定义分区
props.put("partitioner.class", "lenmom.kafkaproducer.AgePartitioner");
final Producer<String, User> producer = new KafkaProducer<String, User>(props);
for (int i = 0; i < 100; i++) {
User user = new User("lenmom" + i, i, "address" + i);
ProducerRecord<String, User> record= new ProducerRecord<String, User>("lenmom", user);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
producer.close(0,TimeUnit.MILLISECONDS);
} else {
System.out.println("send success");
}
}
});
}
producer.close();
}
}
注意: KafkaProducer是线程安全的,也就是说可以在多个线程中共享一个KafkaProducer实例.
6. pom.xml文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>lenmom</groupId> <artifactId>kafkaproducer</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.0</version> </dependency> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> <version>1.9.13</version> </dependency> </dependencies> </project>
运行效果:
Producer执行效果
启动consumer:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.101:9092 --topic lenmom --from-beginning
接收到的消息:
7. 配置参数说明
7.1 消息无丢失配置
7.1.1 Producer配置
7.1.1.1
#使得内存缓冲区被填满时producer 处于阻塞状态并停止接收新的消息而不是抛出异常;否则producer 生产速度过快会耗尽缓冲区
block.on.buffer.full =true
7.1.1.2
#设置acks 为all 很容易理解,即必须要等到所有fo llower 都响应了发送消息才能认为提交成功,这是pro ducer 端最强程度的持久化保证。
acks = all or 1
7.1.1.3
#重试次数设置为Integer.MAX_VALUE,设置成MAX_VALUE 纵然有些极端,但其实想表达的是producer 要开启无限重试。
#用户不必担心pro ducer 会重试那些肯定无法恢复的错误,当前producer 只会重试那些可恢复的异常
#情况,所以放心地设置一个比较大的值通常能很好地保证消息不丢失。
retries = Integer.MAX_VALUE
7.1.1.4
#设置该参数为1 主要是为工防止topic 同分区下的消息乱序问题。这个参数的实际效果其
#实限制了producer 在单个broker 连接上能够发送的未响应请求的数量。因此,如果设置成l ,
#则producer 在某个broker 发送响应之前将无法再给该broker 发送PRODUCE 请求
max.in.flight.requests.per.connection= 1
7.1.1.5
使用带有回调机制的send
不要使用KafkaPro ducer 中单参数的send 方法,因为该se nd 调用仅仅是把消息发出而不会理会消息发送的结果。如果消息发送失败,该方法不会得到任何通知,故可能造成数据的丢失。
实际环境中一定要使用带回调机制的send 版本,即KafkaProducer.send(record, callback) 。
7.1.1.6
Callback 逻辑中显式立即关闭producer
在Callback 的失败处理逻辑中显式调用KafkaProducer .close(O ) 。这样做的目的是为了处理消息的乱序问题。
若不使用close(O),默认情况下producer 会被允许将未完成的消息发送出去,这样就有可能造成消息乱序。
7.1.2 Broker消息无丢失设置
7.1.2.1
#broker关闭unclean leader 选举,即不允许非ISR 中的副本被选举为leader ,从而避免broker 端因日志水位截断而造成的消息丢失。
unclean.leader.election .enable= false
7.1.2.2
#broker 设置成3 主要是参考了Hadoop 及业界通用的三备份原则,其实这里想强调的是一定要使用多个副本来保存分区的消息。
replication.factor = 3
7.1.2.3
#broker 用于控制某条消息至少被写入到ISR 中的多少个副本才算成功,设置成大于1 是为了提升producer 端发送语义的持久性。
#记住只有在producer 端acks 被设置成all 或一l 时,这个参数才有意义。在实际使用时,不要使用默认值。
min.insync .replicas = 2
7.1.2.4
#btoker 若两者相等,那么只要有一个副本挂掉,分区就无法正常工作,虽然有很高的持久性但可用性被极大地降低了。推荐配置成replication.factor= min.insyn.replicas + l
replication.factor > min.insync.replicas
7.1.2.5
#broker 禁用自动提交
enable.auto.commit= false
7.2 压缩参数设置
producer使用压缩算法,可以提高kafka的吞吐量,目前支持的压缩算法有GZIP 、Snappy、LZ4和Facebook 公司于2016 年8 月底开源了新的压缩算法Zstandard。
配置方法为,在给producer的配置构造函数中加入对应的配置项,如:
props . put (” compressiont.typ”,” snappy");
//或者
props.put(ProducerConfig . COMPRESSION TYPE CONFIG ,”snappy”);
使用压缩算法,对集群的CPU有要求,建议的生产集群配置为:
linux
CPU 24 核
内存32GB 。
磁盘lTB 7200 转SAS 盘两块
带宽lGb/s
ulimit -n 1000000 。
Socket Buffer 至少64KB一一适用于跨机房网络传输。