zoukankan      html  css  js  c++  java
  • [Kafka]Kafka基础

    Kafka

    Kafka介绍

    kafka最初是由linkedin开发的,是一个分布式,分区的,多副本的,基于Zookeeper协调的分布式日志系统,当然它也可以当做消息队列来使用。
    常见的可以用于Web,nginx日志,访问日志,消息服务等等。
    所以kafka的应用场景主要有:日志收集系统和消息系统。

    特点

    1,解耦

    消费者生产者之间不想相互耦合,只要都遵循同样的接口约束就行。

    2,冗余(副本)

    这里主要是为了保证数据不会丢失,许多消息队列采用"插入-获取-删除"的模式,在把一个消息从队列中年删除之前,需要系统明确指出这个消息已经被处理完毕,从而确保数据被安全地保存直到使用完毕。

    3,扩展性

    支持扩展

    4,灵活性,峰值处理能力

    在访问量剧增的情况下,使用消息队列能够使得关键组件顶住突然的访问压力,使得应用仍然需要继续发挥作用。

    5,可恢复性

    系统的一部分组件失效时,不会影响整个系统,即使一个处理消息的线程挂掉,加入队列中的消息也可以在系统恢复后被处理。

    6,顺序保证

    Kafka保证一个Partition中的消息的有序性。

    7,缓冲

    通过一个缓冲层来帮助任务最高效率地执行,写入队列的处理尽可能地传递。

    8,异步通信

    采用异步通信机制,允许先把消息放入队列,但并不立即处理,而是在需要的时候再去用它们。

    Kafka中的几个概念

    avatar

    1,Broker

    Kafka集群包括一个或者多个服务器,服务器节点称为broker。broker存储topic的数据,如果某个topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition,如果某个topic有N个partition,集群有N+m个broker,那么N个broker存储该topic中的一个partition,剩下的m个broker不存储该topic的partition数据。如果某个topic的broker数量比partition的数量少,那么一个broker可能会存储多个该topic的partition。
    在实际生产中应该尽量避免这种情况发生,因为很容易造成kafka集群数据不均衡。

    2,Topic

    每条发布到kafka的集群消息都有一个类别,这个类别称为topic。

    3,Partition

    Topic的数据分割为一个或者多个partition,每个partition中的数据使用过个segment文件存储。partition的数据是有序的,不同partition间的数据丢失了数据的顺序,如果topic有多个partition,消费数据就不能保证数据的顺序,在需要严格保证消息的消息顺序的场景下,需要将partition数目需要1。

    4,Producer

    生产者

    5,Consumer

    消费者

    6,Consumer Group

    每个Consumer属于一个特定的ComsumerGroup,可为每个Consumer指定GroupName,不指定则为默认。

    7,Leader

    每个Partition有多个副本,其中有且仅有一个Leader,即负责读写数据的Partition。

    8,Follower

    Follower跟随Leader,所有的写请求都通过Leader路由,数据变更会广播到所有的Follower。如果Leader失效,那么Follower中会选举出一个新的Leader。

    入门demo

    本想继续写一写kafka的架构,高可用设计和其中的一些特性的,但是我这两天在看这些东西的时候发现这些还是在一个demo的基础上再去学习比较好,所以这些留在下一篇写了。

    前面的准备

    安装kafka和Zookeeper,kafka运行需要Zookeeper来支持,来进行心跳等机制,所以在运行kafka之前安装好Zookeeper。网上帖子很多,就不细写了,但是我这里Zookeeper和kafka都是单实例的,并没有配置集群。

    建工程

    IDEA用SpringInitializer建立一个大工程,然后建立KafkaConsumer和KafkaProducer两个module就行了。

    引入依赖

    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.16.18</version>
            </dependency>
        </dependencies>
    

    配置

    生产者

    server.port=8099
    
    # kafka地址
    spring.kafka.bootstrap-servers=127.0.0.1:9092
    #写入失败的时候的重试次数
    spring.kafka.producer.retries=0
    
    # 每次批量发送消息的数量
    spring.kafka.producer.batch-size=16384
    # producer积累数据一次性发送,缓存大小到达这个值就发送数据
    spring.kafka.producer.buffer-memory=33554432
    
    #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
    #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
    #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
    spring.kafka.producer.acks=1
    
    # 指定消息key和消息体的编解码方式
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    

    消费者

    server.port=8090
    
    # kafka地址
    spring.kafka.bootstrap-servers=127.0.0.1:9092
    # 自动提交的时间间隔
    spring.kafka.consumer.auto-commit-interval=1S
    # 指定消费者在读取一个没有偏移量的分区或者偏移量无效的分区的情况下如何处理。
    # latest在偏移量无效的情况下,消费者将从最新的记录开始读取数据
    # earliest在偏移量无效的情况下,消费者将从起始位置读取分区的记录
    spring.kafka.consumer.auto-offset-reset=earliest
    # 是否自动提交偏移量,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
    spring.kafka.consumer.enable-auto-commit=false
    # key的反序列化方式
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    
    spring.kafka.listener.concurrency=5
    spring.kafka.listener.ack-mode=manual_immediate
    spring.kafka.listener.missing-topics-fatal=false
    

    我这里采用的就是简单的StringSerializer和StringDeserializer,如果是传递对象,有两种方式,一种是自定义解码和编码器,需要实现Serializer接口,另一种就是用已有的格式来解码和编码,比如json格式来传递信息,然后用fastjson等框架来解码和编码。
    另外一点就是消费者的监听器必须要设置ack-mode,因为上面设置的自动提交的选项设置为了false,所以需要手动设置提交offset的模式。

    生产者实现

    @Component
    @Slf4j
    public class KafkaProducer {
    
        @Autowired
        private KafkaTemplate<String,Object> kafkaTemplate;
    
        public void send(Object o){
    
            String objStr = JSONObject.toJSONString((o));
            log.info("sending info:"+objStr);
            ListenableFuture<SendResult<String,Object>> future=
                    kafkaTemplate.send("test-topic-1",o);
    
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    log.info("test-topic-1发送失败,"+throwable.getMessage());
                }
    
                @Override
                public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                    log.info("test-topic-1发送成功,"+stringObjectSendResult.toString());
                }
            });
        }
    
    }
    

    然后简单写一个Controller来触发消息的发送。

    @RestController
    public class KafkaController {
    
    
        @Autowired
        private KafkaProducer kafkaProducer;
    
        @GetMapping("/message/send")
        public boolean send(){
            kafkaProducer.send("this is a test message");
            return true;
        }
    
    
    }
    
    

    生产者实现

    @Component
    @Slf4j
    public class KafkaConsumer {
    
        @KafkaListener(topics = "test-topic-1",groupId = "test-group-1")
        public void topic_test(ConsumerRecord<?,?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC)String topic){
    
            Optional message = Optional.ofNullable(record.value());
    
            if(message.isPresent()){
                Object msg = message.get();
                log.info("消费了: topic:"+topic+",message:"+msg);
                ack.acknowledge();
    
            }
        }
    
        @KafkaListener(topics = "test-topic-1",groupId = "test-group-2")
        public void topic_test_1(ConsumerRecord<?,?>record,Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic){
            Optional message = Optional.ofNullable(record.value());
            if (message.isPresent()) {
                Object msg = message.get();
                log.info("消费了: topic:"+topic+",message:"+msg);
                ack.acknowledge();
            }
    
        }
    }
    

    启动测试

    在启动这两个模块之前,需要确认kafka和Zookeeper都已经启动。
    启动生产者,控制台有如下信息:

    2020-09-13 21:53:10.892  INFO 17928 --- [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.1
    2020-09-13 21:53:10.894  INFO 17928 --- [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 0efa8fb0f4c73d92
    2020-09-13 21:53:10.894  INFO 17928 --- [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1600005190890
    2020-09-13 21:53:11.125  INFO 17928 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: OtDSNkOFT4eFbSso_V8qAQ
    2020-09-13 21:53:11.167  INFO 17928 --- [ad | producer-1] c.e.k.producer.KafkaProducer             : test-topic-1发送成功,SendResult [producerRecord=ProducerRecord(topic=test-topic-1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=this is a test message, timestamp=null), recordMetadata=test-topic-1-0@4]
    2020-09-13 21:55:34.570  INFO 17928 --- [nio-8099-exec-3] c.e.k.producer.KafkaProducer             : sending info:"this is a test message"
    2020-09-13 21:55:34.579  INFO 17928 --- [ad | producer-1] c.e.k.producer.KafkaProducer             : test-topic-1发送成功,SendResult [producerRecord=ProducerRecord(topic=test-topic-1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=this is a test message, timestamp=null), recordMetadata=test-topic-1-0@5]
    
    

    启动消费者,可以看到控制台打印了发过来的信息

    2020-09-13 21:55:24.077  INFO 13296 --- [ntainer#1-4-C-1] o.s.k.l.KafkaMessageListenerContainer    : test-group-2: partitions assigned: [test-topic-1-0]
    2020-09-13 21:55:24.077  INFO 13296 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test-group-1: partitions assigned: [test-topic-1-0]
    2020-09-13 21:55:24.114  INFO 13296 --- [ntainer#0-0-C-1] c.e.k.consumer.KafkaConsumer             : 消费了: topic:test-topic-1,message:this is a test message
    2020-09-13 21:55:24.114  INFO 13296 --- [ntainer#1-4-C-1] c.e.k.consumer.KafkaConsumer             : topic_test1 消费了: Topic:test-topic-1,Message:this is a test message
    2020-09-13 21:55:34.579  INFO 13296 --- [ntainer#0-0-C-1] c.e.k.consumer.KafkaConsumer             : 消费了: topic:test-topic-1,message:this is a test message
    2020-09-13 21:55:34.580  INFO 13296 --- [ntainer#1-4-C-1] c.e.k.consumer.KafkaConsumer             : topic_test1 消费了: Topic:test-topic-1,Message:this is a test message
    
    

    参考资料

    Kafka

  • 相关阅读:
    mybatis中的缓存
    mybatis中的延迟加载
    mybatis中的ResultMap关联映射
    mubatis中为什么实体类要继承Serializable
    【经验总结-markdown】markdown字体和颜色设置
    【算法】动态规划
    【刷题-PAT】A1135 Is It A Red-Black Tree (30 分)
    【刷题-PAT】A1126 Eulerian Path (25 分)
    【刷题-PAT】A1119 Pre- and Post-order Traversals (30 分)
    【刷题-PAT】A1114 Family Property (25 分)
  • 原文地址:https://www.cnblogs.com/Yintianhao/p/13663879.html
Copyright © 2011-2022 走看看