zoukankan      html  css  js  c++  java
  • kafka相关

    https://www.jianshu.com/p/abbc09ed6703

    https://blog.csdn.net/wangshuminjava/article/details/80238161?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_baidulandingword-4&spm=1001.2101.3001.4242

    创建topic和指定分区数
    
        @GetMapping("/createTopic")
        public CreateTopicsResult createTopic(){
            // Topic "createTopic": 3 partitions, replication factor 1
            // (single-broker local setup).
            final NewTopic topic = new NewTopic("createTopic", 3, (short) 1);
            // createTopics is asynchronous; callers can inspect the returned
            // CreateTopicsResult futures to await completion.
            return kafkaAdminClient.createTopics(Arrays.asList(topic));
        }
    发送消息
    
        // Spring-managed producer template for sending String key/value messages.
        @Resource
        private KafkaTemplate<String, String> kafkaTemplate;
        // Admin client used for topic management (e.g. topic creation).
        @Resource
        private KafkaAdminClient kafkaAdminClient;
    
    
    
        @RequestMapping("/sendMsg")
        public String sendMsg(@RequestParam String msg){
            try {
                // send() is asynchronous and returns a future; without blocking on
                // it the catch block below would almost never see broker-side
                // failures and "发送消息失败" would be unreachable in practice.
                kafkaTemplate.send("createTopic",msg).get();
                return "发送消息成功";
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can observe it.
                Thread.currentThread().interrupt();
                return "发送消息失败";
            } catch (Exception e) {
                // NOTE(review): prefer an SLF4J logger over printStackTrace once
                // one is available in this class.
                e.printStackTrace();
                return "发送消息失败";
            }
        }
        消费消息注解版
    
    @KafkaListener(topics = "jikeyiTest" )
        public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
            System.out.printf("topic is %s, offset is %d,partition is %s, value is %s 
    ", record.topic(), record.offset(),record.partition(), record.value());
            ack.acknowledge();
        }
    消费消息复杂版,指定分区
    
        public static void startConsume(){
            Properties props = new Properties();
            // 必须设置的属性
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("group.id", "yasuo");
    
            Consumer consumer = new KafkaConsumer(props);
            TopicPartition topicPartition = new TopicPartition("createTopic",1);
            consumer.assign(Arrays.asList(topicPartition));
    
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("topic is %s, offset is %d,partition is %s, value is %s 
    ", record.topic(), record.offset(),record.partition(), record.value());
                });
            }
        }
    配置文件
    
    
spring:
  kafka:
    # Broker to connect to (local single-node setup).
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: yasuo
      # Offsets are committed manually via Acknowledgment.acknowledge()
      # (see listener.ack-mode below), so auto-commit is disabled.
      enable-auto-commit: false
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Start from the earliest offset when no committed offset exists.
      auto-offset-reset: earliest
      # At most one record per poll() call.
      max-poll-records: 1
    listener:
      # Commit the offset immediately after each manual acknowledge.
      ack-mode: manual_immediate
  • 相关阅读:
    Azure 中 Linux 虚拟机的大小
    排查在 Azure 中创建、重启 Linux VM 或调整其大小时发生的分配故障
    如何在 Azure 中的 Linux 经典虚拟机上设置终结点
    针对通过 SSH 连接到 Azure Linux VM 时发生的失败、错误或被拒绝问题进行故障排除
    Linux 内核超时导致虚拟机无法正常启动
    Java并发编程(十三)同步容器类
    可以开发着玩一下的web项目
    org.tmatesoft.svn.core.SVNCancelException: svn: E200015: authentication canc
    FastDFS单机搭建以及java客户端Demo
    做前端(单纯页面和js)遇到的问题辑录(一)
  • 原文地址:https://www.cnblogs.com/jikeyi/p/14617212.html
Copyright © 2011-2022 走看看