zoukankan      html  css  js  c++  java
  • KafkaProducer

    KafkaProducer
    
    KafkaProducer:
    
    class kafka.KafkaProducer(**configs):
    
    一个Kafka 客户端发布消息到Kafka 集群
    
    一个producr是线程安全的, 在线程之间共享一个单独的producer instance  会比有多个实例快
    
    
    producer 由一个缓冲池组成,持有没有被发送到服务器的记录以及后台I/O thread
    
    
    负责用于转换那些记录为请求,发送到cluster
    
    send() 是异步的,当调用它增加记录到挂起的记录的Buffer 发送然后立即返回。
    
    这个允许producer批量合并单个记录。
    
    
    The ‘acks’ 配置控制标准在请求是被认为完整的。
    
    'all' 设置会导致堵塞记录的完整提交,最慢的但是是最持久化的
    
    
    如果请求失败,producer 会自动重试,除非‘retries’ 是配置为0.
    
    producer 维护没有发送的记录的buffer在每个分区。
    
    这些buffer是通过‘batch_size’ config. 
    
    让这个值更大可以缓存更多batching,但是需要更多的内存(
    
    因为我们会为每个活动的分区设置一个buffer)
    
    默认情况下,一个buffer 是可用的在立即发送后,即使有额外未使用的空间在buffer里。
    
    然后,如果你要降低请求的数量 你可以设置‘linger_ms’ 为大于0的值。
    
    
    
    这个会指示producer 来等待指定的毫秒数在发送一个请求希望更多的记录会到达来填充相同的batch.
    
    
    这是一个类似Nagle’s 算法在TCP。
    
    
    注意 记录已经同时到达会批量合并 即使 linger_ms=0
    
    因此在高load ,batching会忽略 linger configuration; 
    
    
    
    然而 设置大于0的值可能导致一些 更加有效的请求当不在最大负责
    
    
    
    buffer_memory  控制总共可用的内存对于producer用于buffering.
    
    
    如果记录是发送速度快于可以被传输到服务器的 那么这个buffer space会被耗尽。
    
    当buffer space 是被耗尽,额外的发送请求会被堵塞
    
    
    key_serializer and value_serializer 指导如何调整key和值对象
    
    Keyword Arguments:
    
    bootstrap_servers –
    
    ‘host[:port]’ string (或者 ‘host[:port]’列表字符串),
    
    
    producer 应该连接到bootstrap初始化cluster 元数据。
    
    这个不一定是完整的节点列表, 你只需要至少一个broker 来响应一个API请求
    
    默认端口是9092 
    
    from kafka import KafkaConsumer
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='192.168.137.3:9092')
    for x in range(100):
      print x
      producer.send('test', 'kafka'+str(x))
    
    
    client_id (str)  -- 一个名字用于这个client,这个字符串是被传递在每个请求到 servers.
    
    可以用于识别指定的server-side 日志条目,默认是: ‘kafka-python-producer-#’ 
    
    acks (0, 1, 'all') –
    
    producer 需要leader在考虑一个请求完成前接收,这个控制了发送记录的持久性。
    
    0:Producer 不会等待任何server的确认。
    
    消息会立即被增加到socket buffer,认为是被发送的。
    
    无法保证server已经收到了消息 在这种情况下, retries 配置不会生效
    
    1: Wait for leader to write the record to its local log only.
    Broker will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost
    
    
    1. 等待leader 只写入到记录到它的本地log.
    
    
    Broker会响应不等待所有确认从所有的followers.
    
    在这种情况下,leader 立即失败在确认记录 但在followers 已经复制它然后记录会丢失
    
    all:
    
    等待所有副本写入记录。
    
    这个强制记录不会丢失只要失少一个同步副本仍然存活。
    
    这是最有力的保证。
    
    compression_type (str) – The compression type for all data generated by the producer. Valid values are ‘gzip’, ‘snappy’, ‘lz4’, or None. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression). Default: None.
    
    
    compression_type (str)
    
    
    
    由producer 产生的压缩类型的数据,有效的值是'gzip', ‘snappy’, ‘lz4’, or None. 
    
    
    压缩是全batchs的数据,因此批处理的效率也会影响压缩比
    
    retries (int) :
    
    设置一个值大于0会导致客户端重新发送那些由于前台的错误导致的发送失败的数据。
    
    请注意 这个retry是没有区别的如果客户端重新发送记录 当收到错误时。
    
    允许重试没有设置 max_in_flight_requests_per_connection为1 会潜在的改变记录的顺序
    
    
    因为如果有两个batches 是发送到一个单独的partition,
    
    如果第一个失败 但是第2次成功,那么记录在第2个batch 可能出现在第一个。
    
    
    注意:
    
    send(topic, value=None, key=None, partition=None, timestamp_ms=None)[source]¶
    Publish a message to a topic.
    
    发布一个消息到一个topic:
    
    参数:
    1.topic (str) -- topic 消息会发布在哪个topic
    
    
    2.值(可选的): 消息值。 必须是type buyes,或者 是可序列化的字节通过配置value_serializer.
    
    如果值是空的,key是需要的,消息充当一个'delete'.
    
    3.partition(int,可选的) 指定一个分区,如果没有指定,分区会被选择使用配置的 ‘partitioner’.
    
    from kafka import KafkaConsumer
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='192.168.137.3:9092')
    print producer;
    for x in range(100):
      print x
      producer.send('test', 'kafka'+str(x))
    


    
                                        
    
  • 相关阅读:
    POJ 3468 A Simple Problem with Integers(线段树 区间更新)
    Windows Mobile 6.0 SDK和中文模拟器下载
    Linux学习笔记——例说makefile 头文件查找路径
    uva 11427
    腾讯2014年实习生招聘笔试面试经历
    AVC1与H264的差别
    oracle递归函数
    全部编程皆为Web编程
    JavaScript--语法2--语句结构
    JavaScript--变量和运算符
  • 原文地址:https://www.cnblogs.com/hzcya1995/p/13349514.html
Copyright © 2011-2022 走看看