zoukankan      html  css  js  c++  java
  • java 架构之路(队列)kafka

    什么是kafka

    kafka是一种高吞吐量的分布式发布订阅消息系统
    主要是三个功能

    1. 发布和订阅记录的流,类似于消息队列或者企业级消息系统。
    2. 以容错的、持久的方式存储记录流
    3. 当发生时处理记录流。
    复制代码

    SpringBoot集成kafka

    新建SpringBoot项目

    引入kafka

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    复制代码

    yml 文件中添加配置

    spring:
      kafka:
        # Kafka集群
        bootstrap-servers: xxxxxxx
    复制代码

    Hello Kafka

    生产者

    public class KafkaProducer {
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
        public void sendMessage(String normalMessage) {
            kafkaTemplate.send("topic", normalMessage);
        }
    }
    复制代码

    消费者

    @Component
    public class KafkaConsumer {
        // 消费监听
        @KafkaListener(topics = {"topic"})
        public void onMessage1(ConsumerRecord<?, ?> record){
            // 消费的哪个topic、partition的消息,打印出消息内容
            System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
        }
    }
    复制代码

    生产者

    1、带回调的生产者

        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
        public void sendMessage(String normalMessage) {
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate
                        .send("topic", normalMessage);
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    onSendSuccess(result, k, v);
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
                }
            });  
        }
    复制代码

    消费者

    
        public String getPubSubTopic() {
            return "topic";
        }
    
        public String getSubscriber() {
            return "group";
        }
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @KafkaListener(groupId = "#{__listener.subscriber}", topics = "#{__listener.pubSubTopic.split(',')}", containerFactory = "batchFactory")
        public void listen(ConsumerRecord<String, String> records) {
        
        }
    复制代码

    属性解释:

    ① id:消费者ID;

    ② groupId:消费组ID;

    ③ topics:监听的topic,可监听多个;

    ④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。

    批量消费者

    # 设置批量消费
    spring.kafka.listener.type=batch
    # 批量消费每次最多消费多少条消息
    spring.kafka.consumer.max-poll-records=50
    复制代码
    public void onMessage(List<ConsumerRecord<?, ?>> records) {}
    复制代码
     
  • 相关阅读:
    css的三种特性
    css选择器
    margin:0 auto 与 text-align:center 的区别
    JS如何实现点击页面内任意的链接均加参数跳转
    css和js带参数(形如.css?v=与.js?v= 或 .css?version=与.js?version=)
    移动端页面前端设计随笔整理
    理解:Before和:After伪元素
    时下流行的css3页面纵向滑动效果
    webapp网页调试工具Chrome Devtools
    做手机web半年遇到的问题及解决方法
  • 原文地址:https://www.cnblogs.com/ming569/p/13693204.html
Copyright © 2011-2022 走看看