Kafka-序列化器与反序列化器的使用(自定义消息类型)
代码如下
Customer
/**
 * Entity class used to demonstrate a custom Kafka serializer/deserializer pair.
 *
 * <p>Holds an integer customer id and a customer name. The class is a plain mutable
 * bean: both properties have a getter and a setter.
 *
 * @Author FengZhen
 * @Date 2020-03-30 22:49
 */
public class Customer {

    private int customerID;
    private String customerName;

    /**
     * Creates a customer.
     *
     * @param customerID   numeric identifier of the customer
     * @param customerName name of the customer; stored as given (may be {@code null})
     */
    public Customer(int customerID, String customerName) {
        this.customerID = customerID;
        this.customerName = customerName;
    }

    /** @return the numeric identifier of this customer */
    public int getCustomerID() {
        return customerID;
    }

    /** @param customerID the new numeric identifier */
    public void setCustomerID(int customerID) {
        this.customerID = customerID;
    }

    /** @return the customer name, exactly as last set (may be {@code null}) */
    public String getCustomerName() {
        return customerName;
    }

    /** @param customerName the new customer name */
    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }

    /**
     * Returns a human-readable representation containing both fields.
     *
     * <p>Without this override, printing a {@code Customer} falls back to
     * {@code Object.toString()} and shows only the class name and identity hash,
     * which makes logged records impossible to tell apart by content.
     *
     * @return a string of the form {@code Customer{customerID=1, customerName='x'}}
     */
    @Override
    public String toString() {
        return "Customer{customerID=" + customerID + ", customerName='" + customerName + "'}";
    }
}
序列化器
import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; import java.nio.ByteBuffer; import java.util.Map; /** * @Author FengZhen * @Date 2020-03-30 22:49 * @Description 自定义序列化器:不建议使用,因为如果修改序列化器,就会出现新旧消息不兼容。 * 建议使用已有的序列化器和反序列化器,如JSON、Avro、Thrift或Protobuf */ public class CustomerSerializer implements Serializer<Customer> { @Override public void configure(Map<String, ?> configs, boolean isKey) { //不做任何配置 } /** * Customer对象被序列化成: * 表示customerID的4字节整数 * 表示customerName长度的4字节整数(如果customerName为空,则长度为0) * 表示customerName的N个字节 * @param topic * @param data * @return */ @Override public byte[] serialize(String topic, Customer data) { try { byte[] serializedName; int stringSize; if (null == data){ return null; }else{ if (data.getCustomerName() != ""){ serializedName = data.getCustomerName().getBytes("UTF-8"); stringSize = serializedName.length; }else{ serializedName = new byte[0]; stringSize = 0; } } ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize); buffer.putInt(data.getCustomerID()); buffer.putInt(stringSize); buffer.put(serializedName); return buffer.array(); } catch (Exception e){ throw new SerializationException("Error when serializing Customer to byte[] " + e); } } @Override public void close() { //不需要关闭任何东西 } }
反序列化器
import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Deserializer; import java.nio.ByteBuffer; import java.util.Map; /** * @Author FengZhen * @Date 2020-04-06 15:08 * @Description 自定义反序列化器 */ public class CustomerDeserializer implements Deserializer<Customer> { @Override public void configure(Map<String, ?> configs, boolean isKey) { } @Override public Customer deserialize(String topic, byte[] data) { int id; int nameSize; String name; try { if (null == data){ return null; } if (data.length < 8){ throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected"); } ByteBuffer buffer = ByteBuffer.wrap(data); id = buffer.getInt(); nameSize = buffer.getInt(); byte[] nameBytes = new byte[nameSize]; buffer.get(nameBytes); name = new String(nameBytes, "UTF-8"); return new Customer(id, name); } catch (Exception e){ throw new SerializationException("Error when serializing Customer to byte[]" + e); } } @Override public void close() { } }
生产者发送消息
import com.chinaventure.kafka.serializer.Customer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/**
 * Demonstrates a Kafka producer that sends {@link Customer} values using a custom
 * value serializer.
 *
 * @Author FengZhen
 * @Date 2020-03-29 12:21
 */
public class KafkaProducerTest {

    private static final Properties kafkaProps = new Properties();

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public static void main(String[] args) {
        udfSerializer();
    }

    /**
     * Sends ten {@link Customer} records to the {@code test_udf_serializer} topic using
     * the custom value serializer.
     *
     * <p>Sends are asynchronous. The producer is flushed and then closed by
     * try-with-resources, which replaces the former endless sleep loop that kept the
     * JVM alive and never released the producer.
     */
    public static void udfSerializer() {
        kafkaProps.put("value.serializer", "com.chinaventure.kafka.serializer.CustomerSerializer");

        try (KafkaProducer<String, Customer> producer = new KafkaProducer<>(kafkaProps)) {
            for (int i = 0; i < 10; i++) {
                // Every third record shares the key "Apple"; the rest get a unique key.
                String key = i % 3 == 0 ? "Apple" : "Banana" + i;
                ProducerRecord<String, Customer> record =
                        new ProducerRecord<>("test_udf_serializer", key, new Customer(i, "我是" + i));
                // DemonProducerCallback is declared elsewhere in the project.
                producer.send(record, new DemonProducerCallback());
            }
            // Block until every buffered record has been sent, so the callbacks have run
            // before the producer is closed.
            producer.flush();
        }
    }
}
消费者读取数据
import com.chinaventure.kafka.serializer.Customer; import com.chinaventure.util.ExceptionUtil; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.*; /** * @Author FengZhen * @Date 2020-04-06 11:07 * @Description kafka消费者 */ public class KafkaConsumerTest { private static Properties kafkaProps = new Properties(); static { kafkaProps.put("bootstrap.servers", "localhost:9092"); kafkaProps.put("group.id", "test"); kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); } private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); private static KafkaConsumer<String, String> consumer; public static void main(String[] args) { udfDeserializer(); } /** * 自定义反序列化器 */ public static void udfDeserializer(){ kafkaProps.put("value.deserializer", "com.chinaventure.kafka.serializer.CustomerDeserializer"); KafkaConsumer<String, Customer> consumer = new KafkaConsumer<String, Customer>(kafkaProps); //订阅主题,可传入一个主题列表,也可以是正则表达式,如果有人创建了与正则表达式匹配的新主题,会立即触发一次再均衡,消费者就可以读取新添加的主题。 //如:test.*,订阅test相关的所有主题 consumer.subscribe(Collections.singleton("test_udf_serializer")); System.out.println("==== subscribe success ===="); try { while (true){ //消费者持续对kafka进行轮训,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。 //传给poll方法的是一个超时时间,用于控制poll()方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞) //如果该参数被设为0,poll会立即返回,否则它会在指定的毫秒数内一直等待broker返回数据 //poll方法返回一个记录列表。每条记录包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。 ConsumerRecords<String, Customer> records = consumer.poll(Duration.ofMillis(100)); System.out.println("==== data get ===="); for (ConsumerRecord<String, Customer> record : records) { System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value())); } } } catch(Exception e){ e.printStackTrace(); } finally { //退出应用前使用close方法关闭消费者。 
//网络连接和socket也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不在发送心跳并认定它已死亡,因为那样需要更长的时间,导致政哥群组在一段时间内无法读取消息。 consumer.close(); } } }
生产者打印内容
topic:test_udf_serializer partition:0 offset:0 metaData:test_udf_serializer-0@0 topic:test_udf_serializer partition:0 offset:1 metaData:test_udf_serializer-0@1 topic:test_udf_serializer partition:0 offset:2 metaData:test_udf_serializer-0@2 topic:test_udf_serializer partition:0 offset:3 metaData:test_udf_serializer-0@3 topic:test_udf_serializer partition:0 offset:4 metaData:test_udf_serializer-0@4 topic:test_udf_serializer partition:0 offset:5 metaData:test_udf_serializer-0@5 topic:test_udf_serializer partition:0 offset:6 metaData:test_udf_serializer-0@6 topic:test_udf_serializer partition:0 offset:7 metaData:test_udf_serializer-0@7 topic:test_udf_serializer partition:0 offset:8 metaData:test_udf_serializer-0@8 topic:test_udf_serializer partition:0 offset:9 metaData:test_udf_serializer-0@9
消费者打印内容
topic=test_udf_serializer, partition=0, offset=0, key=Apple, value=com.chinaventure.kafka.serializer.Customer@63798ca7 topic=test_udf_serializer, partition=0, offset=1, key=Banana1, value=com.chinaventure.kafka.serializer.Customer@4612b856 topic=test_udf_serializer, partition=0, offset=2, key=Banana2, value=com.chinaventure.kafka.serializer.Customer@22875539 topic=test_udf_serializer, partition=0, offset=3, key=Apple, value=com.chinaventure.kafka.serializer.Customer@5674e1f2 topic=test_udf_serializer, partition=0, offset=4, key=Banana4, value=com.chinaventure.kafka.serializer.Customer@79c7532f topic=test_udf_serializer, partition=0, offset=5, key=Banana5, value=com.chinaventure.kafka.serializer.Customer@2a448449 topic=test_udf_serializer, partition=0, offset=6, key=Apple, value=com.chinaventure.kafka.serializer.Customer@32f232a5 topic=test_udf_serializer, partition=0, offset=7, key=Banana7, value=com.chinaventure.kafka.serializer.Customer@43f82e78 topic=test_udf_serializer, partition=0, offset=8, key=Banana8, value=com.chinaventure.kafka.serializer.Customer@e54303 topic=test_udf_serializer, partition=0, offset=9, key=Apple, value=com.chinaventure.kafka.serializer.Customer@e8df99a
Done.