zoukankan      html  css  js  c++  java
  • spring boot+kafka整合

    springboot版本是2.0.4

     首先,在maven中引入spring-kafka的jar包

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency> 

    在application-dev.properties配置生产者

    #=============== producer  =======================
    spring.kafka.producer.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093
    spring.kafka.producer.retries=1
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    spring.kafka.producer.properties.max.requst.size=2097152
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

    生产者向kafka发送消息

    @Component
    @Slf4j
    public class KafkaSender {
    
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
    
        // 发送消息方法
        public void send(String body) {
    
            kafkaTemplate.send("testTopic", body);
            log.info("发送消息完成,内容 为:" + body);
        }
    
    }

    配置消费者

    #=============== consumer  =======================
    spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093
    spring.kafka.consumer.group-id=0
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=100
    #=======set comsumer max fetch.byte 2*1024*1024=============
    spring.kafka.consumer.properties.max.partition.fetch.bytes=2097152

    消费者监听topic=testTopic的消息

    @KafkaListener(topics = { "testTopic" })
    public void listen(ConsumerRecord<?, ?> record) {
      log.info("接收消息为:" + record.value());
    }

    在整合过程中,spring boot帮我们把kafka的大部分属性直接带出来了,但是有些不常用的属性,需要通过

    spring.kafka.consumer.properties.*

    来设置,例如max.partition.fetch.bytes,一次fetch请求,从一个partition中取得的records最大值.

    在application.properties中添加kafka扩展属性,

    #设置一次fetch记录的最大值2M(2*1024*1024),默认值为1M
    spring.kafka.consumer.properties.max.partition.fetch.bytes=2097152

    更多配置请参考kafka属性大全

     1 # APACHE KAFKA (KafkaProperties)
     2 spring.kafka.admin.client-id= # ID to pass to the server when making requests. Used for server-side logging.
     3 spring.kafka.admin.fail-fast=false # Whether to fail fast if the broker is not available on startup.
     4 spring.kafka.admin.properties.*= # Additional admin-specific properties used to configure the client.
     5 spring.kafka.admin.ssl.key-password= # Password of the private key in the key store file.
     6 spring.kafka.admin.ssl.keystore-location= # Location of the key store file.
     7 spring.kafka.admin.ssl.keystore-password= # Store password for the key store file.
     8 spring.kafka.admin.ssl.keystore-type= # Type of the key store.
     9 spring.kafka.admin.ssl.protocol= # SSL protocol to use.
    10 spring.kafka.admin.ssl.truststore-location= # Location of the trust store file.
    11 spring.kafka.admin.ssl.truststore-password= # Store password for the trust store file.
    12 spring.kafka.admin.ssl.truststore-type= # Type of the trust store.
    13 spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
    14 spring.kafka.client-id= # ID to pass to the server when making requests. Used for server-side logging.
    15 spring.kafka.consumer.auto-commit-interval= # Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true.
    16 spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.
    17 spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
    18 spring.kafka.consumer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
    19 spring.kafka.consumer.enable-auto-commit= # Whether the consumer's offset is periodically committed in the background.
    20 spring.kafka.consumer.fetch-max-wait= # Maximum amount of time the server blocks before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch.min.bytes".
    21 spring.kafka.consumer.fetch-min-size= # Minimum amount of data, in bytes, the server should return for a fetch request.
    22 spring.kafka.consumer.group-id= # Unique string that identifies the consumer group to which this consumer belongs.
    23 spring.kafka.consumer.heartbeat-interval= # Expected time between heartbeats to the consumer coordinator.
    24 spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
    25 spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
    26 spring.kafka.consumer.properties.*= # Additional consumer-specific properties used to configure the client.
    27 spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file.
    28 spring.kafka.consumer.ssl.keystore-location= # Location of the key store file.
    29 spring.kafka.consumer.ssl.keystore-password= # Store password for the key store file.
    30 spring.kafka.consumer.ssl.keystore-type= # Type of the key store.
    31 spring.kafka.consumer.ssl.protocol= # SSL protocol to use.
    32 spring.kafka.consumer.ssl.truststore-location= # Location of the trust store file.
    33 spring.kafka.consumer.ssl.truststore-password= # Store password for the trust store file.
    34 spring.kafka.consumer.ssl.truststore-type= # Type of the trust store.
    35 spring.kafka.consumer.value-deserializer= # Deserializer class for values.
    36 spring.kafka.jaas.control-flag=required # Control flag for login configuration.
    37 spring.kafka.jaas.enabled=false # Whether to enable JAAS configuration.
    38 spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module.
    39 spring.kafka.jaas.options= # Additional JAAS options.
    40 spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
    41 spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation.
    42 spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
    43 spring.kafka.listener.client-id= # Prefix for the listener's consumer client.id property.
    44 spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
    45 spring.kafka.listener.idle-event-interval= # Time between publishing idle consumer events (no data received).
    46 spring.kafka.listener.log-container-config= # Whether to log the container configuration during initialization (INFO level).
    47 spring.kafka.listener.monitor-interval= # Time between checks for non-responsive consumers. If a duration suffix is not specified, seconds will be used.
    48 spring.kafka.listener.no-poll-threshold= # Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive.
    49 spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
    50 spring.kafka.listener.type=single # Listener type.
    51 spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.
    52 spring.kafka.producer.batch-size= # Default batch size in bytes.
    53 spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
    54 spring.kafka.producer.buffer-memory= # Total bytes of memory the producer can use to buffer records waiting to be sent to the server.
    55 spring.kafka.producer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
    56 spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
    57 spring.kafka.producer.key-serializer= # Serializer class for keys.
    58 spring.kafka.producer.properties.*= # Additional producer-specific properties used to configure the client.
    59 spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
    60 spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file.
    61 spring.kafka.producer.ssl.keystore-location= # Location of the key store file.
    62 spring.kafka.producer.ssl.keystore-password= # Store password for the key store file.
    63 spring.kafka.producer.ssl.keystore-type= # Type of the key store.
    64 spring.kafka.producer.ssl.protocol= # SSL protocol to use.
    65 spring.kafka.producer.ssl.truststore-location= # Location of the trust store file.
    66 spring.kafka.producer.ssl.truststore-password= # Store password for the trust store file.
    67 spring.kafka.producer.ssl.truststore-type= # Type of the trust store.
    68 spring.kafka.producer.transaction-id-prefix= # When non empty, enables transaction support for producer.
    69 spring.kafka.producer.value-serializer= # Serializer class for values.
    70 spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
    71 spring.kafka.ssl.key-password= # Password of the private key in the key store file.
    72 spring.kafka.ssl.keystore-location= # Location of the key store file.
    73 spring.kafka.ssl.keystore-password= # Store password for the key store file.
    74 spring.kafka.ssl.keystore-type= # Type of the key store.
    75 spring.kafka.ssl.protocol= # SSL protocol to use.
    76 spring.kafka.ssl.truststore-location= # Location of the trust store file.
    77 spring.kafka.ssl.truststore-password= # Store password for the trust store file.
    78 spring.kafka.ssl.truststore-type= # Type of the trust store.
    79 spring.kafka.template.default-topic= # Default topic to which messages are sent.

    官方文档地址:

    https://docs.spring.io/spring-boot/docs/2.0.4.RELEASE/reference/htmlsingle/#common-application-properties

    记录点滴,沉淀自己,汇聚成海,重新再出发
  • 相关阅读:
    ubuntu12.04 死机 卡屏 画面冻结解决方案
    Install Firefox 20 in Ubuntu 13.04, Ubuntu 12.10, Ubuntu 12.04, Linux Mint 14 and Linux Mint 13 by PPA
    ListView1.SelectedItems.Clear()
    android studio 下载地址
    jquery.slider.js jquery幻灯片测试
    jquery.hovermenu.js
    jquery.tab.js选项卡效果
    适配 placeholder,jquery版
    jquery.autoscroll.js jquery自动滚动效果
    将 Google Earth 地图集成到自己的窗体上的 简单控件
  • 原文地址:https://www.cnblogs.com/lixyu/p/9404144.html
Copyright © 2011-2022 走看看