1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE 3 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file 4 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the 5 * License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on 10 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the 11 * specific language governing permissions and limitations under the License. 12 */ 13 package org.apache.kafka.clients.producer; 14 15 import org.apache.kafka.clients.CommonClientConfigs; 16 import org.apache.kafka.clients.producer.internals.DefaultPartitioner; 17 import org.apache.kafka.common.config.AbstractConfig; 18 import org.apache.kafka.common.config.ConfigDef; 19 import org.apache.kafka.common.config.ConfigDef.Importance; 20 import org.apache.kafka.common.config.ConfigDef.Type; 21 import org.apache.kafka.common.serialization.Serializer; 22 23 import java.util.HashMap; 24 import java.util.Map; 25 import java.util.Properties; 26 27 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; 28 import static org.apache.kafka.common.config.ConfigDef.Range.between; 29 import static org.apache.kafka.common.config.ConfigDef.ValidString.in; 30 31 /** 32 * Configuration for the Kafka Producer. Documentation for these configurations can be found in the <a 33 * href="http://kafka.apache.org/documentation.html#newproducerconfigs">Kafka documentation</a> 34 */ 35 public class ProducerConfig extends AbstractConfig { 36 37 /* 38 * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS THESE ARE PART OF THE PUBLIC API AND 39 * CHANGE WILL BREAK USER CODE. 40 */ 41 42 private static final ConfigDef CONFIG; 43 44 /** <code>bootstrap.servers</code> */ 45 public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; 46 47 /** <code>metadata.fetch.timeout.ms</code> */ 48 /** 49 * @deprecated This config will be removed in a future release. Please use {@link #MAX_BLOCK_MS_CONFIG} 50 */ 51 @Deprecated 52 public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; 53 private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This " 54 + "fetch to succeed before throwing an exception back to the client."; 55 56 /** <code>metadata.max.age.ms</code> */ 57 public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; 58 private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC; 59 60 /** <code>batch.size</code> */ 61 public static final String BATCH_SIZE_CONFIG = "batch.size"; 62 private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent" + " to the same partition. This helps performance on both the client and the server. This configuration controls the " 63 + "default batch size in bytes. " 64 + "<p>" 65 + "No attempt will be made to batch records larger than this size. " 66 + "<p>" 67 + "Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. " 68 + "<p>" 69 + "A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable " 70 + "batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a " 71 + "buffer of the specified batch size in anticipation of additional records."; 72 73 /** <code>buffer.memory</code> */ 74 public static final String BUFFER_MEMORY_CONFIG = "buffer.memory"; 75 private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are " + "sent faster than they can be delivered to the server the producer will either block or throw an exception based " 76 + "on the preference specified by <code>block.on.buffer.full</code>. " 77 + "<p>" 78 + "This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since " 79 + "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if " 80 + "compression is enabled) as well as for maintaining in-flight requests."; 81 82 /** <code>acks</code> */ 83 public static final String ACKS_CONFIG = "acks"; 84 private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the " 85 + " durability of records that are sent. The following settings are common: " 86 + " <ul>" 87 + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the" 88 + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be" 89 + " made that the server has received the record in this case, and the <code>retries</code> configuration will not" 90 + " take effect (as the client won't generally know of any failures). The offset given back for each record will" 91 + " always be set to -1." 92 + " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond" 93 + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after" 94 + " acknowledging the record but before the followers have replicated it then the record will be lost." 95 + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to" 96 + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica" 97 + " remains alive. This is the strongest available guarantee."; 98 99 /** <code>timeout.ms</code> */ 100 101 /** 102 * @deprecated This config will be removed in a future release. Please use {@link #REQUEST_TIMEOUT_MS_CONFIG} 103 */ 104 @Deprecated 105 public static final String TIMEOUT_CONFIG = "timeout.ms"; 106 private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to " + "meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the " 107 + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout " 108 + "is measured on the server side and does not include the network latency of the request."; 109 110 /** <code>linger.ms</code> */ 111 public static final String LINGER_MS_CONFIG = "linger.ms"; 112 private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. " + "Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to " 113 + "reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount " 114 + "of artificial delay—that is, rather than immediately sending out a record the producer will wait for up to " 115 + "the given delay to allow other records to be sent so that the sends can be batched together. This can be thought " 116 + "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once " 117 + "we get <code>batch.size</code> worth of records for a partition it will be sent immediately regardless of this " 118 + "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the " 119 + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>linger.ms=5</code>, " 120 + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load."; 121 122 /** <code>client.id</code> */ 123 public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; 124 125 /** <code>send.buffer.bytes</code> */ 126 public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG; 127 128 /** <code>receive.buffer.bytes</code> */ 129 public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG; 130 131 /** <code>max.request.size</code> */ 132 public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size"; 133 private static final String MAX_REQUEST_SIZE_DOC = "The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server " + "has its own cap on record size which may be different from this. This setting will limit the number of record " 134 + "batches the producer will send in a single request to avoid sending huge requests."; 135 136 /** <code>reconnect.backoff.ms</code> */ 137 public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG; 138 139 /** <code>block.on.buffer.full</code> */ 140 /** 141 * @deprecated This config will be removed in a future release. Also, the {@link #METADATA_FETCH_TIMEOUT_CONFIG} is no longer honored when this property is set to true. 142 */ 143 @Deprecated 144 public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full"; 145 private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default " + "this setting is true and we block, however in some scenarios blocking is not desirable and it is better to " 146 + "immediately give an error. Setting this to <code>false</code> will accomplish that: the producer will throw a BufferExhaustedException if a recrord is sent and the buffer space is full."; 147 148 /** <code>retries</code> */ 149 public static final String RETRIES_CONFIG = "retries"; 150 private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error." + " Note that this retry is no different than if the client resent the record upon receiving the " 151 + "error. Allowing retries will potentially change the ordering of records because if two records are " 152 + "sent to a single partition, and the first fails and is retried but the second succeeds, then the second record " 153 + "may appear first."; 154 155 /** <code>retry.backoff.ms</code> */ 156 public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG; 157 158 /** <code>compression.type</code> */ 159 public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; 160 private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, or <code>lz4</code>. " 161 + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression)."; 162 163 /** <code>metrics.sample.window.ms</code> */ 164 public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG; 165 166 /** <code>metrics.num.samples</code> */ 167 public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG; 168 169 /** <code>metric.reporters</code> */ 170 public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; 171 172 /** <code>max.in.flight.requests.per.connection</code> */ 173 public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection"; 174 private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking." 175 + " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of" 176 + " message re-ordering due to retries (i.e., if retries are enabled)."; 177 178 /** <code>key.serializer</code> */ 179 public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"; 180 public static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>Serializer</code> interface."; 181 182 /** <code>value.serializer</code> */ 183 public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; 184 public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface."; 185 186 /** <code>connections.max.idle.ms</code> */ 187 public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; 188 189 /** <code>partitioner.class</code> */ 190 public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class"; 191 private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>Partitioner</code> interface."; 192 193 /** <code>max.block.ms</code> */ 194 public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms"; 195 private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long {@link KafkaProducer#send()} and {@link KafkaProducer#partitionsFor} will block." 196 + "These methods can be blocked either because the buffer is full or metadata unavailable." 197 + "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout."; 198 199 /** <code>request.timeout.ms</code> */ 200 public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; 201 private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; 202 203 static { 204 CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC) 205 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) 206 .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC) 207 .define(ACKS_CONFIG, 208 Type.STRING, 209 "1", 210 in("all", "-1", "0", "1"), 211 Importance.HIGH, 212 ACKS_DOC) 213 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) 214 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) 215 .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC) 216 .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC) 217 .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC) 218 .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC) 219 .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC) 220 .define(MAX_REQUEST_SIZE_CONFIG, 221 Type.INT, 222 1 * 1024 * 1024, 223 atLeast(0), 224 Importance.MEDIUM, 225 MAX_REQUEST_SIZE_DOC) 226 .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, false, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC) 227 .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC) 228 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) 229 .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC) 230 .define(METADATA_FETCH_TIMEOUT_CONFIG, 231 Type.LONG, 232 60 * 1000, 233 atLeast(0), 234 Importance.LOW, 235 METADATA_FETCH_TIMEOUT_DOC) 236 .define(MAX_BLOCK_MS_CONFIG, 237 Type.LONG, 238 60 * 1000, 239 atLeast(0), 240 Importance.MEDIUM, 241 MAX_BLOCK_MS_DOC) 242 .define(REQUEST_TIMEOUT_MS_CONFIG, 243 Type.INT, 244 30 * 1000, 245 atLeast(0), 246 Importance.MEDIUM, 247 REQUEST_TIMEOUT_MS_DOC) 248 .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC) 249 .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, 250 Type.LONG, 251 30000, 252 atLeast(0), 253 Importance.LOW, 254 CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC) 255 .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC) 256 .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 257 Type.INT, 258 5, 259 atLeast(1), 260 Importance.LOW, 261 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) 262 .define(KEY_SERIALIZER_CLASS_CONFIG, 263 Type.CLASS, 264 Importance.HIGH, 265 KEY_SERIALIZER_CLASS_DOC) 266 .define(VALUE_SERIALIZER_CLASS_CONFIG, 267 Type.CLASS, 268 Importance.HIGH, 269 VALUE_SERIALIZER_CLASS_DOC) 270 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */ 271 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, 272 Type.LONG, 273 9 * 60 * 1000, 274 Importance.MEDIUM, 275 CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC) 276 .define(PARTITIONER_CLASS_CONFIG, 277 Type.CLASS, 278 DefaultPartitioner.class.getName(), 279 Importance.MEDIUM, PARTITIONER_CLASS_DOC) 280 281 // security support 282 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 283 Type.STRING, 284 CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, 285 Importance.MEDIUM, 286 CommonClientConfigs.SECURITY_PROTOCOL_DOC) 287 .withClientSslSupport() 288 .withClientSaslSupport(); 289 290 } 291 292 public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs, 293 Serializer<?> keySerializer, Serializer<?> valueSerializer) { 294 Map<String, Object> newConfigs = new HashMap<String, Object>(); 295 newConfigs.putAll(configs); 296 if (keySerializer != null) 297 newConfigs.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass()); 298 if (valueSerializer != null) 299 newConfigs.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass()); 300 return newConfigs; 301 } 302 303 public static Properties addSerializerToConfig(Properties properties, 304 Serializer<?> keySerializer, Serializer<?> valueSerializer) { 305 Properties newProperties = new Properties(); 306 newProperties.putAll(properties); 307 if (keySerializer != null) 308 newProperties.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass().getName()); 309 if (valueSerializer != null) 310 newProperties.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass().getName()); 311 return newProperties; 312 } 313 314 ProducerConfig(Map<?, ?> props) { 315 super(CONFIG, props); 316 } 317 318 public static void main(String[] args) { 319 System.out.println(CONFIG.toHtmlTable()); 320 } 321 322 }