zoukankan      html  css  js  c++  java
  • kafka8 编写简单消费者

    1.eclipse运行消费者代码。代码如下

     1 package cn.test.mykafka;
     2 
     3 import java.util.Arrays;
     4 import java.util.Properties;
     5 
     6 import org.apache.kafka.clients.consumer.ConsumerRecord;
     7 import org.apache.kafka.clients.consumer.ConsumerRecords;
     8 import org.apache.kafka.clients.consumer.KafkaConsumer;
     9 
    10 
    11 /**
    12  * 简单消费者
    13  *
    14  */
    15 
    16 public class SimpleConsumer {
    17 
    18     public static void main(String[] args) {
    19         
    20         Properties props = new Properties();
    21         props.put("bootstrap.servers", "192.168.42.133:9092");
    22         props.put("group.id", "group1");
    23         props.put("enable.auto.commit", "true");
    24         props.put("auto.commit.interval.ms", "1000");
    25         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    26         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    27         
    28         @SuppressWarnings("resource")
    29         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    30         consumer.subscribe(Arrays.asList("test-topic")); //订阅主题
    31         
    32         while (true) {
    33             @SuppressWarnings("deprecation")
    34             ConsumerRecords<String, String> records = consumer.poll(100);
    35             for (ConsumerRecord<String, String> record : records)
    36                 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    37         }
    38     }
    39 }
    SimpleConsumer.java

    2.在服务器生产消息

    [root@hadoop kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
    >>test
    >hello
    >world

    3.在eclipse控制台查看输出

    offset = 118, key = null, value = test
    offset = 119, key = null, value = hello
    offset = 120, key = null, value = world

    如上输出表示消费者成功消费消息。

    offset偏移量:每消费一条消息,偏移量+1。

    消费者消费的偏移量记载的是消费者组消费的该分区的消息的个数。

    具体实现步骤参考 kafka5 编写简单生产者

  • 相关阅读:
    jquery实现奇偶行赋值不同css值
    Android短信批量插入速度优化的思考与尝试
    Android短信列表的时间显示
    短信优先级及有效期
    模拟器收短信和接电话的方法
    Android:Perferences的使用
    留个脚印
    Android电池电量更新 BatteryService(转)
    Android号码匹配位数修改
    CDMA SMS pdu解码
  • 原文地址:https://www.cnblogs.com/zhengna/p/9952289.html
Copyright © 2011-2022 走看看