zoukankan      html  css  js  c++  java
  • 记录一次kafka解决相同userId顺序消费的问题

    基本思路:在kafka生产者生产消息时,把相同userId的消息落在同一个分区/partition

    	public void sendTopic1(String tpoic, String userId, String message) {
    		Properties props = new Properties();
            //集群地址,多个服务器用","分隔
            props.put("bootstrap.servers", servers);
            //key、value的序列化,此处以字符串为例,使用kafka已有的序列化类
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("request.required.acks", "all");
            //创建生产者
            int partitionNum = 0;
            if (StringUtils.isBlank(userId)) { //之前介绍过 Key 是可以传空值的
                partitionNum = new Random().nextInt(11);   //随机
            } else {
                //取 %
                partitionNum = Math.abs((userId.hashCode()) % 11);
            }
            log.info("发送topic的partition索引:{}", partitionNum);
            Producer<String, String> producer = new KafkaProducer<>(props);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(tpoic, partitionNum, userId, message);
            producer.send(producerRecord);
            producer.close();
    	}
    

      

  • 相关阅读:
    HIVE 2.1.0 安装教程。(数据源mysql)
    Linux基础命令—sleep
    Linux基础命令—echo
    C语言的基本数据类型
    Linux基础命令—rmdir
    Linux基础命令—mkdir
    Linux基础命令—cd
    Linux基础命令—pwd
    Linux周期性执行任务(crontab)
    Linux执行单一时刻定时任务管理操作(at)
  • 原文地址:https://www.cnblogs.com/wzk-0000/p/11769787.html
Copyright © 2011-2022 走看看