zoukankan      html  css  js  c++  java
  • Kafka消费端数据过滤方案

    前言:

    kafka一些常用命令:

    cd /opt/module/kafka

    查看kafka主题:

    ./kafka-topics.sh --list --zookeeper localhost:2181

     查看主题详情

    ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafkaTopic1

    kafka消费端数据过滤方案:

    在生产端不做配置,只管按特定主题生产数据。

    在消费端,对特定主题数据进行分组获取,然后在获取过程中对符合业务条件的数据进行处理,否则跳过,但还是会告诉kafka我已经消费过了。

    示例代码:

    生产端:

     public String hello()
        {
            Properties props = new Properties();
    
            props.put("bootstrap.servers", "192.168.11.101:9092");
            props.put("acks", "all");
            props.put("retries", "5");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            //生产者实例
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            x++;
            producer.send(new ProducerRecord<String, String>(topic, "key1", "key1->value1:"+x));
            producer.send(new ProducerRecord<String, String>(topic, "key1", "key1->value11:"+x));
            producer.send(new ProducerRecord<String, String>(topic, "key2", "key2->value2:"+x));
            producer.send(new ProducerRecord<String, String>(topic, "key2", "key2->value22:"+x));
            producer.send(new ProducerRecord<String, String>(topic, "key3", "key3->value3:"+x));
            producer.send(new ProducerRecord<String, String>(topic, "key3", "key3->value33:"+x));
    
    
            return "ok";
        }

    消费端:

     @KafkaListeners(value = {@KafkaListener(topics = {"kafkaTopic1"},groupId = "group1")})
        public void aaaa(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
            String processRecord = (String) record.value();
            String x = (String)record.key();
            if(x.equals("key1"))
            {
                System.out.printf("group1独立获取key1:"+processRecord+"\r\n");
    
            }
            acknowledgment.acknowledge();
        }
        @KafkaListeners(value = {@KafkaListener(topics = {"kafkaTopic1"},groupId = "group2")})
        public void bbbb(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
            String processRecord = (String) record.value();
            String x = (String)record.key();
            if(x.equals("key2"))
            {
                System.out.printf("group2独立获取key2:"+processRecord+"\r\n");
    
            }
            acknowledgment.acknowledge();
        }
       @KafkaListeners(value = {@KafkaListener(topics = {"kafkaTopic1"},groupId = "group3")})
        public void ccc(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
            String processRecord = (String) record.value();
            String x = (String)record.key();
            if(x.equals("key3"))
            {
                System.out.printf("group3独立获取key3:"+processRecord+"\r\n");
    
            }
            acknowledgment.acknowledge();
        }

    其中,group1和group2在调试环境下运行,group3打包成jar后,在命令行窗口中运行,在调用生产端产生数据后,消费端的消费情况如下:

    完全符合预期。

    文章出处:www.cnblogs.com/jizhong

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接。否则保留追究法律责任的权利。

  • 相关阅读:
    JS中!=、==、!==、===的用法和区别
    Jquery判断Checkbox是否选中三种方法
    C# 信号量 学习
    redis学习资料
    Redis常用命令
    MySQL、HBase、ES的对比
    我对依赖注入,控制反转的理解
    net输出错误日志
    XmlExtensions帮助类
    DatetimeHelper类的编写
  • 原文地址:https://www.cnblogs.com/jizhong/p/15637819.html
Copyright © 2011-2022 走看看