基本思路:在kafka生产者生产消息时,把相同userId的消息落在同一个分区/partition
public void sendTopic1(String tpoic, String userId, String message) { Properties props = new Properties(); //集群地址,多个服务器用","分隔 props.put("bootstrap.servers", servers); //key、value的序列化,此处以字符串为例,使用kafka已有的序列化类 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("request.required.acks", "all"); //创建生产者 int partitionNum = 0; if (StringUtils.isBlank(userId)) { //之前介绍过 Key 是可以传空值的 partitionNum = new Random().nextInt(11); //随机 } else { //取 % partitionNum = Math.abs((userId.hashCode()) % 11); } log.info("发送topic的partition索引:{}", partitionNum); Producer<String, String> producer = new KafkaProducer<>(props); ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(tpoic, partitionNum, userId, message); producer.send(producerRecord); producer.close(); }