zoukankan      html  css  js  c++  java
  • kafka8 编写简单消费者

    1.eclipse运行消费者代码。代码如下

     1 package cn.test.mykafka;
     2 
     3 import java.util.Arrays;
     4 import java.util.Properties;
     5 
     6 import org.apache.kafka.clients.consumer.ConsumerRecord;
     7 import org.apache.kafka.clients.consumer.ConsumerRecords;
     8 import org.apache.kafka.clients.consumer.KafkaConsumer;
     9 
    10 
    11 /**
    12  * 简单消费者
    13  *
    14  */
    15 
    16 public class SimpleConsumer {
    17 
    18     public static void main(String[] args) {
    19         
    20         Properties props = new Properties();
    21         props.put("bootstrap.servers", "192.168.42.133:9092");
    22         props.put("group.id", "group1");
    23         props.put("enable.auto.commit", "true");
    24         props.put("auto.commit.interval.ms", "1000");
    25         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    26         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    27         
    28         @SuppressWarnings("resource")
    29         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    30         consumer.subscribe(Arrays.asList("test-topic")); //订阅主题
    31         
    32         while (true) {
    33             @SuppressWarnings("deprecation")
    34             ConsumerRecords<String, String> records = consumer.poll(100);
    35             for (ConsumerRecord<String, String> record : records)
    36                 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    37         }
    38     }
    39 }
    SimpleConsumer.java

    2.在服务器生产消息

    [root@hadoop kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
    >>test
    >hello
    >world

    3.在eclipse控制台查看输出

    offset = 118, key = null, value = test
    offset = 119, key = null, value = hello
    offset = 120, key = null, value = world

    如上输出表示消费者成功消费消息。

    offset偏移量:每消费一条消息,偏移量+1。

    消费者消费的偏移量记载的是消费者组消费的该分区的消息的个数。

    具体实现步骤参考 kafka5 编写简单生产者

  • 相关阅读:
    51nod 1174 区间最大值(RMQ and 线段树)
    Round #447(Div 2)
    51nod 2006 飞行员匹配
    75.Java异常处理机制throws
    74.Java异常处理机制
    emmm
    数据库关系代数
    汇编实验二 2进制转16进制
    汇编实验一 显示字符串
    JustOj 1386: 众数的数量
  • 原文地址:https://www.cnblogs.com/zhengna/p/9952289.html
Copyright © 2011-2022 走看看