zoukankan      html  css  js  c++  java
  • 【kafka】生产者API

    自动提交

    自动提交:每次消费之后,提交自己的offset

    
    public class MyConsumer {
        public static void main(String[] args) {
            /**
             * 创建消费者的配置信息,并put配置
             * 1.集群ip端口
             * 2.开启自动提交
             * 3. 自动提交延迟
             * 4. key,value序列化类
             * 5. 消费者组id
             */
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("group.id", "c2");
            /**
             * 创建消费者---KafkaConsumer
             * 订阅主题---subscribe---需要传入一个集合
             * 订阅指定Topic+分区---assign
             * 拉取消息---poll---需要参数timeout,没消息时的等待时间
             */
            KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(props);
            // 仅订阅分区
            // kafkaConsumer.subscribe(Collections.singleton("test"));
            TopicPartition test = new TopicPartition("test", 0);
            kafkaConsumer.assign(Collections.singleton(test));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    String topic = record.topic();
                    String key = record.key();
                    long offset = record.offset();
                    int partition = record.partition();
                    System.out.println(
                            "Topic"+"==>"+topic+" partition:"+"==>" + partition + "key:"+ "==>" + key + " offset:" +"==>" + offset);
                }
            }
        }
    }
    
    

    手动提交

    两种提交方式:同步提交commitSync,异步提交commitAsync

    public class ManualCommit {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("group.id", "c2");
    
            KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(props);
            TopicPartition test = new TopicPartition("test", 0);
            kafkaConsumer.assign(Collections.singleton(test));
    
            /**
             * 只能消费实时数据
             * 要想实现--from-beginning
             * 需要重置offset
             */
            while (true) {
                ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    String topic = record.topic();
                    String key = record.key();
                    long offset = record.offset();
                    int partition = record.partition();
                    System.out.println(
                            "Topic"+"==>"+topic+" partition:"+"==>" + partition + "key:"+ "==>" + key + " offset:" +"==>" + offset);
                }
                /**
                 * 同步提交(commitSync):阻塞,提交成功,才会下一次数据拉取
                 * 异步提交(commitAsync):提交线程和poll线程,异步
                 */
                kafkaConsumer.commitAsync();
            }
        }
    }
    
    
    
  • 相关阅读:
    PHP chr函数 对应的AscII码
    微信小程序:样式,事件
    PHP 实现移动端极光推送(转)
    微信小程序服务器请求和上传数据,上传图片并展示,提交表单完整实例代码附效果图(转)
    大头照上传预览,并操作数据库和删除文件夹中存储的之前的图片;$_SERVER['DOCUMENT_ROOT']上传图片和删除图片的时候不要用绝对路径,可以用这个路径
    thinkphp 条件搜索分页(tp自带Page类)
    asp搭配环境
    html5手机端手指滑动选项卡滚动切换效果(转)
    tp框架实现ajax注册验证
    tp框架链接数据库的基本操作
  • 原文地址:https://www.cnblogs.com/mussessein/p/12187728.html
Copyright © 2011-2022 走看看