  • Apache Kafka

    Kafka 0.10.0.0 released

     

    The concept of interceptors most likely comes from Flume.

    Reference: http://blog.csdn.net/xiao_jun_0820/article/details/38111305

    For example, Flume provides:

    Timestamp Interceptor

    Host Interceptor

    Static Interceptor

    Regex Filtering Interceptor

    Regex Extractor Interceptor

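    These can wrap the messages flowing through, for example by inserting a timestamp or the host, or perform ETL-style operations such as filtering.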

     

    Kafka therefore provides the same kind of interceptor interface on both the producer side and the consumer side:

     

    ProducerInterceptor

    /**
     * A plugin interface to allow things to intercept events happening to a producer record,
     * such as sending producer record or getting an acknowledgement when a record gets published
     */
    public interface ProducerInterceptor<K, V> extends Configurable {
        /**
         * This is called when client sends record to KafkaProducer, before key and value gets serialized.
         * @param record the record from client
         * @return record that is either original record passed to this method or new record with modified key and value.
         */
        public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
     
        /**
         * This is called when the send has been acknowledged
         * @param metadata The metadata for the record that was sent (i.e. the partition and offset). The metadata information may be only partially filled, if an error occurred. Topic will be always set, and if partition is not -1, partition will be set partition set/assigned to this record.
         * @param exception The exception thrown during processing of this record. Null if no error occurred.
         */
        public void onAcknowledgement(RecordMetadata metadata, Exception exception);
       
        /**
         * This is called when interceptor is closed
         */
        public void close();
    }

    onSend() is called in KafkaProducer.send(), before the key and value are serialized and before the partition is assigned.

    If an implementation modifies the key and/or value, it must return the modified key and value in a new ProducerRecord object.
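    Because the record is immutable, "modifying" it means building a new one. The sketch below imitates Flume's Host Interceptor on the producer side. It deliberately does not depend on kafka-clients: `SimpleRecord` and `SendInterceptor` are hypothetical stand-ins for ProducerRecord and ProducerInterceptor, and prefixing the value with a host name is only an illustrative choice.

```java
import java.util.Objects;

/** Hypothetical stand-in for ProducerRecord: an immutable topic/key/value triple. */
final class SimpleRecord {
    final String topic;
    final String key;
    final String value;

    SimpleRecord(String topic, String key, String value) {
        this.topic = Objects.requireNonNull(topic);
        this.key = key;
        this.value = value;
    }
}

/** Hypothetical stand-in for the onSend() half of ProducerInterceptor. */
interface SendInterceptor {
    SimpleRecord onSend(SimpleRecord record);
}

/** Tags each value with the sending host, in the spirit of Flume's Host Interceptor. */
final class HostTaggingInterceptor implements SendInterceptor {
    private final String host;

    HostTaggingInterceptor(String host) {
        this.host = host;
    }

    @Override
    public SimpleRecord onSend(SimpleRecord record) {
        // The input record is never mutated; the changed value goes into a new record.
        return new SimpleRecord(record.topic, record.key, host + "|" + record.value);
    }
}
```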

    onAcknowledgement() is called when the send is acknowledged. It has the same signature as Callback.onCompletion() and is called just before Callback.onCompletion().

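    Multiple interceptors can be chained: they are listed in the interceptor.classes configuration and run in that order, each onSend() receiving the record returned by the previous one.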
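    A chain of interceptors can be modeled as a loop in which each interceptor receives whatever the previous one returned. Kafka's Javadoc also states that an exception thrown from onSend() is caught and logged rather than propagated; the sketch below mirrors that by keeping the last good value and moving on. `InterceptorChain` is a hypothetical model that works on String values with `UnaryOperator`, not Kafka's own internal chaining class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical model of an interceptor chain acting on record values. */
final class InterceptorChain {
    private final List<UnaryOperator<String>> interceptors = new ArrayList<>();
    private final List<String> errors = new ArrayList<>();

    InterceptorChain add(UnaryOperator<String> interceptor) {
        interceptors.add(interceptor);
        return this;
    }

    /** Runs every interceptor in registration order, feeding each the previous output. */
    String onSend(String value) {
        String current = value;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // Do not propagate: note the failure and continue with the last good value.
                errors.add(e.toString());
            }
        }
        return current;
    }

    List<String> errors() {
        return errors;
    }
}
```

    For example, chaining `String::toUpperCase`, an interceptor that throws, and one that appends "!" turns "abc" into "ABC!" with one recorded error.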

    ProducerInterceptor methods are called from multiple threads: onSend() on the submitting thread and onAcknowledgement() on the producer I/O thread.
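    Since onSend() and onAcknowledgement() run on different threads, any state an interceptor shares between them has to be thread-safe. A minimal sketch, assuming the interceptor only keeps counters: `AckCounter` is a hypothetical class whose methods mirror the shapes of onSend() and onAcknowledgement() (the exception argument is null on success, as in the Javadoc above) without implementing the real Kafka interface. `AtomicLong` keeps the counters consistent without locking.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical monitoring state shared by the submitting thread and the I/O thread. */
final class AckCounter {
    private final AtomicLong sent = new AtomicLong();
    private final AtomicLong acked = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    /** Mirrors onSend(): runs on the thread that calls send(). */
    void onSend() {
        sent.incrementAndGet();
    }

    /** Mirrors onAcknowledgement(): runs on the producer I/O thread; null means success. */
    void onAcknowledgement(Exception exception) {
        if (exception == null) {
            acked.incrementAndGet();
        } else {
            failed.incrementAndGet();
        }
    }

    /** Records sent but neither acknowledged nor failed yet. */
    long inFlight() {
        return sent.get() - acked.get() - failed.get();
    }

    long acked() {
        return acked.get();
    }

    long failed() {
        return failed.get();
    }
}
```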

     

    ConsumerInterceptor

    /**
     * A plugin interface to allow things to intercept Consumer events such as receiving a record or record being consumed
     * by a client.
     */
    public interface ConsumerInterceptor<K, V> extends Configurable {
        /**
         * This is called when the records are about to be returned to the client.
         * @param records records to be consumed by the client. Null if record dropped/ignored/discarded (non consumable)
         * @return records that is either original 'records' passed to this method or modified set of records
         */
        public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
       
        /**
         * This is called when offsets get committed
         * This method will be called when the commit request sent to the server has been acknowledged.
         * @param offsets A map of the offsets and associated metadata that this callback applies to
         */
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
       
        /**
         * This is called when interceptor is closed
         */
        public void close();
    }

    onConsume() will be called in KafkaConsumer.poll(), just before poll() returns ConsumerRecords.

    onCommit() is called when offsets are committed: just before OffsetCommitCallback.onCompletion() is invoked, and in ConsumerCoordinator.commitOffsetsSync() after a successful commit.

    Since the new consumer is single-threaded, ConsumerInterceptor methods are all called from a single thread.
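    A consumer-side sketch of the two hooks: onConsume() filters the batch just before poll() returns it, and onCommit() records what was committed. Because everything runs on one thread, plain collections are enough. `ConsumedRecord` is a hypothetical stand-in for ConsumerRecord, an Integer partition stands in for TopicPartition, a Long offset stands in for OffsetAndMetadata, and dropping null values is just an example filter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for ConsumerRecord: partition, offset and value only. */
final class ConsumedRecord {
    final int partition;
    final long offset;
    final String value;

    ConsumedRecord(int partition, long offset, String value) {
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

/** Mirrors onConsume()/onCommit(); no synchronization because the consumer is single-threaded. */
final class NullDroppingInterceptor {
    private long dropped = 0;
    private final Map<Integer, Long> committed = new HashMap<>();

    /** Called just before poll() returns: removes records whose value is null. */
    List<ConsumedRecord> onConsume(List<ConsumedRecord> records) {
        List<ConsumedRecord> kept = new ArrayList<>();
        for (ConsumedRecord r : records) {
            if (r.value == null) {
                dropped++;
            } else {
                kept.add(r);
            }
        }
        return kept;
    }

    /** Called after a successful commit: remembers the committed offset per partition. */
    void onCommit(Map<Integer, Long> offsets) {
        committed.putAll(offsets);
    }

    long dropped() {
        return dropped;
    }

    Map<Integer, Long> committed() {
        return committed;
    }
}
```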

     

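    Summary:

    As a plugin, an interceptor can do lightweight work on messages such as decorating, cleaning, or filtering, but its main use is monitoring and tracing messages.

    Interceptors can be chained.

    Interceptors must be lightweight: any time they spend directly reduces the throughput of the pipeline.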
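    One way to keep an interceptor cheap, offered here as a design choice rather than anything Kafka prescribes, is to do only an O(1) hand-off inside the callback and leave the expensive reporting to a background thread. The sketch uses a bounded `ArrayBlockingQueue` with `offer()`, which never blocks, so a slow reporter leads to dropped monitoring events instead of a slower send(). `ReportingHandOff` and its capacity are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical hot-path hand-off: interceptor callbacks enqueue, a reporter thread drains. */
final class ReportingHandOff {
    private final BlockingQueue<String> events;
    private final AtomicLong dropped = new AtomicLong();

    ReportingHandOff(int capacity) {
        this.events = new ArrayBlockingQueue<>(capacity);
    }

    /** Called from an interceptor callback: never blocks; drops the event if the queue is full. */
    void record(String event) {
        if (!events.offer(event)) {
            dropped.incrementAndGet();
        }
    }

    /** Called from the background reporter: removes and returns everything queued so far. */
    List<String> drain() {
        List<String> batch = new ArrayList<>();
        events.drainTo(batch);
        return batch;
    }

    long dropped() {
        return dropped.get();
    }
}
```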

     

    Confluent also offers interceptor products for monitoring data streams:

    http://docs.confluent.io/3.0.0/control-center/docs/clients.html

     

    In addition, for better monitoring and auditing, the following is proposed:

    Currently, RecordMetadata contains topic/partition, offset, and timestamp (KIP-32).
    We propose to add the record's remaining metadata to RecordMetadata: checksum and record size. Both checksum and record size are useful for monitoring and audit.

    For symmetry, we also propose to expose the same metadata on the consumer side and make it available to interceptors.

    We will add checksum and record size fields to RecordMetadata and ConsumerRecord.

    public final class RecordMetadata {
        private final long offset;
        private final TopicPartition topicPartition;
        private final long checksum;   // NEW: checksum of the record
        private final int size;        // NEW: record size in bytes (before compression)
        // ...
    }

    public final class ConsumerRecord<K, V> {
        // ... existing fields ...
        private final long checksum;   // NEW: checksum of the record
        private final int size;        // NEW: record size in bytes (after decompression)
        // ...
    }
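    To show why checksum and size help with auditing, here is a toy reconciliation for a single partition: the producer side records (checksum, size) per acknowledged offset, and the consumer side checks that the record it received at that offset matches. Because the producer-side size is taken before compression and the consumer-side size after decompression, both describe the same uncompressed record. `AuditEntry` and `AuditLedger` are hypothetical names, and `AuditEntry.of()` computes a `java.util.zip.CRC32` over arbitrary bytes purely for illustration; it is not the exact input Kafka's record checksum covers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.zip.CRC32;

/** Hypothetical audit entry: a record's checksum and size in bytes. */
final class AuditEntry {
    final long checksum;
    final int size;

    AuditEntry(long checksum, int size) {
        this.checksum = checksum;
        this.size = size;
    }

    /** Illustrative helper: CRC32 over the given bytes (not Kafka's exact checksum input). */
    static AuditEntry of(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return new AuditEntry(crc.getValue(), bytes.length);
    }
}

/** Hypothetical single-partition ledger: producer side records, consumer side verifies. */
final class AuditLedger {
    private final Map<Long, AuditEntry> produced = new HashMap<>();
    private long producedBytes = 0;

    /** Producer side: checksum/size reported for the acknowledged record at this offset. */
    void recordProduced(long offset, AuditEntry entry) {
        produced.put(offset, entry);
        producedBytes += entry.size;
    }

    /** Consumer side: true if the consumed record at this offset matches what was produced. */
    boolean verifyConsumed(long offset, AuditEntry entry) {
        AuditEntry expected = produced.get(offset);
        return expected != null
                && expected.checksum == entry.checksum
                && expected.size == entry.size;
    }

    long producedBytes() {
        return producedBytes;
    }
}
```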

  • Original post: https://www.cnblogs.com/fxjwind/p/6269700.html