zoukankan      html  css  js  c++  java
  • kafka consumer重复消费问题

    在做分布式编译的时候,每一个worker都有一个consumer,适用的kafka+zookeep的配置都是默认的配置,在消息比较少的情况下,每一个consumer都能均匀得到互不相同的消息,但是当消息比较多的时候,这个时候consumer就有重复消费的情况。

    如何排查这种问题呢?

    最开始以为是系统资源的瓶颈,编译worker和kafka cluster都在一个集群上,导致消息同步不及时,所以就另外找了几个机器,kafka和编译worker分开。但是还是会遇到上述问题。

    然后就想集群环境一致性最重要的一个条件是时间,然后将所有的机器都设置了统一的时间服务器,结果还是不行。

    最后查看自己写的代码,将缓存队列开大一些,竟然没有重复了。

    心想可能是自己使用的方式有点问题:下面是分析的过程

     1: 分析重复的情况是什么?是由于不同hosts上重复的消费,还是相同hosts上的重复消费?

      经过对比测试,是属于相同hosts上的重复消费,间隔大概为20个message

      2: kafka的消息队列模型是什么?

    • 一个topic: tizen-unified
    • 三个partition:0,1,2
    • 一个producer, 三个consumer。
    • 每个consumer 端设置一个缓存队列,当队列满的时候,consumer线程进入阻塞的状态,这个时候编译进程从队列中取package进行编译,直到队列为空为止。

    如下是consumer端的代码:

    #!/usr/bin/env python
    # vim: ai ts=4 sts=4 et sw=4
    """gbs-worker - distributed system worker module."""
    import os
    import re
    import Queue
    import time
    import threading
    import multiprocessing
    from multiprocessing import Process
    from kafka import KafkaConsumer
    from kafka import KafkaProducer
    
    class Producer(Process):
            def __init__(self):
                    super(Producer, self).__init__()
                    self.messageHandler = KafkaConsumer('tizen-unified',group_id='tizen-worker',bootstrap_servers='109.123.100.144:9092')
            def run(self):
                    print "start produce, os pid: %d" % (os.getpid())
                    for message in self.messageHandler: # which will resume if there is a message from kafka
                            while(WorkerPool.taskQueue.full() is True):
                                    time.sleep(5)
                            print "put package %s into queue" % message.value
                            WorkerPool.taskQueue.put(message.value)
    
    class GbsBuild(object):
            errorRule = re.compile(r"some packages failed to be built")
            def __init__(self,packageName, id):
                    super(GbsBuild, self).__init__()
                    self.sourcePath = "/home/scm/renjg/workspace"
                    self.logPath = "/home/scm/renjg/logs"
                    self.gbsPath = "/usr/bin/gbs"
                    self.gbsconf = self.sourcePath+"/.gbs.conf"
                    self.gbsRoot = "/home/scm/GBS-ROOT"
                    self.packageName = packageName
                    self.threadId = id
            def build(self):
                    os.system("cd %s" % (self.sourcePath))
                    os.system("mkdir -p %s" % self.logPath)
                    result = os.popen("gbs -c %s build -A armv7l -B %s-%d --binary-list %s --clean >%s/%s.log 2>&1" % (self.gbsconf,self.gbsRoot, self.threadId,  self.packageName, self.logPath, self.packageName)).read()
                    if GbsBuild.errorRule.findall(result):
                            print "%s build error 
     log file is: %s/%s.log" % (self.packageName, self.logPath, self.packageName)
                            return "Fail"
                    os.system("scp -r %s renjg@109.123.123.6:/home/renjg/GBS-ROOT/local/repos/unified_standard/" % (self.gbsRoot+"-"+str(self.threadId)+"/local/repos/unified_standard/armv7l/"))
                    print "%s package in process %d ,build done, copy done" % (self.packageName, self.threadId)
                    return "Success"
    
    class Consumer(Process):
            def __init__(self, threadId, partition = 0):
                    super(Consumer,self).__init__()
                    self.partition = partition
                    self.threadId = threadId
                    self.messageHandler = KafkaProducer(bootstrap_servers="109.123.100.144:9092")
            def run(self):
                    print "start consume thread %d , os pid: %d" % (self.threadId, os.getpid())
                    while True:
                            while WorkerPool.taskQueue.empty() is True:
                                    time.sleep(1)
                            packageName = WorkerPool.taskQueue.get()
                            print "thread %d start %s package " % (self.threadId, packageName)
                            gbsbuild = GbsBuild(packageName,self.threadId)
                            print "thread %d building %s package" % (self.threadId, packageName)
                     
                            if gbsbuild.build() == "Success":
                            #if True:
                                    result = self.messageHandler.send("tizen-unified-status", value = "succeed", key = packageName, partition=0)
                                    if(result.get(60)):
                                            print "send success"
                                    else:
                                            print "send fail"
                            else:
                                    result = self.messageHandler.send("tizen-unified-status", value = "failed", key = packageName, partition=0)
                                    if(result.get(60)):
                                            print "send success"
                                    else:
                                            print "send fail"
    
    class WorkerPool(object):
            capcaticy = 4
            curThreadNum = 0
            taskQueue = multiprocessing.Queue(capcaticy*100) #如果taskQueue很小的话,那么就会出现producer重复的获取消息的情况。
            def __init__(self):
                    self.producer = Producer()
                    self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy)]
            def start(self):
                    print "start Worker pool"
                    self.producer.start()
                    for i in range(0, WorkerPool.capcaticy):
                            self.consumers[i].start()
                    self.producer.join()
                    for i in range(0, WorkerPool.capcaticy):
                            self.consumers[i].join()
    wp = WorkerPool()
    wp.start()
    print "Done"
    

    当前的改进思路是:扩大WorkerPool.taskQueue的容量,当一次poll就获取足够多的message,然后consumer慢慢处理。

    还有一种改进思路是:去掉producer,直接在每一个consumer中间创建一个connection, 把互斥的任务交给kafka去做。待测试。

  • 相关阅读:
    Eclipse使用xdoclet1.2.3 生成hibernate配置文件和映射文件
    Eclipse安装SVN插件
    SourceTree安装和使用
    myeclipse通过数据表生成jpa或hibernate实体
    Delphi 快速读取TXT 指定行的数据
    delphi中如何将一整个文件读入内存
    Delphi TextFile读取文本文件
    Delphi读取和写入utf-8编码格式的文件
    Delphi 判断特定字符是为单字节还是双字节
    delphi按字节长度分割字符串函数(转)
  • 原文地址:https://www.cnblogs.com/Spider-spiders/p/10244924.html
Copyright © 2011-2022 走看看