  • 6. Spring-Kafka4

    4.1. Using Spring for Apache Kafka

    This section offers detailed explanations of the various concerns that impact using Spring for Apache Kafka. For a quick but less detailed introduction, see Quick Tour for the Impatient.

    4.1.1. Configuring Topics

    If you define a KafkaAdmin bean in your application context, it can automatically add topics to the broker. To do so, you can add a NewTopic @Bean for each topic to the application context. The following example shows how to do so:

    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                StringUtils.arrayToCommaDelimitedString(embeddedKafka().getBrokerAddresses()));
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic topic1() {
        return new NewTopic("thing1", 10, (short) 2);
    }
    
    @Bean
    public NewTopic topic2() {
        return new NewTopic("thing2", 10, (short) 2);
    }
    

    By default, if the broker is not available, a message is logged, but the context continues to load.
    You can programmatically invoke the admin’s initialize() method to try again later.
    If you wish this condition to be considered fatal, set the admin’s fatalIfBrokerNotAvailable property to true.
    The context then fails to initialize.
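
    As a minimal sketch of the fatal setting described above, the admin bean could be declared as follows. The class name StrictAdminConfig and the broker address localhost:9092 are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class StrictAdminConfig {

    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        // Assumption: a broker is expected at localhost:9092.
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        KafkaAdmin admin = new KafkaAdmin(configs);
        // Treat an unreachable broker as fatal so the context fails to initialize.
        admin.setFatalIfBrokerNotAvailable(true);
        return admin;
    }
}
```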

    If the broker supports it (1.0.0 or higher), the admin increases the number of partitions
    when an existing topic has fewer partitions than NewTopic.numPartitions.
    For more advanced features, such as assigning partitions to replicas, you can use the AdminClient directly.
    The following example shows how to do so:

    @Autowired
    private KafkaAdmin admin;
    
    ...
    
        AdminClient client = AdminClient.create(admin.getConfig());
        ...
        client.close();
    
    4.1.2. Sending Messages

    This section covers how to send messages.

    Using KafkaTemplate
    This section covers how to use KafkaTemplate to send messages.

    Overview
    The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics.
    The following listing shows the relevant methods from KafkaTemplate:

    ListenableFuture<SendResult<K, V>> sendDefault(V data);
    
    ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
    
    ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
    
    ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
    
    ListenableFuture<SendResult<K, V>> send(String topic, V data);
    
    ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
    
    ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
    
    ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
    
    ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
    
    ListenableFuture<SendResult<K, V>> send(Message<?> message);
    
    Map<MetricName, ? extends Metric> metrics();
    
    List<PartitionInfo> partitionsFor(String topic);
    
    <T> T execute(ProducerCallback<K, V, T> callback);
    
    // Flush the producer.
    
    void flush();
    
    interface ProducerCallback<K, V, T> {
    
        T doInKafka(Producer<K, V> producer);
    
    }
    

    See the Javadoc for more detail.

    The sendDefault API requires that a default topic has been provided to the template.
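
    One way to provide the default topic is to set it when the template bean is defined. The sketch below assumes a ProducerFactory bean already exists in the context; the class name DefaultTopicConfig is hypothetical, and the topic name thing1 is borrowed from the topic example earlier in this section.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class DefaultTopicConfig {

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(producerFactory);
        // The sendDefault(...) methods publish to this topic.
        template.setDefaultTopic("thing1");
        return template;
    }
}
```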

    The send and sendDefault overloads that accept a timestamp parameter store that timestamp in the record.
    How the user-provided timestamp is stored depends on the timestamp type configured on the Kafka topic.
    If the topic is configured to use CREATE_TIME, the user-specified timestamp is recorded (or generated if not specified).
    If the topic is configured to use LOG_APPEND_TIME, the user-specified timestamp is ignored and the broker adds in the local broker time.
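
    The timestamp rules above can be restated as a small decision function. This is a simplified model in plain Java, not Kafka code: the class TimestampModel, its enum, and the two clock parameters are hypothetical names introduced only to make the rules concrete.

```java
// Simplified model of the timestamp rules; it does not call Kafka.
class TimestampModel {

    // The two timestamp types a topic can be configured with.
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /**
     * Returns the timestamp that ends up stored with the record.
     *
     * @param type          timestamp type configured on the topic
     * @param userTimestamp timestamp passed to send(...), or null if none was given
     * @param producerClock time on the producer side when the record is sent
     * @param brokerClock   local broker time when the record is appended
     */
    static long storedTimestamp(TimestampType type, Long userTimestamp,
                                long producerClock, long brokerClock) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            // The user-specified value is ignored; the broker's local time wins.
            return brokerClock;
        }
        // CREATE_TIME: keep the user's value, or generate one if none was given.
        return userTimestamp != null ? userTimestamp : producerClock;
    }

    public static void main(String[] args) {
        System.out.println(storedTimestamp(TimestampType.CREATE_TIME, 1_000L, 2_000L, 3_000L));     // 1000
        System.out.println(storedTimestamp(TimestampType.CREATE_TIME, null, 2_000L, 3_000L));       // 2000
        System.out.println(storedTimestamp(TimestampType.LOG_APPEND_TIME, 1_000L, 2_000L, 3_000L)); // 3000
    }
}
```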

    The metrics and partitionsFor methods delegate to the same methods on the underlying Producer.
    The execute method provides direct access to the underlying Producer.

    To use the template, you can configure a producer factory and provide it in the template’s constructor. The following example shows how to do so:

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // See https://kafka.apache.org/documentation/#producerconfigs for more properties
        return props;
    }
    
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    

    You can also configure the template by using standard <bean/> definitions.

    Then, to use the template, you can invoke one of its methods.

    When you use the methods with a Message<?> parameter, the topic, partition, key,
    and timestamp information is provided in the following message headers:

    KafkaHeaders.TOPIC
    
    KafkaHeaders.PARTITION_ID
    
    KafkaHeaders.MESSAGE_KEY
    
    KafkaHeaders.TIMESTAMP
    

    The message payload is the data.
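
    A hedged sketch of building and sending such a message with Spring's MessageBuilder follows. It uses the header constants exactly as listed above, which may be named differently in other versions. The class name MessageSender, the payload, the partition, and the key are illustrative assumptions.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

// Hypothetical helper class; the name and the values sent are illustrative only.
public class MessageSender {

    private final KafkaTemplate<Integer, String> template;

    public MessageSender(KafkaTemplate<Integer, String> template) {
        this.template = template;
    }

    public void sendGreeting() {
        Message<String> message = MessageBuilder
                .withPayload("hello")                     // the payload becomes the record value
                .setHeader(KafkaHeaders.TOPIC, "thing1")  // destination topic
                .setHeader(KafkaHeaders.PARTITION_ID, 0)  // target partition
                .setHeader(KafkaHeaders.MESSAGE_KEY, 42)  // record key
                .build();
        template.send(message);
    }
}
```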

    Optionally, you can configure the KafkaTemplate with a ProducerListener to get an asynchronous callback with the results of the send (success or failure) instead of waiting for the Future to complete.
    The following listing shows the definition of the ProducerListener interface:

    public interface ProducerListener<K, V> {
    
        void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);
    
        void onError(String topic, Integer partition, K key, V value, Exception exception);
    
        boolean isInterestedInSuccess();
    
    }
    

    By default, the template is configured with a LoggingProducerListener, which logs errors and does nothing when the send is successful.

    onSuccess is called only if isInterestedInSuccess returns true.

    For convenience, the abstract ProducerListenerAdapter is provided in case you want to implement only one of the methods. It returns false for isInterestedInSuccess.
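
    As an illustration, a listener that counts outcomes might look like the sketch below. It assumes the ProducerListener interface exactly as listed above (later versions may declare different signatures). The class name CountingProducerListener is hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;

// Hypothetical listener that counts successful and failed sends.
public class CountingProducerListener implements ProducerListener<Integer, String> {

    private final AtomicLong successes = new AtomicLong();
    private final AtomicLong failures = new AtomicLong();

    @Override
    public void onSuccess(String topic, Integer partition, Integer key, String value,
            RecordMetadata recordMetadata) {
        successes.incrementAndGet();
    }

    @Override
    public void onError(String topic, Integer partition, Integer key, String value,
            Exception exception) {
        failures.incrementAndGet();
    }

    @Override
    public boolean isInterestedInSuccess() {
        // Must return true, otherwise onSuccess is never called.
        return true;
    }

    public long getSuccesses() {
        return successes.get();
    }

    public long getFailures() {
        return failures.get();
    }
}
```

    It could then be registered with template.setProducerListener(new CountingProducerListener()).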

    Notice that the send methods return a ListenableFuture. You can register a callback with the future to receive the result of the send asynchronously. The following example shows how to do so:

    // sendDefault(V data) sends to the template's default topic
    ListenableFuture<SendResult<Integer, String>> future = template.sendDefault("something");
    future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
    
        @Override
        public void onSuccess(SendResult<Integer, String> result) {
            ...
        }
    
        @Override
        public void onFailure(Throwable ex) {
            ...
        }
    
    });
    

    SendResult has two properties, a ProducerRecord and RecordMetadata.
    See the Kafka API documentation for information about those objects.

    If you wish to block the sending thread to await the result, you can invoke the future’s get() method.
    You may wish to invoke flush() before waiting. For convenience, the template also has a constructor with an autoFlush parameter that causes the template to flush() on each send.
    Note, however, that flushing on every send is likely to reduce performance significantly.
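
    A minimal sketch of the autoFlush constructor is shown below, assuming a ProducerFactory bean is already defined. The class name AutoFlushTemplateConfig is hypothetical.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class AutoFlushTemplateConfig {

    @Bean
    public KafkaTemplate<Integer, String> autoFlushTemplate(ProducerFactory<Integer, String> producerFactory) {
        // The second constructor argument is autoFlush: the template calls flush() after each send.
        return new KafkaTemplate<>(producerFactory, true);
    }
}
```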

    Examples
    This section shows examples of sending messages to Kafka:

    Non Blocking (Async)
    public void sendToKafka(final MyOutputData data) {
        final ProducerRecord<Integer, String> record = createRecord(data);
    
        ListenableFuture<SendResult<Integer, String>> future = template.send(record);
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
    
            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                handleSuccess(data);
            }
    
            @Override
            public void onFailure(Throwable ex) {
                handleFailure(data, record, ex);
            }
    
        });
    }
    
    Blocking (Sync)
    public void sendToKafka(final MyOutputData data) {
        final ProducerRecord<Integer, String> record = createRecord(data);
    
        try {
            template.send(record).get(10, TimeUnit.SECONDS);
            handleSuccess(data);
        }
        catch (ExecutionException e) {
            handleFailure(data, record, e.getCause());
        }
        catch (TimeoutException | InterruptedException e) {
            handleFailure(data, record, e);
        }
    }
    
  • Original article: https://www.cnblogs.com/xidianzxm/p/10735792.html