zoukankan      html  css  js  c++  java
  • (三)Kafka

    课程内容:

    1. 简单的操作一下集群

    2. 简单的介绍几个工具(企业)

    3. Producer的原理(核心,重点)

    4. 简单kafka的代码

    5. 介绍里面的核心参数(重点)

    ========================

    消费者原理

    --replica-factor 2

    --partitions 2我们一般设置分区数,建议是节点的倍数

    =========================================

    Producer的原理

    *******************kafka************************************

    topic:TopicA
    多个分区
    p0:leader parititon hdp1
    p1:leader partition hdp2
    需要把数据发送到leader partition

    生产者,生产数据,需要把数据封装成ProducerRecord
    ①ProducerRecord
    ②序列化
    ③partitioner(获取元数据,找到一个broker就可以
    知道有多少个分区,并清除哪个是leader partition,并把数据发送到哪)

    存入
    缓冲区***
    ④Sender (一个线程)从缓冲区取出消息,封装为一个批次(Batch)
    Batch
    Batch
    Batch




    --------------------------------------------
    zookeeper


    ------------------------------------------------
    broker(去zookeeper注册,选举controller)
    controller(监听zookeeper元数据,动态变化,进行同步)
    (hdp1)
    去zookeeper同步元数据信息,(并分发给其他节点)
    p0



    -------------------------------------------------
    broker(去zookeeper注册,选举controller)
    (hdp2)
    p1

    ---------------------------------------------------


    //创建了一个配置文件的对象
    Properties props = new Properties();
    //这个参数,目的就是为了获取kafka集群的元数据
    //我们写一个主机也行,写多个的话,更安全
    //使用的是主机名,原因是server.properties里面填进去的是主机名,必须配置hosts文件
    props.put("bootstrap.servers","hadoop1:9092,hadoop2:9092,hadoop3:9092");
    //设置序列化器==》kafka的数据是用的网路传输的,所以里面都是二进制的数据
    //我们发送消息的时候,默认的情况下就是发送一个消息就可以了
    //但是你也可以给你的每条消息都指定一个key也是可以的
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
    //调优的参数,后面解释
    // acks
    //-1: 如何判断一条消息发送成功?首先消息要写入leader partition,这些消息还需要被另外的所有的这个分区的副本同步了,才算发送成功
    // 1: 发送的消息写到leader partition 就算写入成功,然后服务器端就返回响应就可以了,默认就是这个参数,有可能会丢数据
    // 0: 消息主要发送出去了,就认为是成功的(允许丢数据,只是处理一些不重要的日志,不需要得到准确的数据)
    // kafka里面的分区是有副本的,比如一个主题TopicA,这个主题有两个parittion,每个partition有三个副本
    // p0: leader parittion ,follower parititon,follower partition
    // p1: leader partition,follower partition,follower partition
    props.put("acks","-1")

    //重试次数,网络抖动(5-10次)
    props.put("retries",3)
    //每隔多久重试一次 2s
    props.put("retry.backoff.ms",2000)
    //提升消息吞入量
    //设置压缩格式,lz4,
    props.put("compression.type","lz4")
    //适当增大缓冲区大小,32M(基本上这个参数不需要设置)
    props.put("buffer.memory",33554432)
    //批次大小,默认16k,这里设置32k;设置这个批次的大小 还跟我们的消息的大小有关
    //假设一条消息1k==》设置100k
    props.put("batch.size",323840)
    //比如我们设置的一个批次的大小是32k,但是size没有满,无论如何到了这个时间都要把消息发送出去了
    //默认是0,100ms
    props.put("linger.ms",100)

    //这个值,默认是1M,代表的是,生产者发送消息的时候,最大的一条消息(注意说的不是批次)
    // byte,如果消息超过1M,程序会报错,可以设置为10M
    props.put("max.request.size",1024*1024*10)

    // 消息发送出去后,多久没有响应,默认为超时
    // 如果网络不稳定,可以适当增大
    props.put("request.timeout.ms",3000)

    //创建生产者
    KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);
    //创建消息
    ProducerRecord<String,String> record = new ProducerRecord<String,String>(
    "xoxo","this_is_key","this_is_value")

    //发送消息,
    //异步发送:性能比较好,也是我们生产里面使用的方式
    //同步发送:等到返回响应,发送下一条,性能不好,我们生产里面一般不用

    producer.send(record,new Callback(){
      public void onCompletion(RecordMetadata metadata,Exception exception){
        if(exception == null){
          sout("success")
        } else {
          sout("error")
        }
      }
    })

     小案例

    创建消息在kafka里面,我们发送消息的时候,可以给消息指定key,也可以不指定key跟我们要把找个消息发送到这个主题的哪个分区欧关系比如:
    topicA:
      
      p0:leader partition ,follower partition


      p1:leader partition ,follower partition

    1)不指定key
        发送的一条消息,会以轮询的方式,发送到分区里面
    message1 p0
         message2 p1
    message3 p0
    message4 p1
    2)如果指定key
      test_key 取这个key的hash值 数字 3
      数字/分区数 取模 3/2
      比如分区数2,结果要么是0,要么是1
    test_key===> p1
      这样子的话,我们可以保证这样的一个事,key相同的消息一定会被发送到同一个分区


    class OrderProducer{}

    // 创建生产者
    public static KafkaProducer<String,String> getProducer(){
      Properties props = new Properties();
      props.put();
      props.put();
      props.put();
      props.put();
      KafkaProducer<String,String> producer = nre KafkaProducer<String,String>(props);
    return producer;
    }

    public static void createRecord(){
      JSONObject order = new JSONObject();
      order.put("userId","123");
      order.put("orderId","123a");
      order.put("amount",1000.0);
    order.put("operator","pay");
      return order;
    }

    ProducerRecord<String,String> record = new ProducerRecord<>(
      "topicA","test_key",order.toString()
    );
    用哪个字段作为key
    orderId/userId作为key

    producer.send(record,new Callback(){
      public void onCompletion(RecordMetadata metadata,Exception exception){

        if(exception == null ){
          sout("succ")
        } else {
          mysql,redis备用别的链路
        }
      }

    });


    场景:

    消费数据,消费了一段时间以后,我程序停了,

    下一次启动的时候,我的程序从哪个地方开始消费?

    我上一次消费到哪了?

    offset

    consumer原理:

    在kafka里面,kafka是不帮我们维护这个offset的,这个偏移量需要consumer

    自己去维护

    consumer这,kafka提供了两个参数

      props.put("enable.auto.commit","true")是否开启自动提交偏移量 

      props.put("auto.commit.ineterval.ms","1000")每次自动提交offset的一个时间间隔

    比如我们消费者消费的topicA(p0,p1):

      consumerA:

        topicAp0:10001(offset)

        topicAp1:10008(offset)

    下一次启动的时候,直接从offset出开始消费

    偏移量数据存到哪?

    kafka0.8及之前,消费者偏移量的数据是存储到zookeeper里面

    问题:n多个topic,n多个partition,造成zookeeper请求过多

    kafka0.8以后,offset存到了kafka内部的一个主题里面,__consumer_offset

    这个kafka内部的主题,默认是50个分区

     

    kafka 3个节点,
    consumer group ,2个消费者


    提交偏移量,类似生产者
    指定key:groupid+topic+分区编号
    消费同一个分区的数据的偏移量会存储到同一个offset partition里面

    用了__consumer_offset这种方式保存offset,把压力分散到各个broker上面

    -------------------------------------
    consumer group:group id
    consumer leader
    consumer
    consumer

    1 两个consumer 去消费3个分区
    2 三个consumer 去消费个分区

    当两个consumer时,去消费3个分区,会进行负载均衡
    负载均衡
    coordinator(协调)
    消费者组从kafka集群选一台作为coordinator服务器
    根据group id号,计算出来一个hash值,数值
    然后用这个数值对 __consumer_offset分区数取模(默认50),比如:2
    拿到2以后,就去集群,看 partition为2的这个分区的leader partition在哪一台服务器上面
    那么哪一台服务器就是coordinator服务器

    找到coordinator之后
    1 所有consumer group下的consumer都去 coordinator这台服务器注册
    2 这台coordinator服务器会从这个消费组里面选一个leader consumer(谁先注册谁就是)
      同时这个coordinator服务器也会把你要消费topic的消息发送给leader consumer
    3 leader consumer 制定分区的消费方案,发送sync group 请求,把分区消费方案发送给coordinator服务器
    4 coordinator 下发分区的消费方案,给各个consumer
    5 若一个consumer挂掉,重新执行上面步骤


    负载均衡策略
    1 range策略
    0-3分区 consumer1
    4-7分区 consumer2
    8-12分区 consuemr3
    2 round-robin策略(轮询,)
    0,3,6分区 consumer1
    1,4,7分区 consumer2
    2,5,8分区 consumer3
    上面两个方案有个问题:
    假设consumer1挂了:p0-3分区 分配给consumer2和3
    原本在consumer2上的p3 被分配到consumer3上了

    3 sticky策略
    最新的一个sticky策略,就是说尽可能保证rebalance的时候,让原本属于这个consumer
    的分区还是属于他们
    然后再把多余的分区再均匀分配过去,这样尽可能维持原来的分区分配策略



    rebalance分代机制




    ***********kafka**********

    --------------------------
    broker(coordinator
    topicA p0 leader




    --------------------------
    broker
    topicA p1 leader



    --------------------------
    broker
    topicA p2 leader



    ==================================================================================

    consumer核心参数
    1 consumer心跳时间,1000,不要设置得太久,不然coordinator服务器不太容易发现你的消费者宕机了
    heartbeat.interval.ms 1000
    2 session.timeout.ms 1000*10 多久没发送心跳认为超时
    3 max.poll.interval.ms 30*1000 每隔多久拉取一次数据
    4 max.poll.records 500 每次拉取500条数据,可增大至500*2
    5 connection.max.idle.ms -1 当连接空闲的时候,是否回收。-1 不回收
    6 enable.auto.commit true
    7 auto.commit.interval.ms 1000
    8 auto.offset.reset earliest 下一次从哪开始消费
    earliest
      当各分区已提交offset时,从提交的offset开始消费,无提交offset时,从头开始消费
    latest(用的多)
      当各个分区下有已提交的offset时,从提交的offset开始消费,无提交offset时,从当前最新的数据开始消费
    none
      topic各分区都存在已提交的offset时,从offset后开始消费,只要有一个分区不存在已提交的offset,则抛出异常

    二分查找

    000001111.log 数据文件

    message1 offset:1111  position:234

    message2 offset:1112  position:1092

    message3 offset:1113  position:1872

    message4 offset:1114  position:2098

    message5 offset:1115  position:2395

    message6 offset:1116  position:3011

                 物理位置

    数据中不存这个position,是指磁盘上面的物理位置

    稀松索引

    000001111.index索引文件(不会每条都记录,每隔一段时间记录一次)很小几k

    1111  234

    1113  1872

    1116  3011

    比如:消费1115的偏移量

    从index找比1115大和小的文件,再去log文件定位位置

    ISR机制

    in-sync-replicas

    p0:

      p0-leader,<=  p0-follower1,p0-follower2

      ISR:副本跟leader分区保持同步的副本加入ISR

      ISR:p0-leader,p0-follower1,p0-follower2

    假设:p0-leader挂了,一区两个分区之一成为leader

    假设:副本什么情况下会被踢出ISR列表

    如果一个follower超过10s没有去跟leader partition去同步数据,那么这个follower就会被

    踢出ISR列表

    什么样的情况下又可以重新加入?

    看数据差异大不大,下节讲

    如果同步了,加进去

  • 相关阅读:
    记一次GreenPlum性能调优
    PostgreSQL时间格式及相关函数实践
    OGG到OGGAdapter配置详情-从Oracle直接抽取成csv文件
    使yum保留下载的rpm包
    源码编译tmux
    抠图
    ps磨皮的方法
    谷歌学术网站镜像
    element菜单默认展开和选中
    git仓库如果是私密的,每台电脑上导下来都需要进行ssh授权,所以一个项目不知一个ssh权限
  • 原文地址:https://www.cnblogs.com/hanchaoyue/p/13302910.html
Copyright © 2011-2022 走看看