zoukankan      html  css  js  c++  java
  • Kafka学习之路

     

    基础概念

    Kafka最初是由Linkedin公司开发,是一个分布式、支持分区的、多副本的,基于zookeeper协调的分布式消息系统,可以实时的处理大量数据以满足各种需求场景:基于hadoop的批处理系统、低延迟的实时系统、storm/spark流式处理引擎、web/nginx日志、访问日志、消息服务等等,用scala语言编写,于2010年共享给Apache基金会成为开源项目。

    Broker:Kafka集群中的服务器节点。

    Topic:发布的消息类别(主题),相当于数据库表名。

    Partition:Topic中的数据分割成一个或多个Partition,每个topic至少有一个Partition,Partition中的数据使用多个segment文件保存。Partition中数据是有序的,但是Partition之间的数据没有顺序。在严格保证消息的消费顺序场景下,需要将Partition数目设为1。

    Producer:数据发布者,发布消息到Topic中,Broker接收到生产者发送的消息后,将消息追加到当前数据的segment文件中。可以指定数据存储的Partition。

    Consumer: 消费者,从Broker中读取数据,可以消费多个topic中的数据。

    Consumer Group: 消费者群组,消费者群组是为了解决在生产者生产能力过高单一消费者无法完成消费而引进。一个群组中的所有消费者消费同一个主题,每个消费者分别消费不同的分区。如果有多个程序需要消费同一个主题,新增一个消费者群组。

    Offset:kafka的存储文件都是按照offset.kafka来命名,方便查找。另一种元数据,是一个不断递增的整数值,在创建消息时会添加到消息里。给定的分区里,每个消息的偏移量都是唯一的。消费者会把每个分区读取的消息偏移量保存在zookeeper或kafka上。

    Kafka框架

     

    Kafka集群

    Kafka集群使用zookeeper维护成员信息,如下图:

     控制器: 集群中第一个broker启动时创建临时节点/controller,让自己成为控制器。其他broker在控制节点上创建Zookeeper watch对象,监听控制器变更通知。

    复制:kafka架构的核心,每个分区都有副本,副本分为首领副本和跟随者副本。为了保持数据一致性,所有的请求都会经过首领副本。跟随副本会从首领副本复制消息,保持与首领消息一致性。首领发生崩溃,其中一个持续请求得到最新消息的副本(同步的副本),可能会成为新的首领副本。

    Kakfa文件存储:

    物理存储:kafka基本存储单元是分区,分区无法在多个broker间细分,也无法在同一个broker的多个磁盘上进行细分。分区在使用时会被分成若干个片段,默认情况下每个片段包含1GB或一周的数据(以较小为准)。达到片段上限,关闭当前文件,并打开一个新文件。当前写入数据的片段叫作活跃片段。

    KafKa生成者和消费者

    生产者demo

        private static KafkaProducer producer = null;
    
        static {
    
            Properties props = new Properties();  //生成属性对象
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092,slave2:9092"); //配置kafka服务地址
    
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //对象序列化方式
    
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            producer = new KafkaProducer(props); //生产者
    
    }

     

     

    生产者发送消息分为两种方式,同步与异步:

    同步消息发送:

    ProducerRecord<String, String> record =new ProducerRecord<>("topic","data");
    
    Producer.send(record).get(); 

    异步消息发送:

    首先增加一个回调函数:

    private class DemoProducerCallback implements Callback{
    
        @Overide
    
        public void onCompletion( RecordMetadata recordMetadata , Exception e) {
    
            if (e != null) {
    
                e.printStackTrace() ; 
    
            }
    
        }
    
    }
    
    ProducerRecord<String, String> record =new ProducerRecord<>("topic","data");
    
    Producer.send(record, new DemoProducerCallback()); 
    

      

    消费者demo

        private static KafkaConsumer consumer  = null;

        static {

            Properties props = new Properties();  //生成属性对象

            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092,slave2:9092"); //配置kafka服务地址

            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //对象序列化方式

            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

            consumer = new KafkaConsumer (props); //生产者

    }

     

    消费者订阅主题:

    consumer.subscribe(“topic”)

    轮询:

    try{
    
        while(true){
    
            ConsumerRecords<String, String> records = consumer.poll(100);
    
            for(ConsumerRecords<String,String> record : records){
    
                int updateCount = 1;
    
                if(custCountryMap.containsValue(records.value())){
    
                    updateCount = custCountryMap.get(record.value()) + 1;
    
                }
    
                custCountryMap.put(record.value(), updateCount);
    
                JSONObject json = new JSONObject(custCountryMap);
    
                System.out.println(json.toString(4));
    
            }
    
        }
    
    }finally{
    
        consumer.close();
    
    }
    

      

    消息轮询是消费者API的核心,通过一个简单的轮询向服务器请求数据,一旦订阅了主题,轮询会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,开发者只需要用一组简单的API来处理从分区返回的数据。

    Kafka可靠的数据传输

    Kafka的可靠传输主要通过三个部分组成:

     1. Kakfa集群本身的可靠保证机制

    Kafka为了提供的可靠传输,做了以下几方面的保证:

    1)  保证分区内的消息顺序;

    2)  只有当消息被写入分区的所有同步副本时,才被认为是“已提交”的。生产者可以选择消息被完全提交时确认,可以在消息被写入首领副本时确认,或者消息被发送到网络时确认。

    3)  只要有一个副本是活跃的,已经提交的消息就不会丢失。

    4)  消费者只能读取已提交的消息。

    Kafka提供了复制机制和不完全的首领选举两种broker级别的可靠数据保证,以及一种broker和主题级别的可靠性保证最少同步副本。

    不完全的首领选举:是指首领副本和跟随副本之间消息不同步,分为两种情况:

    第一种是跟随副本死亡,首领副本健在并正在接受消息,当返回给生产者已接收后首领死亡,某一个跟随副本起死回生,并成为新的首领副本且是唯一的不同步副本。

    第二种是跟随副本因为某些原因导致同步消息滞后,首领副本是唯一的同步副本,当他出问题时,跟随副本再也不能同步。

    不同步时如果跟随副本不能成为新首领,集群就一直等待原来首领复活。如果跟随副本能成为新首领(不完全选举),就会导致消息丢失。可以通过unclean.leader.election.enable参数决定是否采用不完全选举。

    最少同步副本:将最少同步副本设为2,两个都出问题时,broker停止接受消息,生产者会收到集群的NotEnoughReplicasException异常,消费者继续正常消费数据。当只剩下一个同步副本时,它只支持消费数据,并等待前两个不可用的副本中某一个复活,等待同步消息完成继续可用。

    2. Kakfa消息生产者可靠保证机制

    第一种是跟随副本死亡,首领副本健在并正在接受消息,当返回给生产者已接收后首领死亡,某一个跟随副本起死回生,并成为新的首领副本,消费者继续消费新首领的消息,没有感觉出问题。但对于生产者,少了一条数据。

    第二种是集群中所有broker都死亡,生产者仍然生产数据,kafka返回“首领不可用”响应。生产者如果不处理异常继续生产数据,会造成数据丢失。而且消费者也无法读到生产者认为自己已经生产的数据,造成数据不一致性。

    所以生产者需要根据可靠性需求配置恰当的acks(acks=0,1,all)的值,且在参数配置和代码(重试机制和额外错误处理)里正确处理错误。

    3. Kakfa消息消费者可靠保证机制

    消费者的数据可靠性异常主要表现在一个消费者群组中,某一个消费者消费了消息,提交了offset却没有处理完消息,造成数据丢失。

    可以通过某些参数配置和显示提交偏移量的方式解决问题。

  • 相关阅读:
    验证一下spark Row getAS类型以及控制问题
    Spark异常处理有时间好好拜读一下,spark拍错好文章
    Hive SQL 报错
    Objenesis类库学习一下,没有符合的构造器也可以创建对象
    Scala可变参数方法或者函数传参问题
    大数据相关英文博客,感觉还不错。Mark一下http://dwgeek.com/
    Tomcat 7 'javax.el.ELException' 的解决方式(failed to parse the expression [${xxx}])
    The absolute uri: http://java.sun.com/jsp/jstl/core cannot be resolved in either web.xml or the jar
    在Tomcat中进行数据池连接是所需的包
    EL表达式
  • 原文地址:https://www.cnblogs.com/smzy/p/12898006.html
Copyright © 2011-2022 走看看