zoukankan      html  css  js  c++  java
  • 如何创建Kafka客户端:Avro Producer和Consumer Client

    1.目标 - Kafka客户端

    在本文的Kafka客户端中,我们将学习如何使用Kafka API 创建Apache Kafka客户端。有几种方法可以创建Kafka客户端,例如最多一次,至少一次,以及一次性消息处理需求。因此,在这个Kafka客户端教程中,我们将学习所有三种方式的详细描述。此外,我们将详细介绍如何使用Avro客户端。

    那么,让我们开始Kafka客户端教程。

    卡夫卡客户

    如何创建Kafka客户端:Avro Producer和Consumer Client

    2. Kafka客户是什么?

    • 创建Kafka客户端的先决条件
    1. 最初,为了创建Kafka客户端,我们必须在本地计算机上设置Apache Kafka中间件。 
    2. 此外,在开始创建Kafka客户端之前,本地安装的单个节点Kafka实例必须与我们的本地机器一起运行,并且需要运行Zookeeper和arning Kafka节点。

    学习Apache Kafka用例| Kafka应用程序
    此外,在Kafka客户端中创建一个名为normal-topic的主题,其中包含两个分区,命令为:

    bin/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1
    1. bin / kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --rerelication -factor 1

    此外,执行以下命令,以检查创建的主题的状态:

    bin/kafka-topics --list --topic normal-topic --zookeeper localhost:2181
    1. bin / kafka-topics --list --topic normal-topic --zookeeper localhost:2181

    此外,要在需要更改主题时增加分区,请执行以下命令:

    bin/kafka-topics.sh --alter --topic normal-topic --zookeeper localhost:2181 --partitions 2
    1. bin / kafka-topics.sh --alter --topic normal-topic --zookeeper localhost:2181 --partitions 2

    3.卡夫卡制片人客户

    这里是以下代码来实现Kafka生产者客户端。它将有助于发送文本消息并调整循环以控制需要发送以创建Kafka客户端的消息数量:

    public class ProducerExample {
       public static void main(String[] str) throws InterruptedException, IOException {
               System.out.println("Starting ProducerExample ...");
               sendMessages();
       }
       private static void sendMessages() throws InterruptedException, IOException {
               Producer<String, String> producer = createProducer();
               sendMessages(producer);
               // Allow the producer to complete sending of the messages before program exit.
               Thread.sleep(20);
       }
       private static Producer<String, String> createProducer() {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           props.put("acks", "all");
           props.put("retries", 0);
           // Controls how much bytes sender would wait to batch up before publishing to Kafka.
           props.put("batch.size", 10);
           props.put("linger.ms", 1);
           props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
           props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
           return new KafkaProducer(props);
       }
       private static void sendMessages(Producer<String, String> producer) {
           String topic = "normal-topic";
           int partition = 0;
           long record = 1;
           for (int i = 1; i <= 10; i++) {
               producer.send(
                   new ProducerRecord<String, String>(topic, partition,                                 Long.toString(record),Long.toString(record++)));
           }
       }
    }

    4.消费者可以注册Kafka

    首先,让我们学习几种方法,Kafka消费者客户可以通过这种方式向Kafka经纪人注册具体来说,有两种方法,使用subscribe方法调用或使用assign方法调用。让我们详细了解这两种Kafka客户端方法。

    一个。使用订阅方法调用

    使用订阅方法调用时,Kafka会在添加/删除主题或分区时,或者在添加或删除使用者时自动重新平衡可用的使用者。

    湾 使用分配方法调用。

    但是,当消费者使用assign方法调用注册时,Kafka客户端不提供消费者的自动重新平衡。
    让我们修改Kafka架构及其基本概念
    上述任何一种注册选项都可以被最多一次,至少一次或完全一次的消费者使用。
    一世。最多一次卡夫卡消费者(零次或多次交付)
    基本上,这是卡夫卡消费者的默认行为
    要在Kafka客户端中配置此类型的使用者,请按照下列步骤操作:

    • 首先,将'enable.auto.commit'设置为true。
    • 另外,将'auto.commit.interval.ms'设置为较低的时间范围。
    • 确保不要调用consumer.commitSync(); 来自消费者。此外,Kafka将使用此消费者配置以指定的时间间隔自动提交偏移量。

    然而,消费者有可能表现出最多一次或至少一次的行为,而消费者则以这种方式配置。虽然,让我们将此消费者声明为最多一次,因为最多一次是较低的消息传递保证。让我们详细讨论两种消费者行为:

    • 最多一次的情景

    发生提交间隔的时刻,以及触发Kafka自动提交上次使用的偏移的时刻,这种情况发生。但是,让我们假设消息和消费者在处理之间崩溃了。然后,当消费者重新启动时,它开始从最后提交的偏移量接收消息。同时,消费者可能会丢失一些消息。
    探索卡夫卡的优势与劣势

    • 至少一次的情况

    当消费者处理消息并将消息提交到其持久存储中时,消费者在此时崩溃,这种情况发生。但是,让我们假设Kafka没有机会向代理提交偏移,因为提交间隔还没有通过。然后,当消费者重新启动时,它会从最后一个提交的偏移量中获得一些较旧的消息。
    卡夫卡消费者代码:

    public class AtMostOnceConsumer {
           public static void main(String[] str) throws InterruptedException {
               System.out.println("Starting  AtMostOnceConsumer ...");
               execute();
           }
           private static void execute() throws InterruptedException {
                   KafkaConsumer<String, String> consumer = createConsumer();
                   // Subscribe to all partition in that topic. 'assign' could be used here
                   // instead of 'subscribe' to subscribe to specific partition.
                   consumer.subscribe(Arrays.asList("normal-topic"));
                   processRecords(consumer);
           }
           private static KafkaConsumer<String, String> createConsumer() {
                   Properties props = new Properties();
                   props.put("bootstrap.servers", "localhost:9092");
                   String consumeGroup = "cg1";
                   props.put("group.id", consumeGroup);
                   // Set this property, if auto commit should happen.
                   props.put("enable.auto.commit", "true");
                   // Auto commit interval, kafka would commit offset at this interval.
                   props.put("auto.commit.interval.ms", "101");
                   // This is how to control number of records being read in each poll
                   props.put("max.partition.fetch.bytes", "135");
                   // Set this if you want to always read from beginning.
                   // props.put("auto.offset.reset", "earliest");
                   props.put("heartbeat.interval.ms", "3000");
                   props.put("session.timeout.ms", "6001");
                   props.put("key.deserializer",
                           "org.apache.kafka.common.serialization.StringDeserializer");
                   props.put("value.deserializer",
                           "org.apache.kafka.common.serialization.StringDeserializer");
                   return new KafkaConsumer<String, String>(props);
           }
           private static void processRecords(KafkaConsumer<String, String> consumer)  {
                   while (true) {
                           ConsumerRecords<String, String> records = consumer.poll(100);
                           long lastOffset = 0;
                           for (ConsumerRecord<String, String> record : records) {
                                   System.out.printf("
    
    offset = %d, key = %s, value = %s", record.offset(),                                             record.key(), record.value());
                                  lastOffset = record.offset();
                            }
                   System.out.println("lastOffset read: " + lastOffset);
                   process();
                   }
           }
           private static void process() throws InterruptedException {
                   // create some delay to simulate processing of the message.
                   Thread.sleep(20);
           }
    }

    II。至少一次Kafka Consumer(一个或多个消息传递,可能重复)
    为了配置此类型的使用者,请按照下列步骤操作:

    • 首先,将'enable.auto.commit'设置为false或
    • 另外,将'enable.auto.commit'设置为true,将'auto.commit.interval.ms'设置为更高的数字。

    通过调用consumer.commitSync(),Consumer现在应该控制消息偏移提交给Kafka; 
    此外,为了避免重复消息的重新处理,在消费者中实现“幂等”行为,尤其是对于这种类型的消费者,因为在以下场景中,可能发生重复的消息传递。
    我们来讨论Apache Kafka Security | Kafka代码的需求和组成部分

    public class AtLeastOnceConsumer {
       public static void main(String[] str) throws InterruptedException {
               System.out.println("Starting AutoOffsetGuranteedAtLeastOnceConsumer ...");
               execute();
        }
       private static void execute() throws InterruptedException {
               KafkaConsumer<String, String> consumer = createConsumer();
               // Subscribe to all partition in that topic. 'assign' could be used here
               // instead of 'subscribe' to subscribe to specific partition.
               consumer.subscribe(Arrays.asList("normal-topic"));
               processRecords(consumer);
        }
        private static KafkaConsumer<String, String> createConsumer() {
               Properties props = new Properties();
               props.put("bootstrap.servers", "localhost:9092");
               String consumeGroup = "cg1";
               props.put("group.id", consumeGroup);
               // Set this property, if auto commit should happen.
               props.put("enable.auto.commit", "true");
               // Make Auto commit interval to a big number so that auto commit does not happen,
               // we are going to control the offset commit via consumer.commitSync(); after processing             // message.
               props.put("auto.commit.interval.ms", "999999999999");
               // This is how to control number of messages being read in each poll
               props.put("max.partition.fetch.bytes", "135");
               props.put("heartbeat.interval.ms", "3000");
               props.put("session.timeout.ms", "6001");
               props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
               props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
               return new KafkaConsumer<String, String>(props);
       }
        private static void processRecords(KafkaConsumer<String, String> consumer) throws {
               while (true) {
                       ConsumerRecords<String, String> records = consumer.poll(100);
                       long lastOffset = 0;
                       for (ConsumerRecord<String, String> record : records) {
                           System.out.printf("
    
    offset = %d, key = %s, value = %s", record.offset(),                                         record.key(), record.value());
                           lastOffset = record.offset();
                       }
                       System.out.println("lastOffset read: " + lastOffset);
                       process();
                       // Below call is important to control the offset commit. Do this call after you
                       // finish processing the business process.
                       consumer.commitSync();
               }
       }
       private static void process() throws InterruptedException {
           // create some delay to simulate processing of the record.
           Thread.sleep(20);
       }
    }

    III。通过订阅(一个且只有一个消息传递)完全一次Kafka动态消费者
    这里,通过'subscribe'(1,a)注册方法调用,消费者向Kafka注册。
    确保在这种情况下应手动管理偏移量。要在Kafka客户端中设置完全一次的方案,请按照下列步骤操作:

    • 首先,设置enable.auto.commit = false。
    • 处理完消息后,请勿调用consumer.commitSync()。
    • 此外,通过进行“订阅”调用,将消费者注册到主题。
    • 要从该主题/分区的特定偏移量开始读取,请实现ConsumerRebalanceListener。此外,在侦听器中执行consumer.seek(topicPartition,offset)。
    • 作为安全网,实施幂等。

    码:

    public class ExactlyOnceDynamicConsumer {
          private static OffsetManager offsetManager = new OffsetManager("storage2");
           public static void main(String[] str) throws InterruptedException {
                   System.out.println("Starting ExactlyOnceDynamicConsumer ...");
                   readMessages();
           }
           private static void readMessages() throws InterruptedException {
                   KafkaConsumer<String, String> consumer = createConsumer();
                   // Manually controlling offset but register consumer to topics to get dynamically
                   // assigned partitions. Inside MyConsumerRebalancerListener use
                   // consumer.seek(topicPartition,offset) to control offset which messages to be read.
                   consumer.subscribe(Arrays.asList("normal-topic"),
                                   new MyConsumerRebalancerListener(consumer));
                   processRecords(consumer);
           }
           private static KafkaConsumer<String, String> createConsumer() {
                   Properties props = new Properties();
                   props.put("bootstrap.servers", "localhost:9092");
                   String consumeGroup = "cg3";
                   props.put("group.id", consumeGroup);
                   // To turn off the auto commit, below is a key setting.
                   props.put("enable.auto.commit", "false");
                   props.put("heartbeat.interval.ms", "2000");
                   props.put("session.timeout.ms", "6001");
                   // Control maximum data on each poll, make sure this value is bigger than the maximum                   // single message size
                   props.put("max.partition.fetch.bytes", "140");
                   props.put("key.deserializer",                                 "org.apache.kafka.common.serialization.StringDeserializer");
                   props.put("value.deserializer",                         "org.apache.kafka.common.serialization.StringDeserializer");
                   return new KafkaConsumer<String, String>(props);
           }
           private static void processRecords(KafkaConsumer<String, String> consumer)
               while (true) {
                       ConsumerRecords<String, String> records = consumer.poll(100);
                       for (ConsumerRecord<String, String> record : records) {
                               System.out.printf("offset = %d, key = %s, value = %s
    ", record.offset(),                                     record.key(), record.value());
                               // Save processed offset in external storage.
                               offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(),                                             record.offset());
                       }
                  }
           }
    }
    public class MyConsumerRebalancerListener implements                                 org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
           private OffsetManager offsetManager = new OffsetManager("storage2");
           private Consumer<String, String> consumer;
           public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
                   this.consumer = consumer;
           }
           public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                   for (TopicPartition partition : partitions) {
                       offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(),                     consumer.position(partition));
                   }
           }
           public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                   for (TopicPartition partition : partitions) {
                           consumer.seek(partition,                             offsetManager.readOffsetFromExternalStore(partition.topic(),                             partition.partition()));
                   }
           }
    }
    /**
    * The partition offset are stored in an external storage. In this case in a local file system where
    * program runs.
    */
    public class OffsetManager {
           private String storagePrefix;
           public OffsetMpublic class ExactlyOnceDynamicConsumer {
          private static OffsetManager offsetManager = new OffsetManager("storage2");
           public static void main(String[] str) throws InterruptedException {
                   System.out.println("Starting ExactlyOnceDynamicConsumer ...");
                   readMessages();
           }
           private static void readMessages() throws InterruptedException {
                   KafkaConsumer<String, String> consumer = createConsumer()
                   // Manually controlling offset but register consumer to topics to get dynamically
                   // assigned partitions. Inside MyConsumerRebalancerListener use
                   // consumer.seek(topicPartition,offset) to control offset which messages to be read.
                   consumer.subscribe(Arrays.asList("normal-topic"),
                                  new MyConsumerRebalancerListener(consumer));
                   processRecords(consumer);
           }
           private static KafkaConsumer<String, String> createConsumer() {
                   Properties props = new Properties();
                   props.put("bootstrap.servers", "localhost:9092");
                   String consumeGroup = "cg3";
                   props.put("group.id", consumeGroup);
                   // To turn off the auto commit, below is a key setting.
                   props.put("enable.auto.commit", "false");
                   props.put("heartbeat.interval.ms", "2000");
                   props.put("session.timeout.ms", "6001");
                   // Control maximum data on each poll, make sure this value is bigger than the maximum                   // single message size
                   props.put("max.partition.fetch.bytes", "140");
                   props.put("key.deserializer",                                 "org.apache.kafka.common.serialization.StringDeserializer");
                   props.put("value.deserializer",                         "org.apache.kafka.common.serialization.StringDeserializer");
                   return new KafkaConsumer<String, String>(props);
           }
           private static void processRecords(KafkaConsumer<String, String> consumer) {
               while (true) {
                       ConsumerRecords<String, String> records = consumer.poll(100);
                       for (ConsumerRecord<String, String> record : records) {
                               System.out.printf("offset = %d, key = %s, value = %s
    ", record.offset(),                                     record.key(), record.value());
                               // Save processed offset in external storage.
                               offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(),                                             record.offset());
                       }
                  }
           }
    }
    public class MyConsumerRebalancerListener implements                                 org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
           private OffsetManager offsetManager = new OffsetManager("storage2");
           private Consumer<String, String> consumer;
           public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
                   this.consumer = consumer;
           }
           public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                   for (TopicPartition partition : partitions) {
                       offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(),                     consumer.position(partition));
                   }
           }
           public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                   for (TopicPartition partition : partitions) {
                           consumer.seek(partition,                             offsetManager.readOffsetFromExternalStore(partition.topic(),                             partition.partition()));
                   }
           }
    }
    /**
    * The partition offset are stored in an external storage. In this case in a local file system where
    * program runs.
    */
    public class OffsetManager {
           private String storagePrefix;
           public OffsetManager(String storagePrefix) {
                   this.storagePrefix = storagePrefix;
           }
       /**
           * in an external storage, overwrite the offset for the topic.
           *
           * @param topic - Topic name.
           * @param partition - Partition of the topic.
           * @param offset - offset to be stored.
           */
           void saveOffsetInExternalStore(String topic, int partition, long offset) {
               try {
                   FileWriter writer = new FileWriter(storageName(topic, partition), false);
                   BufferedWriter bufferedWriter = new BufferedWriter(writer);
                   bufferedWriter.write(offset + "");
                   bufferedWriter.flush();
                   bufferedWriter.close();
               } catch (Exception e) {
                       e.printStackTrace();
                       throw new RuntimeException(e);
               }
           }
           /**
               * @return he last offset + 1 for the provided topic and partition.
           */
           long readOffsetFromExternalStore(String topic, int partition) {
                   try {
                           Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
                           return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
                   } catch (Exception e) {
                       e.printStackTrace();
                   }
                   return 0;
           }
           private String storageName(String topic, int partition) {
               return storagePrefix + "-" + topic + "-" + partition;
           }
    }
    anager(String storagePrefix) {
                   this.storagePrefix = storagePrefix;
           }
       /**
           * in an external storage, overwrite the offset for the topic.
           *
           * @param topic - Topic name.
           * @param partition - Partition of the topic.
           * @param offset - offset to be stored.
           */
           void saveOffsetInExternalStore(String topic, int partition, long offset) {
               try {
                   FileWriter writer = new FileWriter(storageName(topic, partition), false);
                   BufferedWriter bufferedWriter = new BufferedWriter(writer);
                   bufferedWriter.write(offset + "");
                   bufferedWriter.flush();
                   bufferedWriter.close();
               } catch (Exception e) {
                       e.printStackTrace();
                       throw new RuntimeException(e);
               }
           }
           /**
               * @return he last offset + 1 for the provided topic and partition.
           */
           long readOffsetFromExternalStore(String topic, int partition) {
                   try {
                           Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
                           return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
                   } catch (Exception e) {
                       e.printStackTrace();
                   }
                   return 0;
           }
           private String storageName(String topic, int partition) {
               return storagePrefix + "-" + topic + "-" + partition;
           }
    }

    看看Storm Kafka与配置和代码的集成
    iv。完全一次Kafka静态消费者通过分配(一次和一次消息传递)
    这里,通过'assign(2)注册方法调用,消费者向Kafka客户注册。
    确保在这种情况下应手动管理偏移量。要通过Assign设置Exactly-once Kafka Static Consumer,请按照下列步骤操作:

    • 首先,设置enable.auto.commit = false
    • 请记住,在处理完消息后,请不要调用consumer.commitSync()。
    • 此外,通过使用'assign'调用,将consumer注册到特定分区。
    • 通过调用consumer.seek(topicPartition,offset),在消费者启动时寻找特定的消息偏移量。
    • 另外,作为安全网,实施幂等。

    码:

    public class ExactlyOnceStaticConsumer {
           private static OffsetManager offsetManager = new OffsetManager("storage1");
           public static void main(String[] str) throws InterruptedException, IOException {
                   System.out.println("Starting ExactlyOnceStaticConsumer ...");
                   readMessages();
           }
           private static void readMessages() throws InterruptedException, IOException {
                   KafkaConsumer<String, String> consumer = createConsumer();
                   String topic = "normal-topic";
                   int partition =1;
                   TopicPartition topicPartition =
                                   registerConsumerToSpecificPartition(consumer, topic, partition);
                   // Read the offset for the topic and partition from external storage.
                   long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
                   // Use seek and go to exact offset for that topic and partition.
                   consumer.seek(topicPartition, offset);
                   processRecords(consumer);
           }
           private static KafkaConsumer<String, String> createConsumer() {
                   Properties props = new Properties();
                   props.put("bootstrap.servers", "localhost:9092");
                   String consumeGroup = "cg2";
                   props.put("group.id", consumeGroup);
                   // To turn off the auto commit, below is a key setting.
                   props.put("enable.auto.commit", "false");
                   props.put("heartbeat.interval.ms", "2000");
                   props.put("session.timeout.ms", "6001");
                   // control maximum data on each poll, make sure this value is bigger than the maximum                 // single message size
                   props.put("max.partition.fetch.bytes", "140");
                   props.put("key.deserializer",                                     "org.apache.kafka.common.serialization.StringDeserializer");
                   props.put("value.deserializer",                                     "org.apache.kafka.common.serialization.StringDeserializer");
                   return new KafkaConsumer<String, String>(props);
           }
           /**
               * Manually listens for specific topic partition. Now, see an example of how to                * dynamically listens to partition and want to manually control offset,
               * ExactlyOnceDynamicConsumer.java
               */
            private static TopicPartition registerConsumerToSpecificPartition(
                       KafkaConsumer<String, String> consumer, String topic, int partition) {
                       TopicPartition topicPartition = new TopicPartition(topic, partition);
                       List<TopicPartition> partitions = Arrays.asList(topicPartition);
                       consumer.assign(partitions);
                       return topicPartition;
             }
               /**
                   * Process data and store offset in external store. Best practice is to do these operations
                   * atomically.
                   */
               private static void processRecords(KafkaConsumer<String, String> consumer) throws {
                       while (true) {
                              ConsumerRecords<String, String> records = consumer.poll(100);
                               for (ConsumerRecord<String, String> record : records) {
                                       System.out.printf("offset = %d, key = %s, value = %s
    ", record.offset(),                                                 record.key(), record.value());
                                       offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(),                                                 record.offset());
                               }
                       }
               }
    }

    5. Avro制片人和消费者

    在定义Avro时,它是一种开源二进制消息交换协议。基本上,为了通过线路发送优化的消息,这也减少了网络开销,我们使用它。此外,对于可以使用JSON定义的消息,Avro可以强制执行模式。通过使用这些模式,Avro可以使用各种编程语言生成绑定对象。将Avro与Kafka一起使用是本机支持的,也是强烈推荐的。
    阅读Apache Kafka + Spark Streaming Integration
    下面是一个简单的Avro消费者和制作人。

    public class AvroConsumerExample {
           public static void main(String[] str) throws InterruptedException {
                   System.out.println("Starting AutoOffsetAvroConsumerExample ...");
                   readMessages();
           }
           private static void readMessages() throws InterruptedException {
                   KafkaConsumer<String, byte[]> consumer = createConsumer();
                   // Assign to specific topic and partition.
                   consumer.assign(Arrays.asList(new TopicPartition("avro-topic", 0)));
                   processRecords(consumer);
             }
             private static void processRecords(KafkaConsumer<String, byte[]> consumer) throws {
                   while (true) {
                           ConsumerRecords<String, byte[]> records = consumer.poll(100);
                           long lastOffset = 0;
                           for (ConsumerRecord<String, byte[]> record : records) {
                                   GenericRecord genericRecord                                        = AvroSupport.byteArrayToData(AvroSupport.getSchema(),                                             record.value());
                                   String firstName = AvroSupport.getValue(genericRecord,                                             "firstName", String.class);
                                   System.out.printf("
    
    offset = %d, key = %s, value = %s", record.offset(),                                             record.key(), firstName);
                                   lastOffset = record.offset();
                           }
                       System.out.println("lastOffset read: " + lastOffset);
                       consumer.commitSync();
                   }
               }
               private static KafkaConsumer<String, byte[]> createConsumer() {
                           Properties props = new Properties();
                           props.put("bootstrap.servers", "localhost:9092");
                           String consumeGroup = "cg1";
                           props.put("group.id", consumeGroup);
                           props.put("enable.auto.commit", "true");
                           props.put("auto.offset.reset", "earliest");
                           props.put("auto.commit.interval.ms", "100");
                           props.put("heartbeat.interval.ms", "3000");
                           props.put("session.timeout.ms", "30000");
                           props.put("key.deserializer",                                     "org.apache.kafka.common.serialization.StringDeserializer");
                           props.put("value.deserializer",                                     "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                       return new KafkaConsumer<String, byte[]>(props);
               }
    }
    public class AvroProducerExample {
           public static void main(String[] str) throws InterruptedException, IOException {
                   System.out.println("Starting ProducerAvroExample ...");
                   sendMessages();
           }
           private static void sendMessages() throws InterruptedException, IOException {
                   Producer<String, byte[]> producer = createProducer();
                   sendRecords(producer);
           }
           private static Producer<String, byte[]> createProducer() {
                       Properties props = new Properties();
                       props.put("bootstrap.servers", "localhost:9092");
                       props.put("acks", "all");
                       props.put("retries", 0);
                       props.put("batch.size", 16384);
                       props.put("linger.ms", 1);
                       props.put("buffer.memory", 33554432);
                       props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                       props.put("value.serializer",                                 "org.apache.kafka.common.serialization.ByteArraySerializer");
                   return new KafkaProducer(props);
           }
           private static void sendRecords(Producer<String, byte[]> producer) throws IOException, {
                   String topic = "avro-topic";
                   int partition = 0;
                   while (true) {
                           for (int i = 1; i < 100; i++)
                               producer.send(new ProducerRecord<String, byte[]>(topic, partition,                                     Integer.toString(0), record(i + "")));
                   }
            }
            private static byte[] record(String name) throws IOException {
                       GenericRecord record = new GenericData.Record(AvroSupport.getSchema());
                       record.put("firstName", name);
                       return AvroSupport.dataToByteArray(AvroSupport.getSchema(), record);
             }
    }

    所以,这完全是关于Kafka客户端的。希望您喜欢我们对如何创建Kafka客户端的解释。

    六,结论

    因此,我们已经看到了使用Kafka API创建Kafka客户端的所有方法。此外,在这个Kafka Clients教程中,我们讨论了Kafka Producer Client,Kafka Consumer Client。除此之外,我们还了解了Avro Kafka Producer和Consumer Kafka客户。但是,如果对Kafka客户有任何疑问,请随时通过评论部分询问。 

  • 相关阅读:
    Windows 科研软件推荐
    有关Python 包 (package) 的基本知识
    《Using Python to Access Web Data》Week4 Programs that Surf the Web 课堂笔记
    Coursera助学金申请模板
    《Using Databases with Python》 Week2 Basic Structured Query Language 课堂笔记
    Jupyter 解决单个变量输出问题
    解决 pandas 中打印 DataFrame 行列显示不全的问题
    《Using Python to Access Web Data》 Week3 Networks and Sockets 课堂笔记
    缓存击穿及解决方案
    jvm垃圾收集器
  • 原文地址:https://www.cnblogs.com/a00ium/p/10852433.html
Copyright © 2011-2022 走看看