zoukankan      html  css  js  c++  java
  • Kafka入门(安装及使用)

    Kafka是一种分布式的,基于发布/订阅的消息系统。

    Kafka的组成包括:

    • Kafka将消息以topic为单位进行归纳。
    • 将向Kafka topic发布消息的程序成为producers.
    • 将预订topics并消费消息的程序成为consumer.
    • Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.

    Kafka的下载:https://kafka.apache.org/downloads      下载最新版本

    解压后修改配置(config/server.properties:broker.id、log.dirs)

    vim config/server.properties

    broker.id = 1

    log.dirs = "日志目录地址"

    启动服务:

    • Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。可以在命令的结尾加个&符号,这样就可以启动后离开控制台。
    bin/zookeeper-server-start.sh config/zookeeper.properties &
    • 现在启动Kafka
    bin/kafka-server-start.sh config/server.properties
    • 创建topic
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    •  发送消息
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
    • 启动consumer
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    Springboot中整合Kafka:

      pom文件中:

     

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.0.RELEASE</version>
    </dependency>
     
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    

      application.yml中:

    spring:
      # KAFKA
      kafka:
        # ָkafka服务器地址,可以指定多个
        bootstrap-servers: 123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094
        #=============== producer生产者配置 =======================
        producer:
          retries: 0
          # 每次批量发送消息的数量
          batch-size: 16384
          # 缓存容量
          buffer-memory: 33554432
          # ָ指定消息key和消息体的编解码方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        #=============== consumer消费者配置  =======================
        consumer:
          #指定默认消费者的group id
          group-id: test-app
          #earliest
          #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
          #latest
          #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
          #none
          #topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
          auto-offset-reset: latest
          enable-auto-commit: true
          auto-commit-interval: 100ms
          #指定消费key和消息体的编解码方式
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    

      定义一个生产者类:KafkaSender 负责消息推送:

    @Component
     
    public class KafkaSender {
     
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
        private final Logger logger = LoggerFactory.getLogger(KafkaSender.class);
     
     
        public void send(String topic, String taskid, String jsonStr) {
     
     
            //发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, taskid, jsonStr);
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                //推送成功
                public void onSuccess(SendResult<String, Object> result) {
                    logger.info(topic + " 生产者 发送消息成功:" + result.toString());
     
     
                }
     
                @Override
                //推送失败
                public void onFailure(Throwable ex) {
                    logger.info(topic + " 生产者 发送消息失败:" + ex.getMessage());
     
     
                }
            });
     
     
        }
     
     
    }
    

      定义一个消费者类:KafkaCustomer 用来接收消息

    @Component
    public class KafkaConsumer  {
     
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
        
    //下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用“,”隔开
        @KafkaListener(topics = {"testTopic"})
        public void receive(ConsumerRecord<?, ?> record){
     
            logger.info("消费得到的消息---key: " + record.key());
            logger.info("消费得到的消息---value: " + record.value().toString());
        }
     
    }
    

      

  • 相关阅读:
    解密时遇到 填充无效 无法被移除
    固态硬盘SSD,机械硬盘HDD,4K速度对比。
    onsubmit ajax return false 无效
    chrome flash
    ubuntu base make 未找到命令
    winrar 压缩命令
    查看耗时长,CPU 100% 的SQL
    【转】SQL Server日志文件过大 大日志文件清理方法 不分离数据库
    安装老版本redis .NET 客户端
    python2.0_day22_web聊天室二
  • 原文地址:https://www.cnblogs.com/dinghaoran/p/11403953.html
Copyright © 2011-2022 走看看