zoukankan      html  css  js  c++  java
  • kafka相关

    https://www.jianshu.com/p/abbc09ed6703

    https://blog.csdn.net/wangshuminjava/article/details/80238161?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_baidulandingword-4&spm=1001.2101.3001.4242

    创建topic和指定分区数
    
        /**
         * Asks the Kafka admin client to create the topic named "createTopic"
         * with 3 partitions and a replication factor of 1.
         *
         * @return the result object handed back by the admin client for this creation request
         */
        @GetMapping("/createTopic")
        public CreateTopicsResult createTopic(){
            final String topicName = "createTopic";
            final int partitionCount = 3;
            final short replicationFactor = 1;
            return kafkaAdminClient.createTopics(
                    Arrays.asList(new NewTopic(topicName, partitionCount, replicationFactor)));
        }
    发送消息
    
        // Injected template used by sendMsg to publish String key / String value messages.
        @Resource
        private KafkaTemplate<String, String> kafkaTemplate;
        // Injected admin client used by createTopic to create topics.
        @Resource
        private KafkaAdminClient kafkaAdminClient;
    
    
    
        /**
         * Publishes {@code msg} to the "createTopic" topic and reports whether the send succeeded.
         *
         * <p>{@code KafkaTemplate.send} is asynchronous: it returns a future and delivery
         * failures surface only through that future. The method therefore waits (bounded)
         * for the future to complete so that the success message is returned only after
         * the send has actually been confirmed.
         *
         * @param msg the message payload taken from the request parameter
         * @return "发送消息成功" when the send completed, "发送消息失败" on any failure or timeout
         */
        @RequestMapping("/sendMsg")
        public String sendMsg(@RequestParam String msg){
            try {
                // Block until the send completes; bound the wait so a broker outage
                // cannot hang the request thread indefinitely.
                kafkaTemplate.send("createTopic", msg).get(10, java.util.concurrent.TimeUnit.SECONDS);
                return "发送消息成功";
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller/container can observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return "发送消息失败";
            } catch (Exception e) {
                // Covers synchronous send errors, ExecutionException and TimeoutException.
                // NOTE(review): no logger is visible in this snippet; replace
                // printStackTrace with the project's logger if one exists.
                e.printStackTrace();
                return "发送消息失败";
            }
        }
    消费消息注解版
    
    @KafkaListener(topics = "jikeyiTest" )
        public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
            System.out.printf("topic is %s, offset is %d,partition is %s, value is %s 
    ", record.topic(), record.offset(),record.partition(), record.value());
            ack.acknowledge();
        }
    消费消息复杂版,指定分区
    
        public static void startConsume(){
            Properties props = new Properties();
            // 必须设置的属性
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("group.id", "yasuo");
    
            Consumer consumer = new KafkaConsumer(props);
            TopicPartition topicPartition = new TopicPartition("createTopic",1);
            consumer.assign(Arrays.asList(topicPartition));
    
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("topic is %s, offset is %d,partition is %s, value is %s 
    ", record.topic(), record.offset(),record.partition(), record.value());
                });
            }
        }
    配置文件
    
    
    # Spring Boot Kafka settings used by the producer and listener snippets on this page.
    spring:
      kafka:
        # Broker address shared by producer, consumer and admin client.
        bootstrap-servers: localhost:9092
        producer:
          # Keys and values are sent as plain strings.
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: yasuo
          # Auto commit is disabled; the listener acknowledges records itself.
          enable-auto-commit: false
          # NOTE(review): this interval presumably has no effect while auto commit is disabled — confirm.
          auto-commit-interval: 1000
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          # Start from the earliest offset when the group has no committed offset.
          auto-offset-reset: earliest
          # Return at most one record per poll.
          max-poll-records: 1
        listener:
          # Manual acknowledgment mode; matches the listener's explicit ack.acknowledge() call.
          ack-mode: manual_immediate
  • 相关阅读:
    codeforces 269B Greenhouse Effect
    codeforces 5C Longest Regular Bracket Sequence
    codeforces 225C Barcode
    codeforces 279C Ladder
    CodeForces 479E Riding in a Lift
    CodeForces 351A Jeff and Rounding
    POJ-1579-201308122106.txt
    表达式求值-201308081712.txt
    Encoding-201308072147.txt
    A + B Problem II201308072001.txt
  • 原文地址:https://www.cnblogs.com/jikeyi/p/14617212.html
Copyright © 2011-2022 走看看