zoukankan      html  css  js  c++  java
  • java kafka 生产者消费者 高级API

    Java中提供高级的API,相对于低级API(更小的粒度控制消费)使用起来非常方便。

     pom:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>1.0.0</version>
            </dependency>

    一、修改kafka   server.porperties的ip是你kafka服务的ip

    listeners=PLAINTEXT://192.168.111.130:9092

    二、生产者的例子

    import org.apache.kafka.clients.producer.*;
    
    import java.util.Properties;
    
    public class KafkaProducerDemo {
        private final Producer<String, String> kafkaProdcer;
        public final static String TOPIC = "JAVA_TOPIC";
    
        private KafkaProducerDemo() {
            kafkaProdcer = createKafkaProducer();
        }
    
        private Producer<String, String> createKafkaProducer() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.111.130:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
            return kafkaProducer;
        }
    
        void produce() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                final String key = "key" + i;
                String data = "hello kafka message:" + key;
                kafkaProdcer.send(new ProducerRecord<String, String>(TOPIC, key, data), new Callback() {
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        System.out.println("发送key" + key + "成功");
                    }
                });
            }
        }
    
        public static void main(String[] args) {
            KafkaProducerDemo kafkaProducerDemo = new KafkaProducerDemo();
            kafkaProducerDemo.produce();
        }
    
    }

    用properties构造一个Producer的实例,然后调用send方法,传入数据,还有一个回调函数。

    可以看到数据已经进来了。

    注意:kafka producer支持同步发送、异步发送、异步发送+回调函数方式。

    1、同步方式会按顺序发送,打印出来的结果是按发送的顺序:

    for (int i = 0; i < 1000; i++) {
        RecordMetadata test = producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hello world-" + i)).get();
        System.out.println(test);
    }

    2、回调函数里面可以对成功或者失败,分支判断,进行业务上的进一步处理。甚至可以把失败的消息存储下来。

    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<String, String>("test", i + "", "xxx-" + i), new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println("发送成功");
                }
            }
        });
    }

    注:回调函数里面onCompletion方法其实是阻塞的! 如果进行延时,会逐个执行,不会同时并发跑,但是发送数据任然是异步的。

    三、消费者例子

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    public class KafkaConsumerDemo {
        private final KafkaConsumer<String, String> consumer;
        private KafkaConsumerDemo(){
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.111.130:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "false");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<String, String>(props);
        }
        void consume(){
            consumer.subscribe(Arrays.asList(KafkaProducerDemo.TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records){
                    System.out.println("I'm coming");
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
        public static void main(String[] args) {
            KafkaConsumerDemo kafkaConsumerDemo = new KafkaConsumerDemo();
            kafkaConsumerDemo.consume();
        }
    }

    正常启动是看不到东西的, 两个同时启动才有。消费者只看接下来有哪些生产者发来新的消息。

    props.put("enable.auto.commit", "true");

    这个的意思是,消费后自动改变偏移量。如果不添加这个,就会在服务器存的offset开始消费,并且不会改变offset的值。

    如果为false, 可以看到不管消费几次,服务端存储的始终是offset的值都不会改变,需要手动提交offset。

    如果想让consumer从头开始消费,可以设置:

    props.put("auto.offset.reset", "earliest");

    这个只对新建的组有效,如果一个组已经消费过,offset的值已经存在服务端了,这样设置不起作用的,只会从服务端存储的offset开始消费。不设置默认是latest,就是从最新的开始消费。

  • 相关阅读:
    心跳机制
    C++虚继承和虚基类
    STL,ATL与WTL
    C# Task的暂停与终止
    C#继承
    C#线程同步问题
    CourtAi发布配置文件修改说明
    阿里云虚拟主机https化步骤第一篇,申请证书(笔记)
    linux 服务器重启指令
    .net core 发布到iis问题 HTTP Error 500.30
  • 原文地址:https://www.cnblogs.com/chenmz1995/p/10425802.html
Copyright © 2011-2022 走看看