zoukankan      html  css  js  c++  java
  • Linux-------------kafaka基础

    一、kafka简介

    Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息,同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统的数据(Elasticsearch、Hadoop等)

    Kafka最核心的最成熟的还是他的消息引擎,所以Kafka大部分应用场景还是用来作为消息队列削峰平谷。另外,Kafka也是目前性能最好的消息中间件。

    在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来负载,可以存储在1个或多个Partition中,发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。

    Broker:	Kafka节点
    Topic: 主题,用来承载消息
    Partition: 分区,用于主题分片存储
    Producer: 生产者,向主题发布消息的应用
    Consumer: 消费者,从主题订阅消息的应用
    Consumer Group: 消费者组,由多个消费者组成.
    

    二、常用命令和配置文件参数详解

    2.1 常用命令参数

    常用命令:
    kafka-server-start.sh //用于启动服务
    kafka-console-consumer.sh //常用于测试
    kafka-console-producer.sh //常用语测试
    kafka-topics.sh //常用

    2.2 kafka-server-start.sh

    用法:> bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*
    这个命令后面可以有多个参数,第一个是可选参数,该参数可以让当前命令以后台服务方式执行,第二个必须是 Kafka 的配置文件。后面还可以有多个--override开头的参数,其中的property可以是Broker Configs中提供的所有参数。这些额外的参数会覆盖配置文件中的设置。

    2.2.1 start示例

    > bin/kafka-server-start.sh -daemon config/server.properties --override broker.id=0 --override log.dirs=/tmp/kafka-logs-1 --override listeners=PLAINTEXT://:9092 --override advertised.listeners=PLAINTEXT://192.168.16.150:9092
    
    > bin/kafka-server-start.sh -daemon config/server.properties --override broker.id=1 --override log.dirs=/tmp/kafka-logs-2 --override listeners=PLAINTEXT://:9093 --override advertised.listeners=PLAINTEXT://192.168.16.150:9093
    

    上面这种用法只是用于演示,真正要启动多个Broker 应该针对不同的 Broker 创建相应的 server.properties 配置。

    2.3 kafka-console-consumer.sh

    这个命令只是简单的将消息输出到标准输出中,该命令支持的参数如下。

    option                                   Description                            
    ------                                   -----------                            
    --blacklist <String: blacklist>          Blacklist of topics to exclude from    
                                               consumption.                         
    --bootstrap-server <String: server to    REQUIRED (unless old consumer is       
      connect to>                              used): The server to connect to.     
    --consumer-property <String:             A mechanism to pass user-defined       
      consumer_prop>                           properties in the form key=value to  
                                               the consumer.                        
    --consumer.config <String: config file>  Consumer config properties file. Note  
                                               that [consumer-property] takes       
                                               precedence over this config.         
    --csv-reporter-enabled                   If set, the CSV metrics reporter will  
                                               be enabled                           
    --delete-consumer-offsets                If specified, the consumer path in     
                                               zookeeper is deleted when starting up
    --enable-systest-events                  Log lifecycle events of the consumer   
                                               in addition to logging consumed      
                                               messages. (This is specific for      
                                               system tests.)                       
    --formatter <String: class>              The name of a class to use for         
                                               formatting kafka messages for        
                                               display. (default: kafka.tools.      
                                               DefaultMessageFormatter)             
    --from-beginning                         If the consumer does not already have  
                                               an established offset to consume     
                                               from, start with the earliest        
                                               message present in the log rather    
                                               than the latest message.             
    --key-deserializer <String:                                                     
      deserializer for key>                                                         
    --max-messages <Integer: num_messages>   The maximum number of messages to      
                                               consume before exiting. If not set,  
                                               consumption is continual.            
    --metrics-dir <String: metrics           If csv-reporter-enable is set, and     
      directory>                               this parameter isset, the csv        
                                               metrics will be outputed here        
    --new-consumer                           Use the new consumer implementation.   
                                               This is the default.                 
    --offset <String: consume offset>        The offset id to consume from (a non-  
                                               negative number), or 'earliest'      
                                               which means from beginning, or       
                                               'latest' which means from end        
                                               (default: latest)                    
    --partition <Integer: partition>         The partition to consume from.         
    --property <String: prop>                The properties to initialize the       
                                               message formatter.                   
    --skip-message-on-error                  If there is an error when processing a 
                                               message, skip it instead of halt.    
    --timeout-ms <Integer: timeout_ms>       If specified, exit if no message is    
                                               available for consumption for the    
                                               specified interval.                  
    --topic <String: topic>                  The topic id to consume on.            
    --value-deserializer <String:                                                   
      deserializer for values>                                                      
    --whitelist <String: whitelist>          Whitelist of topics to include for     
                                               consumption.                         
    --zookeeper <String: urls>               REQUIRED (only when using old          
                                               consumer): The connection string for 
                                               the zookeeper connection in the form 
                                               host:port. Multiple URLS can be      
                                               given to allow fail-over. 
    

    --bootstrap-server 必须指定,通常--topic也要指定查看的主题。如果想要从头查看消息,还可以指定--from-beginning参数。

    2.3.1 kafka-console-consumer.sh示例

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    
    还可以通过下面的命令指定分区查看:
    >> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --partition 0
    

    2.4 kafka-console-producer.sh

    这个命令可以将文件或标准输入的内容发送到Kafka集群。该命令参数如下。

    Option                                   Description                            
    ------                                   -----------                            
    --batch-size <Integer: size>             Number of messages to send in a single 
                                               batch if they are not being sent     
                                               synchronously. (default: 200)        
    --broker-list <String: broker-list>      REQUIRED: The broker list string in    
                                               the form HOST1:PORT1,HOST2:PORT2.    
    --compression-codec [String:             The compression codec: either 'none',  
      compression-codec]                       'gzip', 'snappy', or 'lz4'.If        
                                               specified without value, then it     
                                               defaults to 'gzip'                   
    --key-serializer <String:                The class name of the message encoder  
      encoder_class>                           implementation to use for            
                                               serializing keys. (default: kafka.   
                                               serializer.DefaultEncoder)           
    --line-reader <String: reader_class>     The class name of the class to use for 
                                               reading lines from standard in. By   
                                               default each line is read as a       
                                               separate message. (default: kafka.   
                                               tools.                               
                                               ConsoleProducer$LineMessageReader)   
    --max-block-ms <Long: max block on       The max time that the producer will    
      send>                                    block for during a send request      
                                               (default: 60000)                     
    --max-memory-bytes <Long: total memory   The total memory used by the producer  
      in bytes>                                to buffer records waiting to be sent 
                                               to the server. (default: 33554432)   
    --max-partition-memory-bytes <Long:      The buffer size allocated for a        
      memory in bytes per partition>           partition. When records are received 
                                               which are smaller than this size the 
                                               producer will attempt to             
                                               optimistically group them together   
                                               until this size is reached.          
                                               (default: 16384)                     
    --message-send-max-retries <Integer>     Brokers can fail receiving the message 
                                               for multiple reasons, and being      
                                               unavailable transiently is just one  
                                               of them. This property specifies the 
                                               number of retires before the         
                                               producer give up and drop this       
                                               message. (default: 3)                
    --metadata-expiry-ms <Long: metadata     The period of time in milliseconds     
      expiration interval>                     after which we force a refresh of    
                                               metadata even if we haven't seen any 
                                               leadership changes. (default: 300000)
    --old-producer                           Use the old producer implementation.   
    --producer-property <String:             A mechanism to pass user-defined       
      producer_prop>                           properties in the form key=value to  
                                               the producer.                        
    --producer.config <String: config file>  Producer config properties file. Note  
                                               that [producer-property] takes       
                                               precedence over this config.         
    --property <String: prop>                A mechanism to pass user-defined       
                                               properties in the form key=value to  
                                               the message reader. This allows      
                                               custom configuration for a user-     
                                               defined message reader.              
    --queue-enqueuetimeout-ms <Integer:      Timeout for event enqueue (default:    
      queue enqueuetimeout ms>                 2147483647)                          
    --queue-size <Integer: queue_size>       If set and the producer is running in  
                                               asynchronous mode, this gives the    
                                               maximum amount of  messages will     
                                               queue awaiting sufficient batch      
                                               size. (default: 10000)               
    --request-required-acks <String:         The required acks of the producer      
      request required acks>                   requests (default: 1)                
    --request-timeout-ms <Integer: request   The ack timeout of the producer        
      timeout ms>                              requests. Value must be non-negative 
                                               and non-zero (default: 1500)         
    --retry-backoff-ms <Integer>             Before each retry, the producer        
                                               refreshes the metadata of relevant   
                                               topics. Since leader election takes  
                                               a bit of time, this property         
                                               specifies the amount of time that    
                                               the producer waits before refreshing 
                                               the metadata. (default: 100)         
    --socket-buffer-size <Integer: size>     The size of the tcp RECV size.         
                                               (default: 102400)                    
    --sync                                   If set message send requests to the    
                                               brokers are synchronously, one at a  
                                               time as they arrive.                 
    --timeout <Integer: timeout_ms>          If set and the producer is running in  
                                               asynchronous mode, this gives the    
                                               maximum amount of time a message     
                                               will queue awaiting sufficient batch 
                                               size. The value is given in ms.      
                                               (default: 1000)                      
    --topic <String: topic>                  REQUIRED: The topic id to produce      
                                               messages to.                         
    --value-serializer <String:              The class name of the message encoder  
      encoder_class>                           implementation to use for            
                                               serializing values. (default: kafka. 
                                               serializer.DefaultEncoder)
    

    其中 --broker-list 和 --topic 是两个必须提供的参数。

    2.4.1 kafka-console-producer.sh示例

    使用标准输入方式。
    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
    从文件读取:
    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < file-input.txt
    

    2.5 kafka-topics.sh

    相比上面几个偶尔使用的命令来说,kafka-topics.sh 相对就比较重要。该命令包含以下参数。

    Create, delete, describe, or change a topic.
    Option                                   Description                            
    ------                                   -----------                            
    --alter                                  Alter the number of partitions,        
                                               replica assignment, and/or           
                                               configuration for the topic.         
    --config <String: name=value>            A topic configuration override for the 
                                               topic being created or altered.The   
                                               following is a list of valid         
                                               configurations:                      
                                                cleanup.policy                        
                                                compression.type                      
                                                delete.retention.ms                   
                                                file.delete.delay.ms                  
                                                flush.messages                        
                                                flush.ms                              
                                                follower.replication.throttled.       
                                               replicas                             
                                                index.interval.bytes                  
                                                leader.replication.throttled.replicas 
                                                max.message.bytes                     
                                                message.format.version                
                                                message.timestamp.difference.max.ms   
                                                message.timestamp.type                
                                                min.cleanable.dirty.ratio             
                                                min.compaction.lag.ms                 
                                                min.insync.replicas                   
                                                preallocate                           
                                                retention.bytes                       
                                                retention.ms                          
                                                segment.bytes                         
                                                segment.index.bytes                   
                                                segment.jitter.ms                     
                                                segment.ms                            
                                                unclean.leader.election.enable        
                                             See the Kafka documentation for full   
                                               details on the topic configs.        
    --create                                 Create a new topic.                    
    --delete                                 Delete a topic                         
    --delete-config <String: name>           A topic configuration override to be   
                                               removed for an existing topic (see   
                                               the list of configurations under the 
                                               --config option).                    
    --describe                               List details for the given topics.     
    --disable-rack-aware                     Disable rack aware replica assignment  
    --force                                  Suppress console prompts               
    --help                                   Print usage information.               
    --if-exists                              if set when altering or deleting       
                                               topics, the action will only execute 
                                               if the topic exists                  
    --if-not-exists                          if set when creating topics, the       
                                               action will only execute if the      
                                               topic does not already exist         
    --list                                   List all available topics.             
    --partitions <Integer: # of partitions>  正在创建或更改主题的分区数
                                             (警告:如果为具有密钥的主题   
                                             (分区)增加了分区  
                                              消息的逻辑或排序将受到影响                    
    --replica-assignment <String:            A list of manual partition-to-broker   
      broker_id_for_part1_replica1 :           assignments for the topic being      
      broker_id_for_part1_replica2 ,           created or altered.                  
      broker_id_for_part2_replica1 :                                                
      broker_id_for_part2_replica2 , ...>                                           
    --replication-factor <Integer:           正在创建的主题中每个分区的复制因子。        
      replication factor>                    
    --topic <String: topic>                  The topic to be create, alter or       
                                               describe. Can also accept a regular  
                                               expression except for --create option
    --topics-with-overrides                  if set when describing topics, only    
                                               show topics that have overridden     
                                               configs                              
    --unavailable-partitions                 if set when describing topics, only    
                                               show partitions whose leader is not  
                                               available                            
    --under-replicated-partitions            if set when describing topics, only    
                                               show under replicated partitions     
    --zookeeper <String: urls>               REQUIRED: The connection string for    
                                               the zookeeper connection in the form 
                                               host:port. Multiple URLS can be      
                                               given to allow fail-over. 
    

    2.5.1 kafka-configs.sh示例

    下面是几种常用的 topic 命令。

    描述主题的配置
    bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name test_topic
    
    设置保留时间
    # Deprecated way
    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test_topic --config retention.ms=1000
    
    # Modern way
    bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test_topic --add-config retention.ms=1000
    
    如果您需要删除主题中的所有消息,则可以利用保留时间。首先将保留时间设置为非常低(1000 ms),等待几秒钟,然后将保留时间恢复为上一个值。
    注意:默认保留时间为24小时(86400000毫秒)。
    
    删除主题
    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test_topic
    注意:需要在Broker的配置文件server.properties中配置 delete.topic.enable=true 才能删除主题。
    
    主题信息
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic
    
    添加分区
    bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test_topic --partitions 3
    
    创建主题
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test_topic
    
    列出主题
    bin/kafka-topics.sh --list --zookeeper localhost:2181
    

    2.5.2 kafka-configs.sh帮助

    topic 相关内容来源:http://ronnieroller.com/kafka/cheat-sheet
    命令那么多,怎么记?
    Kafka 的命令行工具提供了非常丰富的提示信息,所以只需要记住上面大概的几个用法,知道怎么写就行。当需要用到某个命令时,通过命令提示进行操作。

    比如说,如何使用 kafka-configs.sh 查看主题(Topic)的配置?
    首先,在命令行中输入bin/kafka-configs.sh,然后或输出下面的命令提示信息。

    Add/Remove entity config for a topic, client, user or broker
    Option                      Description                                        
    ------                      -----------                                        
    --add-config <String>       Key Value pairs of configs to add. Square brackets 
                                  can be used to group values which contain commas:
                                  'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a  
                                  list of valid configurations: For entity_type    
                                  'topics':                                        
                                    cleanup.policy                                    
                                    compression.type                                  
                                    delete.retention.ms                               
                                    file.delete.delay.ms                              
                                    flush.messages                                    
                                    flush.ms                                          
                                    follower.replication.throttled.replicas           
                                    index.interval.bytes                              
                                    leader.replication.throttled.replicas             
                                    max.message.bytes                                 
                                    message.format.version                            
                                    message.timestamp.difference.max.ms               
                                    message.timestamp.type                            
                                    min.cleanable.dirty.ratio                         
                                    min.compaction.lag.ms                             
                                    min.insync.replicas                               
                                    preallocate                                       
                                    retention.bytes                                   
                                    retention.ms                                      
                                    segment.bytes                                     
                                    segment.index.bytes                               
                                    segment.jitter.ms                                 
                                    segment.ms                                        
                                    unclean.leader.election.enable                    
                                For entity_type 'brokers':                         
                                    follower.replication.throttled.rate               
                                    leader.replication.throttled.rate                 
                                For entity_type 'users':                           
                                    producer_byte_rate                                
                                    SCRAM-SHA-256                                     
                                    SCRAM-SHA-512                                     
                                    consumer_byte_rate                                
                                For entity_type 'clients':                         
                                    producer_byte_rate                                
                                    consumer_byte_rate                                
                                Entity types 'users' and 'clients' may be specified
                                  together to update config for clients of a       
                                  specific user.                                   
    --alter                     Alter the configuration for the entity.            
    --delete-config <String>    config keys to remove 'k1,k2'                      
    --describe                  List configs for the given entity.                 
    --entity-default            Default entity name for clients/users (applies to  
                                  corresponding entity type in command line)       
    --entity-name <String>      Name of entity (topic name/client id/user principal
                                  name/broker id)                                  
    --entity-type <String>      Type of entity (topics/clients/users/brokers)      
    --force                     Suppress console prompts                           
    --help                      Print usage information.                           
    --zookeeper <String: urls>  REQUIRED: The connection string for the zookeeper  
                                  connection in the form host:port. Multiple URLS  
                                  can be given to allow fail-over.
    

    从第一行可以看到这个命令可以修改 topic, client, user 或 broker 的配置。

    如果要设置 topic,就需要设置 entity-type 为topics,输入如下命令:
    > bin/kafka-configs.sh --entity-type topics
    Command must include exactly one action: --describe, --alter
    
    命令提示需要指定一个操作(不只是上面提示的两个操作),增加--describe试试:
    > bin/kafka-configs.sh --entity-type topics --describe
    [root@localhost kafka_2.11-0.10.2.1]# bin/kafka-configs.sh --entity-type topics --describe
    Missing required argument "[zookeeper]"
    
    继续增加 --zookeeper:
    > bin/kafka-configs.sh --entity-type topics --describe --zookeeper localhost:2181
    Configs for topic '__consumer_offsets' are segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
    

    由于没有指定主题名,这里显示了__consumer_offsets的信息。下面指定一个topic试试。

    > bin/kafka-configs.sh --entity-type topics --describe --zookeeper localhost:2181 --entity-name test
    Configs for topic 'test' are
    

    此时显示了test主题的信息,这里是空。
    因为Kafka完善的命令提示,可以很轻松的通过提示信息来进行下一步操作,运用熟练后,基本上很快就能实现自己想要的命令。

    2.6 配置文件参数

    kafka的配置分为 broker、producter、consumer三个不同的配置

    2.6.1 BROKER 的全局配置

    最为核心的三个配置 broker.id、log.dir、zookeeper.connect 。

    ------------------------------------------- 系统 相关 -------------------------------------------
    ##每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
    broker.id =1
     
    ##kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
    log.dirs = /tmp/kafka-logs
     
    ##提供给客户端响应的端口
    port =6667
     
    ##消息体的最大大小,单位是字节
    message.max.bytes =1000000
     
    ## broker 处理消息的最大线程数,一般情况下不需要去修改
    num.network.threads =3
     
    ## broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
    num.io.threads =8
     
    ## 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
    background.threads =4
     
    ## 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,算是一种自我保护机制
    queued.max.requests =500
     
    ##broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
    host.name
     
    ## 打广告的地址,若是设置的话,会提供给producers, consumers,其他broker连接,具体如何使用还未深究
    advertised.host.name
     
    ## 广告地址端口,必须不同于port中的设置
    advertised.port
     
    ## socket的发送缓冲区,socket的调优参数SO_SNDBUFF
    socket.send.buffer.bytes =100*1024
     
    ## socket的接受缓冲区,socket的调优参数SO_RCVBUFF
    socket.receive.buffer.bytes =100*1024
     
    ## socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
    socket.request.max.bytes =100*1024*1024
    
    ------------------------------------------- LOG 相关 -------------------------------------------
    ## topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
    log.segment.bytes =1024*1024*1024
     
    ## 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment 会被 topic创建时的指定参数覆盖
    log.roll.hours =24*7
     
    ## 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
    log.cleanup.policy = delete
     
    ## 数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
    ## log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
    log.retention.minutes=7days
    
    指定日志每隔多久检查看是否可以被删除,默认1分钟
    log.cleanup.interval.mins=1
     
     
    ## topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes 。-1没有大小限制
    ## log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
    log.retention.bytes=-1
     
    ## 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
    log.retention.check.interval.ms=5minutes
     
    ## 是否开启日志压缩
    log.cleaner.enable=false
     
    ## 日志压缩运行的线程数
    log.cleaner.threads =1
     
    ## 日志压缩时候处理的最大大小
    log.cleaner.io.max.bytes.per.second=None
     
    ## 日志压缩去重时候的缓存空间 ,在空间允许的情况下,越大越好
    log.cleaner.dedupe.buffer.size=500*1024*1024
     
    ## 日志清理时候用到的IO块大小 一般不需要修改
    log.cleaner.io.buffer.size=512*1024
     
    ## 日志清理中hash表的扩大因子 一般不需要修改
    log.cleaner.io.buffer.load.factor =0.9
     
    ## 检查是否处罚日志清理的间隔
    log.cleaner.backoff.ms =15000
     
    ## 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖
    log.cleaner.min.cleanable.ratio=0.5
     
    ## 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
    log.cleaner.delete.retention.ms =1day
     
    ## 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
    log.index.size.max.bytes =10*1024*1024
     
    ## 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
    log.index.interval.bytes =4096
     
    ## log文件"sync"到磁盘之前累积的消息条数
    ## 因为磁盘IO操作是一个慢操作,但又是一个"数据可靠性"的必要手段
    ## 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.
    ## 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)
    ## 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.
    ## 物理server故障,将会导致没有fsync的消息丢失.
    log.flush.interval.messages=None
     
    ## 检查是否需要固化到硬盘的时间间隔
    log.flush.scheduler.interval.ms =3000
     
    ## 仅仅通过interval来控制消息的磁盘写入时机,是不足的.
    ## 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔
    ## 达到阀值,也将触发.
    log.flush.interval.ms = None
     
    ## 文件在索引中清除后保留的时间 一般不需要去修改
    log.delete.delay.ms =60000
     
    ## 控制上次固化硬盘的时间点,以便于数据恢复 一般不需要去修改
    log.flush.offset.checkpoint.interval.ms =60000
    
    ------------------------------------------- TOPIC 相关 -------------------------------------------
    ## 是否允许自动创建topic ,若是false,就需要通过命令创建topic
    auto.create.topics.enable =true
     
    ## 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数
    default.replication.factor =1
     
    ## 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
    num.partitions =1
     
    实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被复制到三个broker上。
    
    ----------------------------------复制(Leader、replicas) 相关 ----------------------------------
    ## partition leader与replicas之间通讯时,socket的超时时间
    controller.socket.timeout.ms =30000
     
    ## partition leader与replicas数据同步时,消息的队列尺寸
    controller.message.queue.size=10
     
    ## replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中
    replica.lag.time.max.ms =10000
     
    ## 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效
    ## 通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后
    ## 如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移
    ## 到其他follower中.
    ## 在broker数量较少,或者网络不足的环境中,建议提高此值.
    replica.lag.max.messages =4000
     
    ##follower与leader之间的socket超时时间
    replica.socket.timeout.ms=30*1000
     
    ## leader复制时候的socket缓存大小
    replica.socket.receive.buffer.bytes=64*1024
     
    ## replicas每次获取数据的最大大小
    replica.fetch.max.bytes =1024*1024
     
    ## replicas同leader之间通信的最大等待时间,失败了会重试
    replica.fetch.wait.max.ms =500
     
    ## fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
    replica.fetch.min.bytes =1
     
    ## leader 进行复制的线程数,增大这个数值会增加follower的IO
    num.replica.fetchers=1
     
    ## 每个replica检查是否将最高水位进行固化的频率
    replica.high.watermark.checkpoint.interval.ms =5000
     
    ## 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
    controlled.shutdown.enable =false
     
    ## 控制器关闭的尝试次数
    controlled.shutdown.max.retries =3
     
    ## 每次关闭尝试的时间间隔
    controlled.shutdown.retry.backoff.ms =5000
     
    ## 是否自动平衡broker之间的分配策略
    auto.leader.rebalance.enable =false
     
    ## leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
    leader.imbalance.per.broker.percentage =10
     
    ## 检查leader是否不平衡的时间间隔
    leader.imbalance.check.interval.seconds =300
     
    ## 客户端保留offset信息的最大空间大小
    offset.metadata.max.bytes
    
    ----------------------------------ZooKeeper 相关----------------------------------
    ##zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
    zookeeper.connect = localhost:2181
     
    ## ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
    zookeeper.session.timeout.ms=6000
     
    ## ZooKeeper的连接超时时间
    zookeeper.connection.timeout.ms =6000
     
    ## ZooKeeper集群中leader和follower之间的同步实际那
    zookeeper.sync.time.ms =2000
    配置的修改
    其中一部分配置是可以被每个topic自身的配置所代替,例如
    新增配置
    bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1
     
    修改配置
    bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000
     
    删除配置 :
    bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes
    

    2.6.2 CONSUMER 配置

    最为核心的配置是group.id、zookeeper.connect

    ## Consumer归属的组ID,broker是根据group.id来判断是队列模式还是发布订阅模式,非常重要
     group.id
     
    ## 消费者的ID,若是没有设置的话,会自增
     consumer.id
     
    ## 一个用于跟踪调查的ID ,最好同group.id相同
     client.id = group id value
     
    ## 对于zookeeper集群的指定,可以是多个 hostname1:port1,hostname2:port2,hostname3:port3 必须和broker使用同样的zk配置
     zookeeper.connect=localhost:2182
     
    ## zookeeper的心跳超时时间,查过这个时间就认为是dead消费者
     zookeeper.session.timeout.ms =6000
     
    ## zookeeper的等待连接时间
     zookeeper.connection.timeout.ms =6000
     
    ## zookeeper的follower同leader的同步时间
     zookeeper.sync.time.ms =2000
     
    ## 当zookeeper中没有初始的offset时候的处理方式 。smallest :重置为最小值 largest:重置为最大值 anythingelse:抛出异常
     auto.offset.reset = largest
     
    ## socket的超时时间,实际的超时时间是:max.fetch.wait + socket.timeout.ms.
     socket.timeout.ms=30*1000
     
    ## socket的接受缓存空间大小
     socket.receive.buffer.bytes=64*1024
     
    ##从每个分区获取的消息大小限制
     fetch.message.max.bytes =1024*1024
     
    ## 是否在消费消息后将offset同步到zookeeper,当Consumer失败后就能从zookeeper获取最新的offset
     auto.commit.enable =true
     
    ## 自动提交的时间间隔
     auto.commit.interval.ms =60*1000
     
    ## 用来处理消费消息的块,每个块可以等同于fetch.message.max.bytes中数值
     queued.max.message.chunks =10
     
    ## 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新
    ## 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册
    ##"Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点,
    ## 此值用于控制,注册节点的重试次数.
     rebalance.max.retries =4
     
    ## 每次再平衡的时间间隔
     rebalance.backoff.ms =2000
     
    ## 每次重新选举leader的时间
     refresh.leader.backoff.ms
     
    ## server发送到消费端的最小数据,若是不满足这个数值则会等待,知道满足数值要求
     fetch.min.bytes =1
     
    ## 若是不满足最小大小(fetch.min.bytes)的话,等待消费端请求的最长等待时间
     fetch.wait.max.ms =100
     
    ## 指定时间内没有消息到达就抛出异常,一般不需要改
     consumer.timeout.ms = -1
    

    2.6.3 PRODUCER 的配置

    比较核心的配置:metadata.broker.list、request.required.acks、producer.type、serializer.class

    ## 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip
     metadata.broker.list
     
    ##消息的确认模式
     ##0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
     ##1:发送消息,并会等待leader 收到确认后,一定的可靠性
     ## -1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性
     request.required.acks =0
     
    ## 消息发送的最长等待时间
     request.timeout.ms =10000
     
    ## socket的缓存大小
     send.buffer.bytes=100*1024
     
    ## key的序列化方式,若是没有设置,同serializer.class
     key.serializer.class
     
    ## 分区的策略,默认是取模
     partitioner.class=kafka.producer.DefaultPartitioner
     
    ## 消息的压缩模式,默认是none,可以有gzip和snappy
     compression.codec = none
     
    ## 可以针对默写特定的topic进行压缩
     compressed.topics=null
     
    ## 消息发送失败后的重试次数
     message.send.max.retries =3
     
    ## 每次失败后的间隔时间
     retry.backoff.ms =100
     
    ## 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据
     topic.metadata.refresh.interval.ms =600*1000
     
    ## 用户随意指定,但是不能重复,主要用于跟踪记录消息
     client.id=""
    
    ------------------------------------------- 消息模式 相关 -------------------------------------------
     ## 生产者的类型 async:异步执行消息的发送 sync:同步执行消息的发送
     producer.type=sync
     
    ## 异步模式下,那么就会在设置的时间缓存消息,并一次性发送
     queue.buffering.max.ms =5000
     
    ## 异步的模式下 最长等待的消息数
     queue.buffering.max.messages =10000
     
    ## 异步模式下,进入队列的等待时间 若是设置为0,那么要么进入队列,要么直接抛弃
     queue.enqueue.timeout.ms = -1
     
    ## 异步模式下,每次发送的最大消息数,前提是触发了queue.buffering.max.messages或是queue.buffering.max.ms的限制
     batch.num.messages=200
     
    ## 消息体的系列化处理类 ,转化为字节流进行传输
     serializer.class= kafka.serializer.DefaultEncoder
    

    转载:原文地址 https://blog.csdn.net/wackycrazy/article/details/47810741

    三、简单安装使用

    环境:

    3.1 部署

    一、安装JDK环境
    cd /usr/local/
    rz jdk.tar.gz 
    tar xvf jdk.tar.gz
    
    #准备java环境变量
    vim /etc/profile 
    export JAVA_HOME=/data/soft/jdk
    export PATH=$PATH:$JAVA_HOME/bin
    export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$CLASSPATH
    (末尾添加)
    
    . /etc/profile
    
    # 二、下载kafka的tar包并安装
    ## 2.1 创建应用目录,数据目录
    mkdir /data/kafka
    mkdir /data/logs/kafka
    
    #下载安装包并解压
    cd /usr/local/
    wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.1/kafka_2.12-2.3.1.tgz
    tar xf kafka_2.12-2.3.1.tgz
    mv kafka_2.12-2.3.1 kafka
    
    # 三、kafka节点配置
    
    Kafka01(zk1)
    cd /usr/kafka/kafka/
    vim config/server.properties
    21 broker.id=0
    31 listeners=PLAINTEXT://zk1:9092
    123 zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
    
    Kafka02(zk2)
    cd /usr/kafka/kafka/
    vim config/server.properties
    21 broker.id=1
    31 listeners=PLAINTEXT://zk2:9092
    
    Kafka02(zk3)
    cd /usr/kafka/kafka/
    vim config/server.properties
    21 broker.id=2
    31 listeners=PLAINTEXT://zk3:9092
    
    # 四、启动服务
    #进入kafka根目录
    cd /usr/kafka/kafka_2.12-2.3.0/
    #启动
    **nohup ./bin/kafka-server-start.sh config/server.properties &
    
    # 五、kafka测试
    # 5.1 创建topic
    ./bin/kafka-topics.sh --create --bootstrap-server 192.168.144.131:9092 --replication-factor 3 --partitions 1 --topic test-you-io
    #Topic在Kafka01上创建后也会同步到集群中另外两个Broker: Kafka02, Kafka03
    
    # 5.2 查看topic
    ./bin/kafka-topics.sh --list --bootstrap-server 192.168.144.131:9092
    test-ken-io
    test-you-io
    
    # 5.3 发送消息
    #这里我们向Broker(id=0)的Topic=test-ken-io发送消息
    (ZK2) ./bin/kafka-console-producer.sh --broker-list 192.168.144.140:9092 --topic test-zhou-io
    test by zhou.io
    test by men.io
    
    到另外两台机器消费消息,我们可以看到都能收到ZK2消息,这是因为两个消费的命令是建立了两个不同的Consumer,如果我们启动Consumer指定Consumer Group ID就可以作为一个消费组协同工,1个消息同时只会被一个Consumer消费到.
    
    (ZK1)./bin/kafka-console-consumer.sh --bootstrap-server 192.168.144.131:9092 --topic test-zhou-io --from-beginning
    test by zhou.io
    test by men.io
    
    (ZK3)./bin/kafka-console-consumer.sh --bootstrap-server 192.168.144.131:9092 --topic test-zhou-io --from-beginning
    test by zhou.io
    test by men.io
    
  • 相关阅读:
    程序员学习方法差在哪里
    解析域名
    tomcat下的公共jar包配置
    Ubuntu 16.04 修改状态栏位置
    sqlite3 C语言 API 函数
    vim配置文件
    关于 ioctl 函数
    字符设备基础了解
    Ubuntu14.04搭建Boa服务
    gcc 交叉工具链中工具使用(arm-linux-xxx)
  • 原文地址:https://www.cnblogs.com/wangchengshi/p/12684735.html
Copyright © 2011-2022 走看看