zoukankan      html  css  js  c++  java
  • springboot+Kafka(生产者和消费者)

    1、配置信息

    spring.application.name=kafka-producer
    server.port=8091
    
    spring.kafka.producer.bootstrapServers=192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094
    
    #安全认证
    #spring.kafka.producer.ssl=
    
    # procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
    # acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
    # acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
    # acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
    spring.kafka.producer.acks=all
    
    # 每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,
    # 这有助于提升客户端和服务端之间的性能,此配置控制默认批量大小(以字节为单位),默认值为16384
    spring.kafka.producer.batchSize=
    
    #缓存数据内存大小
    spring.kafka.producer.bufferMemory=
    
    #server记录日志
    spring.kafka.producer.clientId=kafka-producer
    
    #producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好
    spring.kafka.producer.compressionType=gzip
    
    #key反序列化类,实现org.apache.kafka.common.serialization.Deserializer接口
    spring.kafka.producer.keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer
    #value反序列化类,实现org.apache.kafka.common.serialization.Deserializer接口
    spring.kafka.producer.valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer
    
    #客户端发送重试
    spring.kafka.producer.retries=0
    
    #生产者事务
    spring.kafka.producer.transactionIdPrefix=
    
    #其他配置
    #spring.kafka.producer.properties=
    spring.application.name=kafka-consumer1
    server.port=8092
    
    spring.kafka.consumer.bootstrapServers=192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094
    
    
    #spring.kafka.consumer.clientId="kafka-consumer"
    spring.kafka.consumer.group-id=myGroup1
    
    #自动提交
    spring.kafka.consumer.enableAutoCommit=true
    
    #自动向zookeeper提交offset的频率,默认:5000
    spring.kafka.consumer.autoCommitInterval=10
    
    #0:READ_UNCOMMITTED, 1:READ_COMMITTED;
    spring.kafka.consumer.isolationLevel=READ_COMMITTED
    
    # 没有初始化的offset时,可以设置以下三种情况:(默认:latest)
    # earliest
    # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    # latest
    # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    # none
    # topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    spring.kafka.consumer.autoOffsetReset=latest
    
    #安全认证
    #spring.kafka.consumer.ssl=
    
    #Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。
    #这个配置就是来配置consumer最多等待response多久。
    spring.kafka.consumer.fetchMaxWait=100
    #每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。默认:1
    spring.kafka.consumer.fetchMinSize=10
    
    #消费超时时间,大小不能超过session.timeout.ms,默认:3000
    spring.kafka.consumer.heartbeatInterval=100
    
    #key反序列化类,实现org.apache.kafka.common.serialization.Deserializer接口
    spring.kafka.consumer.keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer
    #value反序列化类,实现org.apache.kafka.common.serialization.Deserializer接口
    spring.kafka.consumer.valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer
    
    #max.poll.records条数据需要在session.timeout.ms这个时间内处理完,默认:500
    spring.kafka.consumer.maxPollRecords=500
    spring.application.name=kafka-consumer2
    server.port=8093
    
    spring.kafka.consumer.bootstrapServers=192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094
    
    
    #spring.kafka.consumer.clientId="kafka-consumer"
    spring.kafka.consumer.group-id=myGroup2
    
    #自动提交
    spring.kafka.consumer.enableAutoCommit=true
    
    #自动向zookeeper提交offset的频率,默认:5000
    spring.kafka.consumer.autoCommitInterval=10
    
    #0:READ_UNCOMMITTED, 1:READ_COMMITTED;
    spring.kafka.consumer.isolationLevel=READ_COMMITTED
    
    # 没有初始化的offset时,可以设置以下三种情况:(默认:latest)
    # earliest
    # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    # latest
    # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    # none
    # topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    spring.kafka.consumer.autoOffsetReset=latest
    
    #安全认证
    #spring.kafka.consumer.ssl=
    
    #Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。
    #这个配置就是来配置consumer最多等待response多久。
    spring.kafka.consumer.fetchMaxWait=100
    #每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。默认:1
    spring.kafka.consumer.fetchMinSize=10
    
    #消费超时时间,大小不能超过session.timeout.ms,默认:3000
    spring.kafka.consumer.heartbeatInterval=100
    
    #key反序列化类,实现org.apache.kafka.common.serialization.Deserializer接口
    spring.kafka.consumer.keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer
    #value反序列化类,实现org.apache.kafka.common.serialization.Deserializer接口
    spring.kafka.consumer.valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer
    
    #max.poll.records条数据需要在session.timeout.ms这个时间内处理完,默认:500
    spring.kafka.consumer.maxPollRecords=500

    Git地址:https://github.com/wangymd/myKafka.git

  • 相关阅读:
    OCP-1Z0-053-V12.02-235题
    OCP-1Z0-053-V12.02-524题
    OCP-1Z0-053-V12.02-525题
    OCP-1Z0-053-V12.02-526题
    OCP-1Z0-053-V12.02-535题
    OCP-1Z0-053-V12.02-540题
    OCP-1Z0-053-V12.02-617题
    OCP-1Z0-053-V12.02-649题
    如何制作Jar包并在android中调用jar包
    JAVA实现回调
  • 原文地址:https://www.cnblogs.com/wangymd/p/13287292.html
Copyright © 2011-2022 走看看