zoukankan      html  css  js  c++  java
  • kafka 高可靠

    1.集群高可靠

    ①搭建kafka集群(略)

    ②重点配置项(每个broker配置相同,只有broker.id不一样)

    broker.id=1     当前机器在集群中的唯一标识,和zookeeper的myid性质一样

    listeners=PLAINTEXT://10.22.0.13:9092    最好用真实的IP

    advertised.listeners=PLAINTEXT://10.22.0.13:9092      最好用真实的IP hostname,port配置过时

    num.partitions=3    新建topic 默认分区数

    default.replication.factor=3  新建topic 默认副本集数

    offsets.topic.replication.factor=3  副本集因子  (必须配置为大于1,小于或者等于broker数,不然当消费者的协同节点broker宕机了,不会重新选举,导致消费者dead,达不到集群高可靠目的)

    zookeeper.connect=10.22.0.13:2182,10.22.0.14:2182,10.22.0.15:2182   zookeeper地址

    log.dirs=/home/txc/kafka1/kafkalogs  kafka数据日志保存路径

    2.消息至少消费一次

    消费者默认情况下,enable.auto.commit=true 消费者的offset消费者的offset将在后台周期性的提交,当消息处理失败时,偏移量offset已经提交了,导致消息丢失

    要保证消费至少消费一次,首先enable.auto.commit=false,然后每次消息处理成功后,手动提交偏移量offset, consumer.commitAsync();

    3.自定义分区(尽可能让数据在分区中均匀分布)

    Kafka中,topic是逻辑上的概念,而partition是物理上的概念。不用担心,这些对用户来说是透明的。生产者(producer)只关心自己将消息发布到哪个topic,而消费者(consumer)只关心自己订阅了哪个topic上的消息,至少topic上的消息分布在哪些partition节点上,它本身并不关心。

    如果没有分区的概念,那么topic的消息集合将集中于某一台服务器上,单节点的存储性能马上将成为瓶颈,当访问该topic存取数据时,吞吐也将成为瓶颈。 
    介于此,kafka的设计方案是,生产者在生产数据的时候,可以为每条消息人为的指定key,这样消息被发送到broker时,会根据分区规则,选择消息将被存储到哪一个分区中。
    如果分区规则设置合理,那么所有的消息将会被均匀/线性的分布到不同的分区中,这样就实现了负载均衡和水平扩展。另外,在消费者端,同一个消费组可以多线程并发的从多个分区中 同时消费数据。
    上述分区规则,实际上是实现了kafka.producer.Partitioner接口的一个类,这个实现类可以根据自己的业务规则进行自定义制定,如根据hash算法指定分区的分布规则。 如以下这个类,我们先获取
    key的hashcode值,再跟分区数量(配置文件中为numPartitions)做模运算,结果值作为分区存储位置,这样可以实现数据均匀线性的分布。

    ①自定义TxcPartitioner类

    public class TxcPartitioner implements Partitioner{
    
        @Override
        public void configure(Map<String, ?> arg0) {
            
        }
    
        @Override
        public void close() {
            
        }
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int size = partitions.size();
        //如果消息的 key 为 null,默认分配到指定分区
            if(keyBytes == null) {
                return 0;
            }
         //如果 key 不为 null,并且使用了默认的分区器,kafka 会使用自己的 hash 算法对 key 取 hash 值,
           //使用 hash 值与 partition 数量取模,从而确定发送到哪个分区。
           //注意:此时 key 相同的消息会发送到相同的分区(只要 partition 的数量不变化)
    
        return Utils.toPositive(Utils.murmur2(keyBytes)) % size; 
    }

    ②发送消息的方法如下

    public void send(String topic,String key,RequestMessage message){  
            try {  
                if(kafkaProducer != null) {
                    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,key,JSONObject.toJSONString(message));  
                    Future<RecordMetadata> future = kafkaProducer.send(record);
                    RecordMetadata metadata = future.get();
                    if(metadata != null) {
                        sysLog.debug("【Kafka message send success,topic = {}, partition is {} 】 " , metadata.topic(),metadata.partition());
                    }else {
                        sysLog.error("【Kafka message send fail 】");
                        throw new KafkaSendException("Kafka message send fail");
                    }
                }else {
                    sysLog.error("【TxcProducer is not init】");
                    throw new KafkaInitException("TxcProducer is not init");
                }
               
            }catch(Exception e){ 
                sysLog.error("【Kafka message send fail , exception = {}】 ",ExceptionUtil.collectExceptionStackMsg(e));
                throw new KafkaSendException("Kafka message send fail");  
            }  
        }  

    ③生产者配置中添加配置

    //设置自定义分区
    properties.put(TxcParameType.partitioner_class.getName(), TxcPartitioner.class.getName());

    注意:之所以需要自定义分区,是因为同一个分区的消息可以保证严格的顺序性,通过自定义分区设置的key值(比如交易流水号)可以让同一笔交易的消息严格按照顺序发送接收

    4.消息到达可靠

    保证消息到达可靠,生产者的配置项acks=all;

    生产者需要leader确认请求完成之前接收的应答数。此配置控制了发送消息的耐用性,支持以下配置:

    acks=0 如果设置为0,那么生产者将不等待任何消息确认。消息将立刻天际到socket缓冲区并考虑发送。在这种情况下不能保障消息被服务器接收到。并且重试机制不会生效(因为客户端不知道故障了没有)。每个消息返回的offset始终设置为-1。

    acks=1,这意味着leader写入消息到本地日志就立即响应,而不等待所有follower应答。在这种情况下,如果响应消息之后但follower还未复制之前leader立即故障,那么消息将会丢失。

    acks=all 这意味着leader将等待所有副本同步后应答消息。此配置保障消息不会丢失。这是最强壮的可用性保障。等价于acks=-1。

  • 相关阅读:
    私有属性的另类访问方式
    获取类所有属性和查看帮助文档
    类的私有属性及私方法(请注意属性的传值方式)
    类的私有属性及私方法
    类的定义
    怎么区分类变量和实例变量?
    面向对象编程案例04--访问控制
    面向对象编程案例03---继承之高级部分
    python 面向对象编程案例01
    静态方法
  • 原文地址:https://www.cnblogs.com/cowboys/p/9259372.html
Copyright © 2011-2022 走看看