zoukankan      html  css  js  c++  java
  • java 架构之路(队列)kafka

    什么是kafka

    kafka是一种高吞吐量的分布式发布订阅消息系统
    主要是三个功能

    1. 发布和订阅记录的流,类似于消息队列或者企业级消息系统。
    2. 以容错的、持久的方式存储记录流
    3. 当发生时处理记录流。
    复制代码

    SpringBoot集成kafka

    新建SpringBoot项目

    引入kafka

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    复制代码

    yml 文件中添加配置

    spring:
      kafka:
        # Kafka集群
        bootstrap-servers: xxxxxxx
    复制代码

    Hello Kafka

    生产者

    public class KafkaProducer {
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
        public void sendMessage(String normalMessage) {
            kafkaTemplate.send("topic", normalMessage);
        }
    }
    复制代码

    消费者

    @Component
    public class KafkaConsumer {
        // 消费监听
        @KafkaListener(topics = {"topic"})
        public void onMessage1(ConsumerRecord<?, ?> record){
            // 消费的哪个topic、partition的消息,打印出消息内容
            System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
        }
    }
    复制代码

    生产者

    1、带回调的生产者

        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
        public void sendMessage(String normalMessage) {
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate
                        .send("topic", normalMessage);
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    onSendSuccess(result, k, v);
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
                }
            });  
        }
    复制代码

    消费者

    
        public String getPubSubTopic() {
            return "topic";
        }
    
        public String getSubscriber() {
            return "group";
        }
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @KafkaListener(groupId = "#{__listener.subscriber}", topics = "#{__listener.pubSubTopic.split(',')}", containerFactory = "batchFactory")
        public void listen(ConsumerRecord<String, String> records) {
        
        }
    复制代码

    属性解释:

    ① id:消费者ID;

    ② groupId:消费组ID;

    ③ topics:监听的topic,可监听多个;

    ④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。

    批量消费者

    # 设置批量消费
    spring.kafka.listener.type=batch
    # 批量消费每次最多消费多少条消息
    spring.kafka.consumer.max-poll-records=50
    复制代码
    public void onMessage(List<ConsumerRecord<?, ?>> records) {}
    复制代码
     
  • 相关阅读:
    vb.net FTP上传下载,目录操作
    vb.net导出CSV文件
    服务器内存总量
    定义数组
    监控键盘健代码
    C# FTp 上传,下载
    使用EasyUI中Tree
    微信web开发自定义分享
    mysql将时间戳格式化
    查询表时给字段赋默认值 sql
  • 原文地址:https://www.cnblogs.com/ming569/p/13693204.html
Copyright © 2011-2022 走看看