zoukankan      html  css  js  c++  java
  • Kafka实践1--Producer

    一、Kafka设计原理参考:

    http://blog.csdn.net/suifeng3051/article/details/48053965?locationNum=2

    http://www.cnblogs.com/metoy/p/4452124.html

    二、常见问题,以及解决方法:

    http://www.tuicool.com/articles/FNbQbeq

    三、Kafka官方指导文档(Producer):

     http://kafka.apache.org/documentation.html#producerconfigs

    Producer主要配置:

    bootstrap.servers      host1:port1,host2:port2,....  

    (host配置的是主机名而不是ip时,出现无法连接:Exception in thread "main" org.apache.kafka.common.config.ConfigException: DNS resolution failed for url in bootstrap.servers

      修改host的配置,把它改成ip。

      修改消费者所在机器的hosts文件,加入主机名与ip的映射。

      使用DNS(推荐)

    )

    key.serializer            Serializer class for key that implements theSerializer interface.   例如:"org.apache.kafka.common.serialization.StringSerializer"

    value.serializer         Serializer class for value that implements theSerializer interface.例如:"org.apache.kafka.common.serialization.StringSerializer"

    四、KafkaProducer的API文档:

    1.生产者不用连接zookeeper

    2.简单的producer往Kafka 集群 broker里面放消息的代码如下;如需从大文件读取数据,产生message,需要考虑大文件读取,不能把整个文件一次性全部读入内存。(http://www.importnew.com/14512.html  Java读取大文件

    3.最简单地配置以上3项即可,从官方文档可以看出,这三项没有默认值,必须配置。其他的有默认值。

    A Kafka client that publishes records to the Kafka cluster.    Kafka生产者负责向Kafka集群发送消息。

    生产者是线程安全的,多个生产者线程共享一个生产者实例。(这样比多个生产者实例更快)(线程安全相关内容可参考:http://polaris.blog.51cto.com/1146394/382161)

    The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.

    Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("acks", "all");
     props.put("retries", 0);
     props.put("batch.size", 16384);
     props.put("linger.ms", 1);
     props.put("buffer.memory", 33554432);
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
     Producer<String, String> producer = new KafkaProducer<>(props);
     for(int i = 0; i < 100; i++)
         producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
    
     producer.close();

    The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.

    生产者拥有一个缓存区,用来暂存还没被推送到集群的记录,同时会有一个后台I/O线程负责把这些记录发送到集群。如果关闭生产者失败,将会造成缓存区内存泄露,

    The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.

      send方法是异步的。当调用send方法时,它向缓存区中添加记录,它会立即返回。这样允许生产者高效地把相互独立的记录按批处理。

    Producer的send方法是异步,返回一个Future对象

         Future<RecordMetadata>    send(ProducerRecord<K,V> record)
    //Asynchronously send a record to a topic.

    Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) //Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
    A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the cancel method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task.

    The acks config controls the criteria under which requests are considered complete. The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.

    acks设置决定请求被认定为处理完成的标准。当我们设置acks为“all”,将会阻塞所有记录的提交,这是一种速度最慢但是可靠性最高的设置。

    If the request fails, the producer can automatically retry, though since we have specified retries as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on message delivery semantics for details).

    如果请求失败,生产者可以自动地重跑,但是我们设置retries为0的话就不会重跑。retries的设置也关系到能否使用副本功能。

    The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by the batch.size config. Making this larger can result in more batching, but requires more memory (since we will generally have one of these buffers for each active partition).

    生产者缓存了每个分区下未被发送的记录。缓存大小可以通过batch.size配置。增大缓存可以增大批处理量,但是会占用更多内存(由于通常情况下,会为每一分区产生一个批,即:同一分区(目录)的records才会放到一个batch里面)。

    By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you want to reduce the number of requests you can set linger.ms to something greater than 0. This will instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above, likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that records that arrive close together in time will generally batch together even with linger.ms=0 so under heavy load batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more efficient requests when not under maximal load at the cost of a small amount of latency.

    默认情况下,缓存记录可以被立即发送,即使它被加入没有使用的缓存区。然而如果你想要减少请求次数,你可以设置linger.ms为大于0的值。它将使得生产者等待指定毫秒数后再发送一个请求,以便等待更多的记录到达填充到一批数据里面。 这类似于TCP中的Nagel(纳格尔)算法。例如,在以上代码段中,100条记录将在一次请求中发送,当我们设置了linger时间为1毫秒。然而这样设置会导致请求增加1毫秒的延时,为了更多的记录到达,在缓冲区没有填满的情况下。请注意,记录近乎同时达到的,将放在同一批,即使linger.ms设置为0,在这种过负载情况下将会略linger设置。然而大多数情况下,设置linger.ms当在非极限负载的情况下,牺牲一点点时延,可以使请求发送数据变少,请求更高效。

    The buffer.memory controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms after which it throws a TimeoutException.

     buffer.memory设置决定了Producer缓存区整个可用的内存。如果记录记录发送速度总是比推送到集群速度快,那么缓存区将被耗尽。当缓存区资源耗尽,消息发送send方法调用将被阻塞,阻塞的门限时间由max.block.ms设定,阻塞超过限定时间会抛出TimeoutException异常。

    缓存区大小(buffer.memory):producer可用的缓存字节数(缓存的是待发送给集群的记录),如果records发送速度比推送到集群的速度快,producer将
    阻塞max.block.ms 毫秒,超过该阻塞时间,仍然存在records send速度过快,将抛出异常。
    这个配置项可以设置成与producer用到的总内存大小相当,但是不能设置成物理内存最大值,
    因为producer用到的内存还有一部分不用于缓存。例如有些内存会用于压缩(如果配置了允许压缩)、维护正在处理的请求。

    buffer.memory    The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent 
    faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.
    This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the 
    producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as 
    for maintaining in-flight requests.
    默认值:long    33554432    [0,...]    high

    The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes. You can use the included ByteArraySerializer or StringSerializer for simple string or byte types.

     key.serializer和value.serializer设置了如何把用户提供的key和value对象序列化成字节。你可以用ByteArraySerializer 或 StringSerializer 对简单的 字符串对象或者字节数组对象 进行序列化。

    (感觉Apache的Kafka官方文档比API文档说明的更详细一点)

    批大小(batch.size):producer将会尝试把发送到相同partition的records放到一起成批地发送,从而减少requests个数。
    这样有助于同时提高client和server的性能,这个配置项控制默认的批大小(以字节为单位)。
    不能使得一批记录的大小比这个设置大。
    发送给brokers的请求会包含多个批,每次发送一批数据。
    较小的批可以使得分批操作更少,这样会降低吞吐量(批大小为0的将导致批处理完成不可用)。较大的批将会导致内存浪费,
    因为我们一般都会为额外的记录预先分配缓存给指定批大小

    The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the 
    same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
    No attempt will be made to batch records larger than this size.
    Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.
    A small batch size will make batching less common and may reduce throughput 
    (a batch size of zero will disable batching entirely). A very large batch size may use memory 
    a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation
    of additional records.


    max.block.ms 决定send和partitionsFor方法被阻塞的时间。当缓存区满了或者元数据不可用的时间将产生阻塞。
    用户提供的序列化或者分区者不会不利于这个时延。

    The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block.These methods 
    can be blocked either because the buffer is full or metadata unavailable.
    Blocking in the user-supplied serializers or partitioner will not 
    be counted against this timeout.

    min.insync.replicas:When a producer sets acks to "all" (or "-1"),
    当producer的ack设置为all(或者-1)时,
    min.insync.replicas specifies the minimum number of replicas
    min.insync.replicas指定了最小的副本数,
    that must acknowledge a write for the write to be considered successful.

    If this minimum cannot be met, then the producer will raise an exception(either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
    如果最小应答数不能达到配置要求,则生产者会抛出一个异常(NotEnoughReplicas或NotEnoughReplicasAterAppend异常)

    When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees.
    min.insync.replicas和acks一起使用的时候,可以让系统具有更强的稳定性保证。

    A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2,
    一个典型的配置是:一个topic配置副本因数为3,min.insync.replicas=2,acks=all,这样可以让生产者在多数副本没有接收到写应答的时候抛出一个异常。
    and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.

    官方API文档:http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

    五、查看topic是否发送成功

    调用send方法后,可以上Kafka集群环境(ssh连接过去即可)含有kafka脚本的目录下,查看topic情况:

    查看所有topic:   ./kafka-topics.sh --list --zookeeper host1:port,host2:port

    查看指定topic:  ./kafka-topics.sh --describe --zookeeper localhost:port --topic  kafka_topic    

    topic消息在磁盘中分布情况:  du -sh /data*/zdh/kafka/data/kafka_topic*

    通过消费者脚本,消费所有消息,可以看到消息内容:

    ./kafka-console-consumer.sh --zookeeper host:2181 --topic kafka_topic   --from beginning   (--from beginning 是从头开始消费,不加则是消费当前正在发送到该topic的消息)

  • 相关阅读:
    Hystrix容错处理
    Elasticsearch基本命令
    Elasticsearch
    IDEA最新破解方式
    ElasticSearch _bulk批量处理报错The bulk request must be terminated by a newline
    Mysql引擎MyISAM和InnoDB的区别
    利用文件建立与删除swap分区
    新的分区----增加与删除swap分区
    增加,删除分区,不重启更新分区表信息
    将/home迁移到一个独立分区(新的硬盘)中
  • 原文地址:https://www.cnblogs.com/yongwangzhiqian/p/5857124.html
Copyright © 2011-2022 走看看