zoukankan      html  css  js  c++  java
  • 一些Go操作Kafka的问题

    1)包的选择

    confluent-kafka-go使用了rdkafka的c库,破坏了go代码的收敛,不使用;

    sarama不支持groud id 的功能,写consumer需要自己管理消费的partition,offset;很难用;

    sarama-cluster是对sarama的一层封装,实现了group id 功能

    2)关于offset问题

    sarama-cluster有auto commit的功能,默认是一秒;但最好自己管理,如每100条数据MarkOffset,并CommitOffsets

    3)实现consumer的Priority MQ功能

    如1-5优先级的5个Topic,传入

    map[string]int32 {

      topic1: 1,

      topic2: 2,

      ....

    }

    按Priority生成排序的consumerList,for循环遍历consume,<-consumer.Messages(),select之并设置default分支

    4)producer

    producer使用的AsyncProducer的对象池;测试:本机1K以上message大小,producer池可提升效率,原因是I/O时间长,单一Producer发送效率受限;小message(10byte),单个producer发送效率要高,瓶颈在producer池的频繁Get与Put

    5)网络问题时,consumer会自动重连;

    https://github.com/Shopify/sarama/issues/72

    6)接收producer的Errors() chan一定要用for _, err := range producer.Errors();勿用for{}否则producer意外关闭,这里会死循环;

    for {

      err := <-producer.Errors()  // 错误示例;若producer意外关掉,此外err一直返回nil,跑满CPU

      if err != nil {

        // print log

      }

    }

  • 相关阅读:
    封装ajax---基于axios
    XHR的理解和作用
    params和 query区别
    HTTP请求交互的基本过程
    http3次握手
    ES6----import * as 用法
    微信小程序真机调试:Setting data field "XXX" to undefined is invalid
    webpack详解-----optimization
    node跨域
    shell 的 功能语句--1
  • 原文地址:https://www.cnblogs.com/gm-201705/p/7944362.html
Copyright © 2011-2022 走看看