  • KafkaConsumer Usage Explained

    A client that consumes records from a Kafka cluster.

    This client transparently handles the failure of Kafka brokers, and transparently adapts as topic partitions it fetches migrate within the cluster. This client also interacts with the broker to allow groups of consumers to load balance consumption using consumer groups.

    The consumer maintains TCP connections to the necessary brokers to fetch data. Failure to close the consumer after use will leak these connections. The consumer is not thread-safe. See Multi-threaded Processing for more details.

    Cross-Version Compatibility

    This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support certain features. For example, 0.10.0 brokers do not support offsetsForTimes, because this feature was added in version 0.10.1. You will receive an org.apache.kafka.common.errors.UnsupportedVersionException when invoking an API that is not available on the running broker version.

    Offsets and Consumer Position

    Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There are actually two notions of position relevant to the user of the consumer:

    The position of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives messages in a call to poll(Duration).

    The committed position is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs (e.g. commitSync and commitAsync).

    This distinction gives the consumer control over when a record is considered consumed. It is discussed in further detail below.
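
    The two notions of position can be modeled in a few lines of plain Java. This is a minimal sketch, not Kafka's implementation: the class PositionTracker and its methods are hypothetical names used only for illustration.

```java
// Hypothetical model of one partition's consumer state; not part of the Kafka API.
final class PositionTracker {
    private long position;   // offset of the next record to be handed out
    private long committed;  // last offset that has been stored durably

    PositionTracker(long committedOffset) {
        this.committed = committedOffset;
        this.position = committedOffset; // a restarted consumer resumes from the committed offset
    }

    // Simulates receiving `count` records from one poll: the position advances automatically.
    void receive(int count) {
        position += count;
    }

    // Simulates a commit: the committed offset catches up with the current position.
    void commit() {
        committed = position;
    }

    long position() {
        return position;
    }

    long committed() {
        return committed;
    }

    // Simulates a crash and restart: everything after the committed offset will be read again.
    PositionTracker restart() {
        return new PositionTracker(committed);
    }
}
```

    In this model, a consumer that receives five records, commits, receives three more and then crashes restarts at offset 5, so the three uncommitted records are delivered again.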

    Consumer Groups and Topic Subscriptions

    Kafka uses the concept of consumer groups to allow a pool of processes to divide the work of consuming and processing records. These processes can either be running on the same machine or they can be distributed over many machines to provide scalability and fault tolerance for processing. All consumer instances sharing the same group.id will be part of the same consumer group.

    Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the subscribe APIs. Kafka will deliver each message in the subscribed topics to one process in each consumer group. This is achieved by balancing the partitions between all members in the consumer group so that each partition is assigned to exactly one consumer in the group. So if there is a topic with four partitions, and a consumer group with two processes, each process would consume from two partitions.
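
    The four-partitions, two-processes case can be illustrated with a simple round-robin distribution. This is only a sketch under the assumption of a round-robin strategy; Kafka's real partition assignors are configurable and more involved, and RoundRobinSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not Kafka's assignor: it only shows that every partition
// ends up with exactly one owner in the group.
final class RoundRobinSketch {
    // Gives each partition to exactly one member, cycling through the members in order.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one member");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = members.get(partition % members.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }
}
```

    Calling assign with two members and four partitions gives each member two partitions; adding a third member and calling it again mimics what a rebalance has to recompute.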

    Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved from existing consumers to the new one. This is known as rebalancing the group and is discussed in more detail below. Group rebalancing is also used when new partitions are added to one of the subscribed topics or when a new topic matching a subscribed regex is created. The group will automatically detect the new partitions through periodic metadata refreshes and assign them to members of the group.

    Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a given topic without duplicating data (additional consumers are actually quite cheap).

    This is a slight generalization of the functionality that is common in messaging systems. To get semantics similar to a queue in a traditional messaging system all processes would be part of a single consumer group and hence record delivery would be balanced over the group like with a queue. Unlike a traditional messaging system, though, you can have multiple such groups. 

    In addition, when group reassignment happens automatically, consumers can be notified through a ConsumerRebalanceListener, which allows them to finish necessary application-level logic such as state cleanup, manual offset commits, etc. See Storing Offsets Outside Kafka for more details.

    It is also possible for the consumer to manually assign specific partitions (similar to the older "simple" consumer) using assign(Collection). In this case, dynamic partition assignment and consumer group coordination will be disabled.

    Detecting Consumer Failures

    After subscribing to a set of topics, the consumer will automatically join the group when poll(Duration) is invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.

    It is also possible that the consumer could encounter a "livelock" situation where it is continuing to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions indefinitely in this case, we provide a liveness detection mechanism using the max.poll.interval.ms setting. Basically if you don't call poll at least as frequently as the configured max interval, then the client will proactively leave the group so that another consumer can take over its partitions. When this happens, you may see an offset commit failure (as indicated by a CommitFailedException thrown from a call to commitSync()). This is a safety mechanism which guarantees that only active members of the group are able to commit offsets. So to stay in the group, you must continue to call poll.

    The consumer provides two configuration settings to control the behavior of the poll loop:

    max.poll.interval.ms: By increasing the interval between expected polls, you can give the consumer more time to handle a batch of records returned from poll(Duration). The drawback is that increasing this value may delay a group rebalance since the consumer will only join the rebalance inside the call to poll. You can use this setting to bound the time to finish a rebalance, but you risk slower progress if the consumer cannot actually call poll(Duration) often enough.

    max.poll.records: Use this setting to limit the total records returned from a single call to poll. This can make it easier to predict the maximum that must be handled within each poll interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the impact of group rebalancing.
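
    The interplay of the two settings comes down to simple arithmetic: the poll interval divided by the maximum batch size gives the average time a record may take before the consumer risks leaving the group. The sketch below uses only java.util.Properties; the class and method names are hypothetical and the values are chosen for illustration.

```java
import java.util.Properties;

// Hypothetical helper that sets the two poll-loop settings and derives a time budget.
final class PollLoopConfigSketch {
    // Builds consumer properties that bound the work done between two polls.
    static Properties pollLoopProps(int maxPollIntervalMs, int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    // Average time available per record if a poll returns a full batch.
    static long perRecordBudgetMs(Properties props) {
        long intervalMs = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        long records = Long.parseLong(props.getProperty("max.poll.records"));
        return intervalMs / records;
    }
}
```

    With an interval of 300000 ms and at most 500 records per poll, each record has a budget of 600 ms on average. Halving max.poll.records doubles that budget without lengthening the interval.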

    For use cases where message processing time varies unpredictably, neither of these options may be sufficient. The recommended way to handle these cases is to move message processing to another thread, which allows the consumer to continue calling poll(Duration) while the processor is still working. Some care must be taken to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic commits and manually commit processed offsets for records only after the thread has finished handling them (depending on the delivery semantics you need). Note also that you will need to pause(Collection) the partition so that no new records are received from poll until after the thread has finished handling those previously returned.

    Usage Examples

    The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to demonstrate how to use them.

    Automatic Offset Committing

    This example demonstrates a simple usage of Kafka's consumer API that relies on automatic offset committing.

        import java.time.Duration;
        import java.util.Arrays;
        import java.util.Properties;

        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;

        public class AutoCommitExample {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.setProperty("bootstrap.servers", "localhost:9092");
                props.setProperty("group.id", "test");
                // Commit offsets automatically, once per second.
                props.setProperty("enable.auto.commit", "true");
                props.setProperty("auto.commit.interval.ms", "1000");
                props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Arrays.asList("foo", "bar"));
                while (true) {
                    // Wait up to 100 ms for records, then print whatever arrived.
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    }
                }
            }
        }

    The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the configuration bootstrap.servers. This list is just used to discover the rest of the brokers in the cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in case there are servers down when the client is connecting).

    Setting enable.auto.commit means that offsets are committed automatically with a frequency controlled by the config auto.commit.interval.ms. By default, enable.auto.commit is true and auto.commit.interval.ms is 5000.

    In this example the consumer is subscribing to the topics foo and bar as part of a group of consumers called test as configured with group.id.

    The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we are saying that our record's key and value will just be simple strings.

    Manual Offset Control

    Instead of relying on the consumer to periodically commit consumed offsets, users can also control when records should be considered as consumed and hence commit their offsets. This is useful when the consumption of the messages is coupled with some processing logic and hence a message should not be considered as consumed until it has completed processing.

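        import java.time.Duration;
        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.List;
        import java.util.Properties;

        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;

        public class ManualCommitExample {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.setProperty("bootstrap.servers", "localhost:9092");
                props.setProperty("group.id", "test");
                props.setProperty("enable.auto.commit", "false");
                props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Arrays.asList("foo", "bar"));
                final int minBatchSize = 200;
                List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        buffer.add(record);
                    }
                    if (buffer.size() >= minBatchSize) {
                        // Commit only after the batch has been written to the database.
                        insertIntoDb(buffer);
                        consumer.commitSync();
                        buffer.clear();
                    }
                }
            }

            // Placeholder for the application's own database write.
            private static void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
            }
        }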

    In this example we will consume a batch of records and batch them up in memory. When we have enough records batched, we will insert them into a database. If we allowed offsets to auto commit as in the previous example, records would be considered consumed after they were returned to the user in poll(Duration). It would then be possible for our process to fail after batching the records, but before they had been inserted into the database.

    To avoid this, we will manually commit the offsets only after the corresponding records have been inserted into the database. This gives us exact control of when a record is considered consumed. This raises the opposite possibility: the process could fail in the interval after the insert into the database but before the commit (even though this would likely just be a few milliseconds, it is a possibility). In this case the process that took over consumption would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way Kafka provides what is often called "at-least-once" delivery guarantees, as each record will likely be delivered one time but in failure cases could be duplicated.
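
    The duplicate-on-failure behavior can be reproduced without a broker. The following is a minimal simulation in plain Java, not Kafka code: AtLeastOnceSketch is a hypothetical class, its list stands in for the database, and its committedOffset field stands in for the offset committed to Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of insert-then-commit with a possible crash in between.
final class AtLeastOnceSketch {
    final List<Long> database = new ArrayList<>(); // stands in for the external database
    long committedOffset = 0;                      // stands in for the committed offset

    // Processes the records [committedOffset, committedOffset + batchSize).
    // If crashBeforeCommit is true, the insert happens but the commit does not.
    void runBatch(int batchSize, boolean crashBeforeCommit) {
        long start = committedOffset;
        for (long offset = start; offset < start + batchSize; offset++) {
            database.add(offset); // corresponds to insertIntoDb
        }
        if (crashBeforeCommit) {
            return; // the process dies here, so the committed offset stays at `start`
        }
        committedOffset = start + batchSize; // corresponds to commitSync
    }
}
```

    Running one batch of three records with a crash and then one without leaves offsets 0, 1 and 2 in the database twice: no record is lost, but the last batch is repeated.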

    Note: Using automatic offset commits can also give you "at-least-once" delivery, but the requirement is that you must consume all data returned from each call to poll(Duration) before any subsequent calls, or before closing the consumer. If you fail to do either of these, it is possible for the committed offset to get ahead of the consumed position, which results in missing records. The advantage of using manual offset control is that you have direct control over when a record is considered "consumed".

    The above example uses commitSync to mark all received records as committed. In some cases you may wish to have even finer control over which records have been committed by specifying an offset explicitly. In the example below we commit the offset after we finish handling the records in each partition.

            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            System.out.println(record.offset() + ": " + record.value());
                        }
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }
                }
            } finally {
                consumer.close();
            }

    Note: The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed.
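
    The add-one rule is easy to get wrong, so it can help to isolate it in one place. This is a small sketch in plain Java; CommitOffsetSketch is a hypothetical helper, and partitions are represented by plain strings such as "foo-0" instead of TopicPartition so that the example has no dependencies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that turns "last processed" offsets into offsets to commit.
final class CommitOffsetSketch {
    // For each partition, the offset to commit is the last processed offset plus one,
    // that is, the offset of the next record the application will read.
    static Map<String, Long> offsetsToCommit(Map<String, Long> lastProcessed) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Map.Entry<String, Long> entry : lastProcessed.entrySet()) {
            toCommit.put(entry.getKey(), entry.getValue() + 1);
        }
        return toCommit;
    }
}
```

    If the last record handled in "foo-0" had offset 41, the value to commit is 42. Committing 41 instead would cause that record to be delivered again after a restart.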

    Manual Partition Assignment

    In the previous examples, we 

    Storing Offsets Outside Kafka

    The consumer application need not use Kafka's built-in offset storage; it can store offsets in a store of its own choosing. The primary use case for this is allowing the application to store both the offset and the results of the consumption in the same system in a way that both the results and offsets are stored atomically. This is not always possible, but when it is, it will make the consumption fully atomic and give "exactly once" semantics that are stronger than the default "at-least-once" semantics you get with Kafka's offset commit functionality.

    Here are a couple of examples of this type of usage:

    If the results of the consumption are being stored in a relational database, storing the offset in the database as well can allow committing both the results and offset in a single transaction. Thus either the transaction will succeed and the offset will be updated based on what was consumed or the result will not be stored and the offset won't be updated.

    If the results are being stored in local storage, it may be possible to store the offset there as well. For example, a search index could be built by subscribing to a particular partition and storing both the offset and the indexed data together. If this is done atomically, then even if a crash causes unsynced data to be lost, whatever remains has the corresponding offset stored with it. The indexing process that comes back after losing recent updates simply resumes indexing from what it has, ensuring that no updates are lost.

    Each record comes with its own offset, so to manage your own offset you just need to do the following:

    Configure enable.auto.commit=false

    Use the offset provided with each ConsumerRecord to save your position.

    On restart restore the position of the consumer using seek(TopicPartition, long).
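
    The steps above can be sketched with an in-memory store in which the result and the offset always change together. This is an illustration only: OffsetStoreSketch is a hypothetical class, the running sum stands in for the consumption result, and the synchronized method stands in for a single database transaction.

```java
// Hypothetical store that keeps a consumption result and its offset in step.
final class OffsetStoreSketch {
    private long total;       // the "result" of consumption, here a running sum
    private long nextOffset;  // offset of the next record to consume

    // Applies one record. The result and the offset are updated together under one
    // lock, standing in for a single transaction that stores both atomically.
    synchronized void apply(long recordOffset, long value) {
        if (recordOffset < nextOffset) {
            return; // already applied: a replayed record must not be counted twice
        }
        total += value;
        nextOffset = recordOffset + 1;
    }

    synchronized long total() {
        return total;
    }

    // On restart, this is the value the consumer would pass to seek(TopicPartition, long).
    synchronized long restartOffset() {
        return nextOffset;
    }
}
```

    Because the stored offset can never disagree with the stored result, replaying a record after a restart leaves the result unchanged, which is the "exactly once" effect described above.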

    This type of usage is simplest when the partition assignment is also done manually (this would be likely in the search index use case described above). If the partition assignment is done automatically, special care is needed to handle the case where partition assignments change. This can be done by providing a ConsumerRebalanceListener instance in the call to subscribe(Collection, ConsumerRebalanceListener) or subscribe(Pattern, ConsumerRebalanceListener).

    For example, when partitions are taken from a consumer the consumer will want to commit its offset for those partitions by implementing ConsumerRebalanceListener.onPartitionsRevoked(Collection). When partitions are assigned to a consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer to that position by implementing ConsumerRebalanceListener.onPartitionsAssigned(Collection).

    Another common use for ConsumerRebalanceListener is to flush any caches the application maintains for partitions that are moved elsewhere.

    Controlling The Consumer's Position

    In most use cases the consumer will simply consume records from beginning to end, periodically committing its position (either automatically or manually). However, Kafka allows the consumer to manually control its position, moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to the most recent records without actually consuming the intermediate records.

    There are several instances where manually controlling the consumer's position can be useful.

    One case is time-sensitive record processing, where it may make sense for a consumer that falls far enough behind not to attempt to catch up by processing all records, but rather to skip to the most recent records.

    Another use case is for a system that maintains local state as described in the previous section. In such a system the consumer will want to initialize its position on start-up to whatever is contained in the local store. Likewise if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by re-consuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).

    Kafka allows specifying the position using seek(TopicPartition, long) to specify the new position. Special methods for seeking to the earliest and latest offset the server maintains are also available (seekToBeginning(Collection) and seekToEnd(Collection) respectively).
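
    For the time-sensitive case, the decision of whether to skip ahead is plain arithmetic on offsets. The sketch below is a hypothetical helper, not part of the Kafka API; it assumes the application has obtained the partition's end offset by some means (for example from endOffsets(Collection)) and would pass the result on to seek(TopicPartition, long).

```java
// Hypothetical helper that decides where a lagging consumer should continue.
final class SeekDecisionSketch {
    // Returns the offset to continue from: the current position while the lag is
    // acceptable, otherwise the end offset so that stale records are skipped.
    static long targetOffset(long position, long endOffset, long maxLag) {
        long lag = endOffset - position;
        return lag > maxLag ? endOffset : position;
    }
}
```

    A consumer at position 100 with an end offset of 150 and a tolerated lag of 1000 keeps its position; with an end offset of 5000 it jumps to 5000 and never sees the records in between.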

    Consumption Flow Control

    If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However, in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have little or no data to consume.

    One such case is stream processing, where a processor fetches from two topics and performs a join on the two streams. When one of the topics lags far behind the other, the processor would like to pause fetching from the topic that is ahead so that the lagging stream can catch up. Another example is bootstrapping on consumer startup, when there is a lot of historical data to catch up on: applications usually want to get the latest data on some of the topics before considering fetching from the others.

    Kafka supports dynamic control of consumption flows through pause(Collection) and resume(Collection), which pause consumption on the specified assigned partitions and resume consumption on the specified paused partitions, respectively, in future poll(Duration) calls.
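
    In the join scenario, the application has to decide which side to pause. One possible rule, shown here as a sketch in plain Java, is to pause every topic whose latest record timestamp is ahead of the slowest topic by more than a tolerated skew. FlowControlSketch, its parameters and the timestamp-based rule are assumptions made for illustration; the resulting names would then be mapped to partitions and passed to pause(Collection).

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper that picks the topics to pause so a lagging stream can catch up.
final class FlowControlSketch {
    // Given the latest record timestamp seen per topic, returns the topics that are
    // ahead of the slowest topic by more than maxSkewMs.
    static Set<String> topicsToPause(Map<String, Long> latestTimestampMs, long maxSkewMs) {
        Set<String> toPause = new TreeSet<>();
        if (latestTimestampMs.isEmpty()) {
            return toPause;
        }
        long slowest = Collections.min(latestTimestampMs.values());
        for (Map.Entry<String, Long> entry : latestTimestampMs.entrySet()) {
            if (entry.getValue() - slowest > maxSkewMs) {
                toPause.add(entry.getKey());
            }
        }
        return toPause;
    }
}
```

    Once the lagging topic has caught up, the same function returns an empty set, which is the signal to resume the paused partitions.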

    Reading Transactional Messages

    Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically. In order for this to work, consumers reading from these partitions should be configured to only read committed data. This can be achieved by setting the isolation.level=read_committed in the consumer's configuration.

    In read_committed mode, the consumer will read only those transactional messages which have been successfully committed. It will continue to read non-transactional messages as before. There is no client-side buffering in read_committed mode. Instead, the end offset of a partition for a read_committed consumer would be the offset of the first message in the partition belonging to an open transaction. This offset is known as the 'Last Stable Offset' (LSO).

    A read_committed consumer will only read up to the LSO and filter out any transactional messages which have been aborted. The LSO also affects the behavior of seekToEnd(Collection) and endOffsets(Collection) for read_committed consumers, details of which are in each method's documentation. Finally, the fetch lag metrics are also adjusted to be relative to the LSO for read_committed consumers.
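
    The effect of the LSO can be modeled on plain offsets. The following is a simplified sketch, not broker or client code: LastStableOffsetSketch is a hypothetical class, the partition is assumed to start at offset 0, and when no transaction is open the sketch falls back to an end offset supplied by the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of what a read_committed consumer is allowed to see.
final class LastStableOffsetSketch {
    // The LSO is the first offset of the earliest open transaction, or the
    // caller-supplied end offset when no transaction is open.
    static long lastStableOffset(long endOffset, List<Long> openTransactionFirstOffsets) {
        long lso = endOffset;
        for (long firstOffset : openTransactionFirstOffsets) {
            lso = Math.min(lso, firstOffset);
        }
        return lso;
    }

    // Offsets visible to a read_committed consumer: below the LSO and not aborted.
    static List<Long> visibleOffsets(long lso, Set<Long> abortedOffsets) {
        List<Long> visible = new ArrayList<>();
        for (long offset = 0; offset < lso; offset++) {
            if (!abortedOffsets.contains(offset)) {
                visible.add(offset);
            }
        }
        return visible;
    }
}
```

    With an end offset of 100 and open transactions starting at offsets 40 and 70, the LSO is 40, so records from offset 40 onward stay invisible until the earlier transaction completes.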

    Partitions with transactional messages will include commit or abort markers which indicate the result of a transaction. These markers are not returned to applications, yet have

  • Original article: https://www.cnblogs.com/koushr/p/11739624.html