zoukankan      html  css  js  c++  java
  • SpringKafka生产端配置类ProducerConfig.java源码

      1 /**
      2  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
      3  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
      4  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
      5  * License. You may obtain a copy of the License at
      6  *
      7  * http://www.apache.org/licenses/LICENSE-2.0
      8  *
      9  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
     10  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
     11  * specific language governing permissions and limitations under the License.
     12  */
     13 package org.apache.kafka.clients.producer;
     14 
     15 import org.apache.kafka.clients.CommonClientConfigs;
     16 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
     17 import org.apache.kafka.common.config.AbstractConfig;
     18 import org.apache.kafka.common.config.ConfigDef;
     19 import org.apache.kafka.common.config.ConfigDef.Importance;
     20 import org.apache.kafka.common.config.ConfigDef.Type;
     21 import org.apache.kafka.common.serialization.Serializer;
     22 
     23 import java.util.HashMap;
     24 import java.util.Map;
     25 import java.util.Properties;
     26 
     27 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
     28 import static org.apache.kafka.common.config.ConfigDef.Range.between;
     29 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
     30 
     31 /**
     32  * Configuration for the Kafka Producer. Documentation for these configurations can be found in the <a
     33  * href="http://kafka.apache.org/documentation.html#newproducerconfigs">Kafka documentation</a>
     34  */
     35 public class ProducerConfig extends AbstractConfig {
     36 
     37     /*
     38      * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS THESE ARE PART OF THE PUBLIC API AND
     39      * CHANGE WILL BREAK USER CODE.
     40      */
     41 
     42     private static final ConfigDef CONFIG;
     43 
     44     /** <code>bootstrap.servers</code> */
     45     public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
     46 
     47     /** <code>metadata.fetch.timeout.ms</code> */
     48     /**
     49      * @deprecated This config will be removed in a future release. Please use {@link #MAX_BLOCK_MS_CONFIG}
     50      */
     51     @Deprecated
     52     public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
     53     private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This "
     54                                                              + "fetch to succeed before throwing an exception back to the client.";
     55 
     56     /** <code>metadata.max.age.ms</code> */
     57     public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
     58     private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
     59 
     60     /** <code>batch.size</code> */
     61     public static final String BATCH_SIZE_CONFIG = "batch.size";
     62     private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent" + " to the same partition. This helps performance on both the client and the server. This configuration controls the "
     63                                                  + "default batch size in bytes. "
     64                                                  + "<p>"
     65                                                  + "No attempt will be made to batch records larger than this size. "
     66                                                  + "<p>"
     67                                                  + "Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. "
     68                                                  + "<p>"
     69                                                  + "A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable "
     70                                                  + "batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a "
     71                                                  + "buffer of the specified batch size in anticipation of additional records.";
     72 
     73     /** <code>buffer.memory</code> */
     74     public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
     75     private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are " + "sent faster than they can be delivered to the server the producer will either block or throw an exception based "
     76                                                     + "on the preference specified by <code>block.on.buffer.full</code>. "
     77                                                     + "<p>"
     78                                                     + "This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since "
     79                                                     + "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if "
     80                                                     + "compression is enabled) as well as for maintaining in-flight requests.";
     81 
     82     /** <code>acks</code> */
     83     public static final String ACKS_CONFIG = "acks";
     84     private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
     85                                            + " durability of records that are sent. The following settings are common: "
     86                                            + " <ul>"
     87                                            + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"
     88                                            + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
     89                                            + " made that the server has received the record in this case, and the <code>retries</code> configuration will not"
     90                                            + " take effect (as the client won't generally know of any failures). The offset given back for each record will"
     91                                            + " always be set to -1."
     92                                            + " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond"
     93                                            + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"
     94                                            + " acknowledging the record but before the followers have replicated it then the record will be lost."
     95                                            + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
     96                                            + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
     97                                            + " remains alive. This is the strongest available guarantee.";
     98 
     99     /** <code>timeout.ms</code> */
    100 
    101     /**
    102      * @deprecated This config will be removed in a future release. Please use {@link #REQUEST_TIMEOUT_MS_CONFIG}
    103      */
    104     @Deprecated
    105     public static final String TIMEOUT_CONFIG = "timeout.ms";
    106     private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to " + "meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the "
    107                                               + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout "
    108                                               + "is measured on the server side and does not include the network latency of the request.";
    109 
    110     /** <code>linger.ms</code> */
    111     public static final String LINGER_MS_CONFIG = "linger.ms";
    112     private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. " + "Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to "
    113                                                 + "reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount "
    114                                                 + "of artificial delay&mdash;that is, rather than immediately sending out a record the producer will wait for up to "
    115                                                 + "the given delay to allow other records to be sent so that the sends can be batched together. This can be thought "
    116                                                 + "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once "
    117                                                 + "we get <code>batch.size</code> worth of records for a partition it will be sent immediately regardless of this "
    118                                                 + "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the "
    119                                                 + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>linger.ms=5</code>, "
    120                                                 + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load.";
    121 
    122     /** <code>client.id</code> */
    123     public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
    124 
    125     /** <code>send.buffer.bytes</code> */
    126     public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
    127 
    128     /** <code>receive.buffer.bytes</code> */
    129     public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
    130 
    131     /** <code>max.request.size</code> */
    132     public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
    133     private static final String MAX_REQUEST_SIZE_DOC = "The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server " + "has its own cap on record size which may be different from this. This setting will limit the number of record "
    134                                                        + "batches the producer will send in a single request to avoid sending huge requests.";
    135 
    136     /** <code>reconnect.backoff.ms</code> */
    137     public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
    138 
    139     /** <code>block.on.buffer.full</code> */
    140     /**
    141      * @deprecated This config will be removed in a future release. Also, the {@link #METADATA_FETCH_TIMEOUT_CONFIG} is no longer honored when this property is set to true.
    142      */
    143     @Deprecated
    144     public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
    145     private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default " + "this setting is true and we block, however in some scenarios blocking is not desirable and it is better to "
    146                                                            + "immediately give an error. Setting this to <code>false</code> will accomplish that: the producer will throw a BufferExhaustedException if a recrord is sent and the buffer space is full.";
    147 
    148     /** <code>retries</code> */
    149     public static final String RETRIES_CONFIG = "retries";
    150     private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error." + " Note that this retry is no different than if the client resent the record upon receiving the "
    151                                               + "error. Allowing retries will potentially change the ordering of records because if two records are "
    152                                               + "sent to a single partition, and the first fails and is retried but the second succeeds, then the second record "
    153                                               + "may appear first.";
    154 
    155     /** <code>retry.backoff.ms</code> */
    156     public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
    157 
    158     /** <code>compression.type</code> */
    159     public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
    160     private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, or <code>lz4</code>. "
    161                                                        + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
    162 
    163     /** <code>metrics.sample.window.ms</code> */
    164     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
    165 
    166     /** <code>metrics.num.samples</code> */
    167     public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
    168 
    169     /** <code>metric.reporters</code> */
    170     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
    171 
    172     /** <code>max.in.flight.requests.per.connection</code> */
    173     public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
    174     private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
    175                                                                             + " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of"
    176                                                                             + " message re-ordering due to retries (i.e., if retries are enabled).";
    177 
    178     /** <code>key.serializer</code> */
    179     public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
    180     public static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>Serializer</code> interface.";
    181 
    182     /** <code>value.serializer</code> */
    183     public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
    184     public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
    185 
    186     /** <code>connections.max.idle.ms</code> */
    187     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
    188 
    189     /** <code>partitioner.class</code> */
    190     public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
    191     private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>Partitioner</code> interface.";
    192 
    193     /** <code>max.block.ms</code> */
    194     public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
    195     private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long {@link KafkaProducer#send()} and {@link KafkaProducer#partitionsFor} will block."
    196                                                     + "These methods can be blocked either because the buffer is full or metadata unavailable."
    197                                                     + "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";
    198 
    199     /** <code>request.timeout.ms</code> */
    200     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
    201     private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
    202 
    203     static {
    204         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
    205                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
    206                                 .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
    207                                 .define(ACKS_CONFIG,
    208                                         Type.STRING,
    209                                         "1",
    210                                         in("all", "-1", "0", "1"),
    211                                         Importance.HIGH,
    212                                         ACKS_DOC)
    213                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
    214                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
    215                                 .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
    216                                 .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
    217                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
    218                                 .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
    219                                 .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
    220                                 .define(MAX_REQUEST_SIZE_CONFIG,
    221                                         Type.INT,
    222                                         1 * 1024 * 1024,
    223                                         atLeast(0),
    224                                         Importance.MEDIUM,
    225                                         MAX_REQUEST_SIZE_DOC)
    226                                 .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, false, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC)
    227                                 .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
    228                                 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
    229                                 .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
    230                                 .define(METADATA_FETCH_TIMEOUT_CONFIG,
    231                                         Type.LONG,
    232                                         60 * 1000,
    233                                         atLeast(0),
    234                                         Importance.LOW,
    235                                         METADATA_FETCH_TIMEOUT_DOC)
    236                                 .define(MAX_BLOCK_MS_CONFIG,
    237                                         Type.LONG,
    238                                         60 * 1000,
    239                                         atLeast(0),
    240                                         Importance.MEDIUM,
    241                                         MAX_BLOCK_MS_DOC)
    242                                 .define(REQUEST_TIMEOUT_MS_CONFIG,
    243                                         Type.INT,
    244                                         30 * 1000,
    245                                         atLeast(0),
    246                                         Importance.MEDIUM,
    247                                         REQUEST_TIMEOUT_MS_DOC)
    248                                 .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
    249                                 .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
    250                                         Type.LONG,
    251                                         30000,
    252                                         atLeast(0),
    253                                         Importance.LOW,
    254                                         CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
    255                                 .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
    256                                 .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
    257                                         Type.INT,
    258                                         5,
    259                                         atLeast(1),
    260                                         Importance.LOW,
    261                                         MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
    262                                 .define(KEY_SERIALIZER_CLASS_CONFIG,
    263                                         Type.CLASS,
    264                                         Importance.HIGH,
    265                                         KEY_SERIALIZER_CLASS_DOC)
    266                                 .define(VALUE_SERIALIZER_CLASS_CONFIG,
    267                                         Type.CLASS,
    268                                         Importance.HIGH,
    269                                         VALUE_SERIALIZER_CLASS_DOC)
    270                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
    271                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
    272                                         Type.LONG,
    273                                         9 * 60 * 1000,
    274                                         Importance.MEDIUM,
    275                                         CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
    276                                 .define(PARTITIONER_CLASS_CONFIG,
    277                                         Type.CLASS,
    278                                         DefaultPartitioner.class.getName(),
    279                                         Importance.MEDIUM, PARTITIONER_CLASS_DOC)
    280 
    281                                 // security support
    282                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
    283                                         Type.STRING,
    284                                         CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
    285                                         Importance.MEDIUM,
    286                                         CommonClientConfigs.SECURITY_PROTOCOL_DOC)
    287                                 .withClientSslSupport()
    288                                 .withClientSaslSupport();
    289 
    290     }
    291 
    292     public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
    293                                                             Serializer<?> keySerializer, Serializer<?> valueSerializer) {
    294         Map<String, Object> newConfigs = new HashMap<String, Object>();
    295         newConfigs.putAll(configs);
    296         if (keySerializer != null)
    297             newConfigs.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());
    298         if (valueSerializer != null)
    299             newConfigs.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass());
    300         return newConfigs;
    301     }
    302 
    303     public static Properties addSerializerToConfig(Properties properties,
    304                                                    Serializer<?> keySerializer, Serializer<?> valueSerializer) {
    305         Properties newProperties = new Properties();
    306         newProperties.putAll(properties);
    307         if (keySerializer != null)
    308             newProperties.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass().getName());
    309         if (valueSerializer != null)
    310             newProperties.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass().getName());
    311         return newProperties;
    312     }
    313 
    314     ProducerConfig(Map<?, ?> props) {
    315         super(CONFIG, props);
    316     }
    317 
    318     public static void main(String[] args) {
    319         System.out.println(CONFIG.toHtmlTable());
    320     }
    321 
    322 }
  • 相关阅读:
    pyqt信号和槽传递额外参数
    PyQt--QTreeWidget
    转载:futex同步机制详解
    Linux 下的同步机制
    Linux 下线程的理解
    Linux下的物理内存管理2-slab缓存的管理
    转:C语言的编译链接过程的介绍
    LInux中ThreadInfo中的preempt_count字段
    LInux中的物理内存管理
    Linux下的内核抢占
  • 原文地址:https://www.cnblogs.com/jun1019/p/6541590.html
Copyright © 2011-2022 走看看