zoukankan      html  css  js  c++  java
  • Kafka学习-Producer和Customer

      在上一篇kafka入门的基础之上,本篇主要介绍Kafka的生产者和消费者。

    Kafka 生产者  

      kafka Producer发布消息记录到Kakfa集群。生产者是线程安全的,可以在多个线程之间共享生产者实例。一个简单的例子,使用producer发送一个有序的key/value(键值对),放到java的main方法里就能直接运行,

    public class ProducerDemo {
        private static final String KAFKA_TOPIC="kafka-topic";
        public static void main(String[] args) {
            Map<String, Object> configs = new HashMap<String, Object>();
            configs.put("bootstrap.servers", "localhost:9092");
            configs.put("acks", "all");
            configs.put("retries", 0);
            configs.put("batch.size", 16384);
            configs.put("linger.ms", 1);
            configs.put("buffer.memory", 33554432);
            configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer=new KafkaProducer<String, String>(configs);
            ProducerRecord<String, String> record=null;
            
            for (int i = 0; i <10; i++) {
                record=new ProducerRecord<String, String>(KAFKA_TOPIC, "record-"+i);
                Future<RecordMetadata> future=producer.send(record);
                try {
                    RecordMetadata recordMetadata=future.get();
                    System.out.format("PARTITION: %d OFFSET: %d
    ", recordMetadata.partition(),recordMetadata.offset());
                }
                catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            producer.close();
        }
    }
    View Code

       生产者的缓冲池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到kafka集群。如果使用后不关闭生产者,则会导致资源泄露。

    • 生产者的send()方法是异步的,send()方法添加消息到缓冲区等待发送,并立即返回,这样可以并行发送多条消息而不阻塞去等待每一条消息的响应。为了减少请求的数量,生产者将单个的消息聚集在一起批量发送来提高效率。
    • acks是判断消息是否成功发送的条件,将acks指定为"all"将会阻塞消息,当所有的副本都返回后才表明该消息发送成功,这种设置性能最低,但是是最可靠的。
    • retries表示重试的次数,如果请求失败,生产者会自动重试。如果启用重试,则会有重复消息的可能性。
    • batch.size指定了缓冲区的大小,kafka的producer会缓存每个分区未发送消息。
    • linger.ms指示生产者发送请求前等待一段时间,等待更多的消息来填满缓冲区。默认缓冲可立即发送,即使缓冲空间还没有满。但是,如果想减少请求的数量,可以设置linger.time大于0。如果我们没有填满缓冲区,这个设置将增加1毫秒的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使linger.time=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
    • buffer.memory:控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。
    • key.serializer和value.serializer示例,将用户提供的key和value对象ProducerRecord转换成字节。

    Producer的Send()

      kafka的producer的send()方法提供多种重载:

      send()是异步的,一旦消息被保存在等待发送的消息缓存中,此方法就立即返回,这样可以你并行发送多条消息而不阻塞去等待每一条消息的响应。发送的结果是一个RecordMetadata,它指定了消息发送的分区,分配的offset和消息的时间戳。如果topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间,如果topic使用的是LogAppendTime,时间戳是broker的本地时间。由于send调用是异步的,它将为分配消息的此消息的RecordMetadata返回一个Future。如果future调用get(),则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。如果要模拟一个简单的阻塞调用,你可以调用get()方法。

     byte[] key = "key".getBytes();
     byte[] value = "value".getBytes();
     ProducerRecord<byte[],byte[]> record = new 
     ProducerRecord<byte[],byte[]>("my-topic", key, value)
     producer.send(record).get();
    View Code

      完全无阻塞的话,可以利用参数提供的回调函数处理请求完成时的回调通知。

    record=new ProducerRecord<String, String>(KAFKA_TOPIC, "CallbackRecord-"+i);
                producer.send(record,new Callback() {  
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        System.out.format("PARTITION: %d OFFSET: %d
    ", metadata.partition(),metadata.offset());
                    }
                });
    View Code

      发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在下面的例子中 callback1 保证执行 callback2 之前:

    producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
    producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
    View Code

      注意:callback一般在生产者的I/O线程中执行,所以是相当的快的,否则将延迟其他的线程的消息发送。如果需要执行阻塞或耗时的回调,建议在callback主体中使用自己的Executor来并行处理。

    kafka 消费者

      Kafka的消费者从集群中消费消息。当Kafka集群中的的服务器发生故障,partition的Leader进行重新选举的过程对于消费者来说是透明的。对于Kakfa集群新增或者减少消费者造成的分区重新分配对消费者也是透明的。kafka中的消费者和kafka集群TCP长连接保持联系,并从broker来拉取消息进行消费。如果消费者关闭连接失败,将会导致连接泄露。

      kafka为分区中的每条消息保存一个偏移量(offset),这个偏移量是该分区中一条消息的唯一标示符,也表示消费者在分区的位置。实际上有两个与消费者相关的“位置”概念:

    • 消费者的offset给出了下一条可以消费的消息的偏移量,这个offset会在每次在调用poll(long)中接收消息时自动增长。
    • “已提交”的位置是已安全保存的最后偏移量,如果进程失败或重新启动时,消费者将恢复到这个偏移量。消费者可以选择定期自动提交偏移量,也可以选择通过调用commit API来手动的控制。

      Kafka的消费者通过消费者组的概念进行消息的分配和处理。这些消费者可以在同一台机器运行,也可分布到多台机器上来增加可扩展性和容错性,相同group.id的消费者将视为同一个消费者组。消费者组中的每个消费者动态的订阅一个topic列表。kafka将已订阅topic的消息发送到每个消费者组中,并且通过平衡消费者组中所有成员之间的分区,以便将每个分区分配给组中的一个消费者。

      消费者组的成员是动态维护的:如果一个消费者故障,分配给它的分区将重新分配给同一个消费者组中其他的消费者;如果一个新的消费者加入到分组,将从现有消费者中的分区中移一个给它,这就是消费者组的rebalancing。 当新分区添加到订阅的topic时,或者当创建与订阅的正则表达式匹配的新topic时,也将重新平衡。kafka将通过定时刷新自动发现新的分区,并将其分配给消费者组的成员。从概念上讲,消费者组看作是由多个消费者组成的单一逻辑订阅者。此外,当分组重新分配自动发生时,可以通过ConsumerRebalanceListener通知消费者,这允许他们完成必要的应用程序级逻辑,例如状态清除,手动偏移提交等。kafka也允许消费者通过使用assign(Collection)手动分配指定分区,如果使用手动指定分配分区,那么动态分区分配和协调消费者组将失效。

      只要持续的调用poll,消费者将一直保持可用,并继续从分配的分区中接收消息。此外,消费者向服务器定时发送心跳。 如果消费者崩溃或无法在session.timeout.ms配置的时间内发送心跳,则消费者将被视为死亡,并且其分区将被重新分配。还有一种可能,消费可能遇到“活锁”的情况,它持续的发送心跳,但是没有处理。为了预防消费者在这种情况下一直持有分区,我们使用max.poll.interval.ms活跃检测机制。 在此基础上,如果你调用的poll的频率大于最大间隔,则客户端将主动地离开组,以便其他消费者接管该分区。 发生这种情况时,你会看到offset提交失败(调用commitSync()引发的CommitFailedException)。这是一种安全机制,保障只有活动成员能够提交offset。所以要留在组中,你必须持续调用poll。消费者提供两个配置设置来控制poll循环:

    • max.poll.interval.ms:增大poll的间隔,可以为消费者提供更多的时间去处理返回的消息(调用poll(long)返回的消息,通常返回的消息都是一批)。缺点是此值越大将会延迟组重新平衡。
    • max.poll.records:此设置限制每次调用poll返回的消息数,这样可以更容易的预测每次poll间隔要处理的最大值。通过调整此值,可以减少poll间隔,减少重新平衡分组的

      对于消息处理时间不可预测地的情况,这些选项是不够的。 处理这种情况的推荐方法是将消息处理移到另一个线程中,让消费者继续调用poll。 但是必须注意确保已提交的offset不超过实际位置。另外,你必须禁用自动提交,并只有在线程完成处理后才为记录手动提交偏移量(取决于你)。 还要注意,你需要pause暂停分区,不会从poll接收到新消息,让线程处理完之前返回的消息(如果你的处理能力比拉取消息的慢,那创建新线程将导致你机器内存溢出)。

    自动提交偏移量

    public class ComsumerDemo {
        public static void main(String[] args) {
            Map<String, Object> configs = new HashMap<String, Object>();
            configs.put("bootstrap.servers", "localhost:9092");
            configs.put("group.id", "test");
            configs.put("enable.auto.commit", "true");
            configs.put("auto.commit.interval.ms", "1000");
            configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
            consumer.subscribe(Arrays.asList("kafka-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                records.partitions();
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }    
            }
        }
    }
    View Code

      bootstrap.servers指定一个或多个broker,不用指定全部的broker,它将自动发现集群中的其余的borker。为了避免服务器故障,导致不能发现kafka集群,最好指定多个broker。broker通过心跳机器自动检测消费者组中失败的消费者,消费者会自动ping集群,告诉kafka集群它还活着。只要消费者被认为是活着的,它就会保留被分配的分区。如果它停止心跳的时间超过session.timeout.ms,那么就会认为是故障的,它的分区将被分配到别的进程。

    手动控制偏移量

      不需要定时的提交offset,可以自己控制offset,当消息认为已消费过了,这个时候再去提交它们的偏移量。

    Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("group.id", "test");
         props.put("enable.auto.commit", "false");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList("foo", "bar"));
         final int minBatchSize = 200;
         List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records) {
                 buffer.add(record);
             }
             if (buffer.size() >= minBatchSize) {
                 insertIntoDb(buffer);
                 consumer.commitSync();
                 buffer.clear();
             }
         }
    View Code

      消费者将消费一批消息并将它们存储在内存中。当积累足够多的消息后再将它们批量插入到数据库中。如果设置offset自动提交,消费将被认为是已消费的,这样会出现问题,进程可能在批处理记录之后,但在它们被插入到数据库之前失败了。为了避免这种情况,我们将在相应的记录插入数据库之后再手动提交偏移量。这样我们可以准确控制消息是成功消费的。提出一个相反的可能性:在插入数据库之后,但是在提交之前,这个过程可能会失败(即使这可能只是几毫秒,这是一种可能性)。在这种情况下,进程将获取到已提交的偏移量,并会重复插入的最后一批数据。这种方式就是所谓的“至少一次”保证,在故障情况下,可以重复。如果您无法执行这些操作,可能会使已提交的偏移超过消耗的位置,从而导致缺少记录。 使用手动偏移控制的优点是,您可以直接控制记录何时被视为“已消耗”。

      注意:使用自动提交也可以“至少一次”。但是要求你必须下次调用poll(long)之前或关闭消费者之前,处理完所有返回的数据。如果操作失败,这将会导致已提交的offset超过消费的位置,从而导致丢失消息。使用手动控制offset的有点是,你可以直接控制消息何时提交。、

      上面的例子使用commitSync表示所有收到的消息为”已提交",在某些情况下,你可以希望更精细的控制,通过指定一个明确消息的偏移量为“已提交”。在下面,我们的例子中,我们处理完每个分区中的消息后,提交偏移量。

      try {
             while(running) {
                 ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                 for (TopicPartition partition : records.partitions()) {
                     List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                     for (ConsumerRecord<String, String> record : partitionRecords) {
                         System.out.println(record.offset() + ": " + record.value());
                     }
                     long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                     consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                 }
             }
         } finally {
           consumer.close();
         }
    View Code

      注意:已提交的offset应始终是你的程序将读取的下一条消息的offset。因此,调用commitSync(offsets)时,你应该加1个到最后处理的消息的offset。

    订阅指定的分区

      在有些情况下,你可能需要自己来控制分配指定分区,例如:消费者进程与分区保存了某种本地状态(如本地磁盘的键值存储),则它应该只能获取这个分区的消息。如果消费者进程本身具有高可用性,并且如果它失败,会自动重新启动,在这种情况下,不需要Kafka检测故障,重新分配分区,因为消费者进程将在另一台机器上重新启动。要使用此模式,你只需调用assign(Collection)消费指定的分区即可:

    String topic = "foo";
    TopicPartition partition0 = new TopicPartition(topic, 0);
    TopicPartition partition1 = new TopicPartition(topic, 1);
    consumer.assign(Arrays.asList(partition0, partition1));
    View Code

      一旦手动分配分区,你可以在循环中调用poll。消费者分组仍需要提交offset,只是现在分区的设置只能通过调用assign修改,因为手动分配不会进行分组协调,因此消费者故障不会引发分区重新平衡。每一个消费者是独立工作的(即使和其他的消费者共享GroupId)。为了避免offset提交冲突,通常你需要确认每一个consumer实例的gorupId都是唯一的。

      注意,手动分配分区和动态分区分配的订阅topic模式(即subcribe)不能混合使用。

    offset存储在其他地方

      消费者可以不使用kafka内置的offset仓库。可以选择自己来存储offset。要注意的是,将消费的offset和结果存储在同一个的系统中,用原子的方式存储结果和offset,但这不能保证原子,要想消费是完全原子的,并提供的“正好一次”的消费保证比kafka默认的“至少一次”的语义要更高。你需要使用kafka的offset提交功能。每个消息都有自己的offset,所以要管理自己的偏移,你只需要做到以下几点:

    • 配置 enable.auto.commit=false
    • 使用提供的 ConsumerRecord 来保存你的位置。
    • 在重启时用 seek(TopicPartition, long) 恢复消费者的位置。

      当分区分配也是手动完成的(像上文搜索索引的情况),这种类型的使用是最简单的。 如果分区分配是自动完成的,需要特别小心处理分区分配变更的情况。可以通过调用subscribe(Collection,ConsumerRebalanceListener)和subscribe(Pattern,ConsumerRebalanceListener)中提供的ConsumerRebalanceListener实例来完成的。例如,当分区向消费者获取时,消费者将通过实现ConsumerRebalanceListener.onPartitionsRevoked(Collection)来给这些分区提交它们offset。当分区分配给消费者时,消费者通过ConsumerRebalanceListener.onPartitionsAssigned(Collection)为新的分区正确地将消费者初始化到该位置。ConsumerRebalanceListener的另一个常见用法是清除应用已移动到其他位置的分区的缓存。

    控制消费的位置

      大多数情况下,消费者只是简单的从头到尾的消费消息,周期性的提交位置(自动或手动)。kafka也支持消费者去手动的控制消费的位置,可以消费之前的消息也可以跳过最近的消息。有几种情况,手动控制消费者的位置可能是有用的。一种场景是对于时间敏感的消费者处理程序,对足够落后的消费者,直接跳过,从最近的消费开始消费。另一个使用场景是本地状态存储系统。在这样的系统中,消费者将要在启动时初始化它的位置(无论本地存储是否包含)。同样,如果本地状态已被破坏(假设因为磁盘丢失),则可以通过重新消费所有数据并重新创建状态(假设kafka保留了足够的历史)在新的机器上重新创建。kafka使用seek(TopicPartition, long)指定新的消费位置。用于查找服务器保留的最早和最新的offset的特殊的方法也可用(seekToBeginning(Collection) 和 seekToEnd(Collection))。

    消费者流量控制

      如果消费者分配了多个分区,并同时消费所有的分区,这些分区具有相同的优先级。在一些情况下,消费者需要首先消费一些指定的分区,当指定的分区有少量或者已经没有可消费的数据时,则开始消费其他分区。例如流处理,当处理器从2个topic获取消息并把这两个topic的消息合并,当其中一个topic长时间落后另一个,则暂停消费,以便落后的赶上来。kafka支持动态控制消费流量,分别在future的poll(long)中使用pause(Collection) 和 resume(Collection) 来暂停消费指定分配的分区,重新开始消费指定暂停的分区。

    多线程处理

      Kafka消费者不是线程安全的。所有网络I/O都发生在进行调用应用程序的线程中。用户的责任是确保多线程访问正确同步的。非同步访问将导致ConcurrentModificationException。此规则唯一的例外是wakeup(),它可以安全地从外部线程来中断活动操作。在这种情况下,将从操作的线程阻塞并抛出一个WakeupException。这可用于从其他线程来关闭消费者。 以下代码段显示了典型模式: 

    public class KafkaConsumerRunner implements Runnable {
         private final AtomicBoolean closed = new AtomicBoolean(false);
         private final KafkaConsumer consumer;
    
         public void run() {
             try {
                 consumer.subscribe(Arrays.asList("topic"));
                 while (!closed.get()) {
                     ConsumerRecords records = consumer.poll(10000);
                     // Handle new records
                 }
             } catch (WakeupException e) {
                 // Ignore exception if closing
                 if (!closed.get()) throw e;
             } finally {
                 consumer.close();
             }
         }
    
         // Shutdown hook which can be called from a separate thread
         public void shutdown() {
             closed.set(true);
             consumer.wakeup();
         }
     }
    View Code 

      在单独的线程中,可以通过设置关闭标志和唤醒消费者来关闭消费者。

      closed.set(true);
      consumer.wakeup();
      我们没有多线程模型的例子。但留下几个操作可用来实现多线程处理消息。

    参考列表:

    1. http://orchome.com/451
    2. http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
    3. http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
  • 相关阅读:
    HDU 3401 Trade
    POJ 1151 Atlantis
    HDU 3415 Max Sum of MaxKsubsequence
    HDU 4234 Moving Points
    HDU 4258 Covered Walkway
    HDU 4391 Paint The Wall
    HDU 1199 Color the Ball
    HDU 4374 One hundred layer
    HDU 3507 Print Article
    GCC特性之__init修饰解析 kasalyn的专栏 博客频道 CSDN.NET
  • 原文地址:https://www.cnblogs.com/wxgblogs/p/6718940.html
Copyright © 2011-2022 走看看