  • Kafka configuration file notes

    If advertised.host.name is set to aa, then aa is also the service name Kafka advertises to clients.

    kafka

    log.cleanup.policy=delete    log cleanup policy

    log.retention.hours=168        (i.e. 7 days)
    Maximum time data is retained; beyond this, data is handled according to the policy set by log.cleanup.policy. It is also the window within which consumers must read the data.
    Deletion is triggered as soon as either log.retention.bytes or the time-based retention (log.retention.minutes/hours) is reached.

    log.retention.bytes=-1    maximum file size per partition of a topic
    A topic's total size limit = number of partitions * log.retention.bytes; -1 means no size limit.

    log.retention.check.interval.ms=5minutes    how often log sizes are checked against the cleanup policy
    /* Interval at which offsets are auto-committed */
    props.put("auto.commit.interval.ms", "1000");

    props.put("session.timeout.ms", "30000");

    // Maximum time to wait for a request to complete; must be larger than session.timeout.ms
    props.put("request.timeout.ms", "40000");

    // Number of records pulled from Kafka in a single poll
    // These max.poll.records records must be processed within session.timeout.ms
    props.put("max.poll.records", "100");

    // Minimum amount of data the server returns to the consumer; if less is available, the server waits until it accumulates. The default of 1 means respond immediately.
    props.put("fetch.min.bytes", "1");
    // Maximum time the server waits for the consumer's fetch request when fetch.min.bytes is not yet satisfied
    props.put("fetch.wait.max.ms", "1000");

    Parameter reference: https://my.oschina.net/infiniteSpace/blog/312890?p=1

    All configuration parameters in server.properties are explained below.

    broker.id=0
        Unique identifier of this broker within the cluster; must be a non-negative integer. If the server's IP address changes while broker.id stays the same, consumers are not affected.
    log.dirs=/data/kafka-logs
        Directory where Kafka stores its data; multiple directories are comma-separated, e.g. /data/kafka-logs-1,/data/kafka-logs-2.
    port=9092
        Port the broker listens on.
    message.max.bytes=6525000
        Maximum size of a message body, in bytes.
    num.network.threads=4
        Maximum number of threads the broker uses to process network requests; commonly set to the number of CPU cores and rarely needs changing.
    num.io.threads=8
        Number of threads the broker uses for disk I/O; should be no smaller than the number of disks (a common rule of thumb is twice the number of CPU cores).
    background.threads=4
        Number of threads for background tasks such as deleting expired log files; usually does not need changing.
    queued.max.requests=500
        Maximum number of requests allowed to queue for the I/O threads; once exceeded, the broker stops accepting new requests from the network (a self-protection mechanism).
    host.name
        Host address of the broker. If set, the broker binds only to this address; if not, it binds to all interfaces and publishes one of them to ZooKeeper. Usually left unset.
    socket.send.buffer.bytes=100*1024
        Socket send buffer (the SO_SNDBUF tuning parameter).
    socket.receive.buffer.bytes=100*1024
        Socket receive buffer (the SO_RCVBUF tuning parameter).
    socket.request.max.bytes=100*1024*1024
        Maximum size of a socket request, protecting the server against OOM; message.max.bytes must be smaller than this value (and message.max.bytes itself can be overridden per topic at creation).
    log.segment.bytes=1024*1024*1024
        A topic partition is stored as a series of segment files; this controls the size of each segment. Can be overridden per topic at creation.
    log.roll.hours=24*7
        Forces a new segment to be rolled after this many hours even if log.segment.bytes has not been reached. Can be overridden per topic at creation.
    log.cleanup.policy=delete
        Log cleanup policy: delete or compact. Applied to expired data, or when the log reaches its size limit. Can be overridden per topic at creation.
    log.retention.minutes / log.retention.hours (e.g. log.retention.hours=24)
        Maximum time data is retained; beyond this, data is handled according to log.cleanup.policy, so it also bounds how long consumers have to read the data. Deletion is triggered as soon as either log.retention.bytes or the time-based retention is reached. Can be overridden per topic at creation.
    log.retention.bytes=-1
        Maximum size per partition of a topic; a topic's total size limit = number of partitions * log.retention.bytes. -1 means no size limit. Deletion is triggered as soon as either this or the time-based retention is reached. Can be overridden per topic at creation.
    log.retention.check.interval.ms=5minutes
        How often to check whether any log needs to be handled according to log.cleanup.policy.
    log.cleaner.enable=false
        Whether log compaction is enabled.
    log.cleaner.threads=2
        Number of threads used for log compaction.
    log.cleaner.io.max.bytes.per.second=None
        Upper bound on the I/O the log cleaner may perform.
    log.cleaner.dedupe.buffer.size=500*1024*1024
        Memory used for deduplication during log compaction; the larger the better, space permitting.
    log.cleaner.io.buffer.size=512*1024
        I/O buffer size used during log cleaning; usually does not need changing.
    log.cleaner.io.buffer.load.factor=0.9
        Load factor of the hash table used during log cleaning; usually does not need changing.
    log.cleaner.backoff.ms=15000
        Interval at which to check whether any log needs cleaning.
    log.cleaner.min.cleanable.ratio=0.5
        Controls how often logs are compacted; a higher ratio means more efficient cleaning at the cost of some wasted space. Can be overridden per topic at creation.
    log.cleaner.delete.retention.ms=1day
        How long compacted log entries are retained, which is also the longest a client has to consume them. It differs from log.retention.minutes in that it governs compacted data rather than uncompacted data. Can be overridden per topic at creation.
    log.index.size.max.bytes=10*1024*1024
        Maximum size of a segment's index file. Can be overridden per topic at creation.
    log.index.interval.bytes=4096
        After a fetch, this much space is scanned to locate the nearest offset; a larger value makes the scan faster but uses more memory. Usually left alone.
    log.flush.interval.messages=None (e.g. log.flush.interval.messages=1000)
        Number of messages accumulated before the log is fsynced to disk. Disk I/O is slow, but fsync is necessary for data reliability, so this is a trade-off between reliability and performance: too large and each fsync takes long (blocking I/O); too small and fsyncs happen too often, adding latency to client requests. Messages not yet fsynced are lost if the physical server fails.
    log.flush.scheduler.interval.ms=3000
        How often to check whether a flush to disk is needed.
    log.flush.interval.ms=None (e.g. log.flush.interval.ms=1000 flushes every 1000 ms)
        Controlling flushes by message count alone is not enough; this bounds the time between fsyncs, so a flush is also triggered when the message-count threshold has not been reached but this much time has passed since the last flush.
    log.delete.delay.ms=60000
        How long a file is kept after being removed from the index; usually does not need changing.
    log.flush.offset.checkpoint.interval.ms=60000
        How often the point of the last flush to disk is recorded, for use in recovery; usually does not need changing.
    auto.create.topics.enable=true
        Whether topics may be created automatically; if false, topics must be created explicitly with the command-line tools.
    default.replication.factor=1
        Default replication factor for automatically created topics.
    num.partitions=1
        Default number of partitions per topic when none is specified; overridden by the value given at topic creation.

    Several of these settings can be overridden per topic; see the AdminClient sketch after this list.

    Leader and replica related parameters:

    controller.socket.timeout.ms=30000
        Socket timeout for communication between the partition leader and its replicas.
    controller.message.queue.size=10
        Queue size for messages while the partition leader and replicas synchronize data.
    replica.lag.time.max.ms=10000
        Maximum time a replica may take to respond to the partition leader; beyond this, the replica is removed from the ISR (in-sync replicas), considered dead, and no longer managed.
    replica.lag.max.messages=4000
        If a follower falls this far behind the leader, the follower (partition replica) is considered failed. Network latency or broken connections normally cause some replication lag; if the lag grows too large, the leader treats the follower as too slow or bandwidth-limited and drops it from the in-sync set. In environments with few brokers or a weak network, consider raising this value.
    replica.socket.timeout.ms=30*1000
        Socket timeout between a follower and the leader.
    replica.socket.receive.buffer.bytes=64*1024
        Socket receive buffer used for replication from the leader.
    replica.fetch.max.bytes=1024*1024
        Maximum amount of data a replica fetches per request.
    replica.fetch.wait.max.ms=500
        Maximum wait time for a replica's fetch request to the leader; retried on failure.
    replica.fetch.min.bytes=1
        Minimum amount of data for a fetch; if the leader has less new data than this, the request blocks until the condition is met.
    num.replica.fetchers=1
        Number of fetcher threads used to replicate from leaders; increasing it increases the follower's I/O parallelism.
    replica.high.watermark.checkpoint.interval.ms=5000
        How often each replica checkpoints its high watermark to disk.
    controlled.shutdown.enable=false
        Whether controlled shutdown of a broker is allowed; if true, leadership of all partitions led by this broker is moved to other brokers before shutdown.
    controlled.shutdown.max.retries=3
        Number of attempts for a controlled shutdown.
    controlled.shutdown.retry.backoff.ms=5000
        Interval between controlled-shutdown attempts.
    leader.imbalance.per.broker.percentage=10
        Allowed leader imbalance ratio per broker; if exceeded, partition leadership is rebalanced.
    leader.imbalance.check.interval.seconds=300
        How often leader imbalance is checked.
    offset.metadata.max.bytes
        Maximum size of the metadata a client may store along with an offset commit.

    ZooKeeper related parameters:

    zookeeper.connect=localhost:2181
        Address of the ZooKeeper cluster; multiple addresses are comma-separated, e.g. hostname1:port1,hostname2:port2,hostname3:port3.
    zookeeper.session.timeout.ms=6000
        ZooKeeper session timeout (the heartbeat interval); if no response arrives within it, the broker is considered dead. Should not be set too large.
    zookeeper.connection.timeout.ms=6000
        ZooKeeper connection timeout.
    zookeeper.sync.time.ms=2000
        How far a ZooKeeper follower may lag behind the ZooKeeper leader (sync time).
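
    Many of the broker-level defaults above (log.retention.*, log.segment.bytes, log.cleanup.policy, ...) can be overridden per topic when the topic is created. A hedged sketch using the Java AdminClient; the topic name, bootstrap address, and override values are illustrative only:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateTopicWithOverrides {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

            try (AdminClient admin = AdminClient.create(props)) {
                // Per-topic overrides of the broker-level defaults listed above
                Map<String, String> configs = new HashMap<>();
                configs.put("retention.ms", "604800000");      // 7 days; overrides log.retention.hours
                configs.put("retention.bytes", "1073741824");  // 1 GiB per partition; overrides log.retention.bytes
                configs.put("segment.bytes", "268435456");     // 256 MiB segments; overrides log.segment.bytes
                configs.put("cleanup.policy", "delete");       // overrides log.cleanup.policy

                NewTopic topic = new NewTopic("demo-topic", 3, (short) 1)  // placeholder name, 3 partitions, RF 1
                        .configs(configs);
                admin.createTopics(Collections.singletonList(topic)).all().get();
            }
        }
    }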

    A more detailed parameter reference: http://blog.csdn.net/lizhitao/article/details/25667831


    broker.id=0  # unique identifier of this machine in the cluster, similar in spirit to ZooKeeper's myid
    port=19092  # port on which this Kafka broker serves clients; the default is 9092
    host.name=192.168.7.100  # unset by default; 0.8.1 had a DNS-resolution bug here that caused failures
    num.network.threads=3  # number of threads the broker uses for network processing
    num.io.threads=8  # number of threads the broker uses for I/O processing
    log.dirs=/opt/kafka/kafkalogs/  # directory where messages are stored; may be a comma-separated list of directories. num.io.threads above should be no smaller than the number of directories. With multiple directories, a newly created topic persists its partitions in whichever directory currently holds the fewest partitions.
    socket.send.buffer.bytes=102400  # send buffer size; data is buffered and sent once a certain amount has accumulated, which improves performance
    socket.receive.buffer.bytes=102400  # receive buffer size; data is buffered to a certain amount before being written to disk
    socket.request.max.bytes=104857600  # maximum size of a request sent to or fetched from Kafka; must not exceed the JVM heap size
    num.partitions=1  # default number of partitions per topic
    log.retention.hours=168  # default maximum retention time for messages: 168 hours = 7 days
    message.max.bytes=5242880  # maximum size of a stored message, 5 MB
    default.replication.factor=2  # number of replicas Kafka keeps for each message; if one replica fails, another can still serve
    replica.fetch.max.bytes=5242880  # maximum number of bytes fetched per replication request
    log.segment.bytes=1073741824  # Kafka appends messages to log files; when a file exceeds this size, a new segment file is started
    log.retention.check.interval.ms=300000  # check every 300000 ms whether any log has exceeded the retention configured above (log.retention.hours=168) and delete expired segments
    log.cleaner.enable=false  # whether to enable log compaction; usually not enabled (enabling it can improve performance for compacted topics)
    zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218  # ZooKeeper connection string
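
    As a quick smoke test of a broker configured as above, a minimal producer sketch. The broker address uses the host and port from the sample (192.168.7.100:19092); the topic name and serializers are assumptions:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class ProducerSmokeTest {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.7.100:19092");  // host.name and port from the sample above
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Keep the producer's max.request.size within the broker's message.max.bytes (5 MB above)
            props.put("max.request.size", "5242880");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                RecordMetadata md = producer
                        .send(new ProducerRecord<>("demo-topic", "key", "hello"))  // placeholder topic
                        .get();
                System.out.printf("written to %s-%d at offset %d%n", md.topic(), md.partition(), md.offset());
            }
        }
    }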

    =====================

    ==========================

    ZooKeeper

    #tickTime:
    The basic time unit for heartbeats between ZooKeeper servers, and between clients and servers; one heartbeat is sent every tickTime.
    #initLimit:
    How many heartbeat intervals (tickTime) a follower in the ZooKeeper ensemble may take for its initial connection to the leader (the "client" here is a follower server connecting to the leader, not a user client). If the leader has received no response after that many heartbeats, the connection is considered failed. For example, with initLimit=5 and tickTime=2000 ms, the total allowance is 5*2000 = 10 seconds.
    #syncLimit:
    Maximum number of tickTime intervals allowed for a request/response exchange between the leader and a follower; for example, with syncLimit=5 and tickTime=2000 ms, that is 5*2000 = 10 seconds.
    #dataDir:
    Directory where snapshot files are stored.
    #dataLogDir:
    Directory for transaction logs. If left unset, transaction logs are written to dataDir, which seriously hurts ZooKeeper performance under high throughput, because many transaction logs and snapshots are produced.
    #clientPort:
    The port clients use to connect to the ZooKeeper server; ZooKeeper listens on it and accepts client requests. It can be changed to a different (e.g. higher) port if desired.
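
    A minimal sketch for checking connectivity to a ZooKeeper node with these settings; the connect string 127.0.0.1:2181 and the 6000 ms session timeout are illustrative, and it assumes at least one Kafka broker has already registered under /brokers/ids:

    import java.util.concurrent.CountDownLatch;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    public class ZkConnectivityCheck {
        public static void main(String[] args) throws Exception {
            CountDownLatch connected = new CountDownLatch(1);
            // Connect string uses clientPort; the 6000 ms timeout mirrors zookeeper.session.timeout.ms
            ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 6000, event -> {
                if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            });
            try {
                connected.await();
                // Kafka brokers register themselves under /brokers/ids
                System.out.println("Registered broker ids: " + zk.getChildren("/brokers/ids", false));
            } finally {
                zk.close();
            }
        }
    }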

     ============== A sample three-broker cluster configuration ==========

    master

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # see kafka.server.KafkaConfig for additional details and defaults
     
    ############################# Server Basics #############################
     
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0  # unique id of this broker
     
    ############################# Socket Server Settings #############################
     
    listeners=PLAINTEXT://:9092
     
    # The port the socket server listens on
    port=9092
     
    # Hostname the broker will bind to. If not set, the server will bind to all interfaces
    host.name=master
     
    # Hostname the broker will advertise to producers and consumers. If not set, it uses the
    # value for "host.name" if configured.  Otherwise, it will use the value returned from
    # java.net.InetAddress.getCanonicalHostName().
    advertised.host.name=master
     
    # The port to publish to ZooKeeper for clients to use. If this is not set,
    # it will publish the same port that the broker binds to.
    #advertised.port=<port accessible by clients>
     
    # The number of threads handling network requests
    num.network.threads=3
     
    # The number of threads doing disk I/O
    num.io.threads=8
     
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400  # send buffer size; data is buffered and sent once this much has accumulated
     
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400  # receive buffer size (100 KB)
     
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600  # maximum request size; must not exceed the JVM heap size
     
     
    ############################# Log Basics #############################
     
    # A comma seperated list of directories under which to store log files
    log.dirs=/usr/local/kafka/logs  # may list multiple comma-separated directories; a new topic's partitions go to whichever directory holds the fewest
     
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=2
     
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
     
    ############################# Log Flush Policy #############################
     
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
     
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
     
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
     
    ############################# Log Retention Policy #############################
     
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
     
    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=168  # retain for 7 days
     
    # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
    # segments don't drop below log.retention.bytes.
    #log.retention.bytes=1073741824  # size-based retention limit (commented out by default)
     
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824  # maximum size of a segment file; a new segment is rolled once this is exceeded
     
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000  # how often to check whether segments can be deleted under the retention policy
     
    ############################# Zookeeper #############################
     
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=master:2181,worker1:2181,worker2:2181  # ZooKeeper connection string
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000

    worker1

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # see kafka.server.KafkaConfig for additional details and defaults
     
    ############################# Server Basics #############################
     
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=1
     
    ############################# Socket Server Settings #############################
     
    listeners=PLAINTEXT://:9092
     
    # The port the socket server listens on
    port=9092
     
    # Hostname the broker will bind to. If not set, the server will bind to all interfaces
    host.name=worker1
     
    # Hostname the broker will advertise to producers and consumers. If not set, it uses the
    # value for "host.name" if configured.  Otherwise, it will use the value returned from
    # java.net.InetAddress.getCanonicalHostName().
    advertised.host.name=worker1
     
    # The port to publish to ZooKeeper for clients to use. If this is not set,
    # it will publish the same port that the broker binds to.
    #advertised.port=<port accessible by clients>
     
    # The number of threads handling network requests
    num.network.threads=3
     
    # The number of threads doing disk I/O
    num.io.threads=8
     
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
     
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
     
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
     
     
    ############################# Log Basics #############################
     
    # A comma seperated list of directories under which to store log files
    log.dirs=/usr/local/kafka/logs
     
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=2
     
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
     
    ############################# Log Flush Policy #############################
     
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
     
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
     
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
     
    ############################# Log Retention Policy #############################
     
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
     
    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=168
     
    # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
    # segments don't drop below log.retention.bytes.
    #log.retention.bytes=1073741824
     
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
     
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
     
    ############################# Zookeeper #############################
     
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=master:2181,worker1:2181,worker2:2181
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000

    worker2

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # see kafka.server.KafkaConfig for additional details and defaults
     
    ############################# Server Basics #############################
     
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=2
     
    ############################# Socket Server Settings #############################
     
    listeners=PLAINTEXT://:9092
     
    # The port the socket server listens on
    port=9092
     
    # Hostname the broker will bind to. If not set, the server will bind to all interfaces
    host.name=worker2
     
    # Hostname the broker will advertise to producers and consumers. If not set, it uses the
    # value for "host.name" if configured.  Otherwise, it will use the value returned from
    # java.net.InetAddress.getCanonicalHostName().
    advertised.host.name=worker2
     
    # The port to publish to ZooKeeper for clients to use. If this is not set,
    # it will publish the same port that the broker binds to.
    #advertised.port=<port accessible by clients>
     
    # The number of threads handling network requests
    num.network.threads=3
     
    # The number of threads doing disk I/O
    num.io.threads=8
     
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
     
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
     
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
     
     
    ############################# Log Basics #############################
     
    # A comma seperated list of directories under which to store log files
    log.dirs=/usr/local/kafka/logs
     
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=2
     
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
     
    ############################# Log Flush Policy #############################
     
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
     
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
     
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
     
    ############################# Log Retention Policy #############################
     
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
     
    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=168
     
    # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
    # segments don't drop below log.retention.bytes.
    #log.retention.bytes=1073741824
     
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
     
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
     
    ############################# Zookeeper #############################
     
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=master:2181,worker1:2181,worker2:2181
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
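
    To confirm that all three brokers (broker.id 0, 1, 2 on master, worker1, worker2) have joined the cluster, a hedged sketch using AdminClient.describeCluster; the hostnames match the configs above:

    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.Node;

    public class DescribeClusterCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,worker1:9092,worker2:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // Expect the three broker.ids (0, 1, 2) configured above
                for (Node node : admin.describeCluster().nodes().get()) {
                    System.out.printf("broker.id=%d host=%s port=%d%n", node.id(), node.host(), node.port());
                }
            }
        }
    }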

    ==============

  • Original post: https://www.cnblogs.com/testzcy/p/7800298.html